satellite/accounting/tally: fix looping over all buckets
We have a bug where, if the number of buckets in the system is an exact multiple of the batch size (2500), the loop that iterates over all buckets can run indefinitely.

Fixes https://github.com/storj/storj/issues/5374

Change-Id: Idd4d97c638db83e46528acb9abf223c98ad46223
parent a3ff3eb193
commit 8e030f4e30
@@ -300,14 +300,8 @@ func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, sta
 	defer mon.Task()(&ctx)(&err)
 
 	var lastBucketLocation metabase.BucketLocation
-	var bucketLocationsSize int
-
 	for {
-		err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
-			if len(bucketLocations) < 1 {
-				return nil
-			}
-
+		more, err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
 			tallies, err := observer.metabase.CollectBucketTallies(ctx, metabase.CollectBucketTallies{
 				From: bucketLocations[0],
 				To:   bucketLocations[len(bucketLocations)-1],
@@ -330,16 +324,13 @@ func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, sta
 				bucket.ObjectCount = tally.ObjectCount
 			}
 
-			bucketLocationsSize = len(bucketLocations)
-
 			lastBucketLocation = bucketLocations[len(bucketLocations)-1]
 			return nil
 		})
 		if err != nil {
 			return err
 		}
-
-		if bucketLocationsSize < observer.config.ListLimit {
+		if !more {
 			break
 		}
 	}
@@ -5,12 +5,14 @@ package tally_test
 import (
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
 
 	"storj.io/common/memory"
 	"storj.io/common/storj"
@@ -422,3 +424,50 @@ func TestTallyOnCopiedObject(t *testing.T) {
 		}
 	})
 }
+
+func TestTallyBatchSize(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Metainfo.ProjectLimits.MaxBuckets = 100
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		planet.Satellites[0].Accounting.Tally.Loop.Pause()
+
+		projectID := planet.Uplinks[0].Projects[0].ID
+
+		numberOfBuckets := 13
+		for i := 0; i < numberOfBuckets; i++ {
+			data := testrand.Bytes(1*memory.KiB + memory.Size(i))
+			err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket"+strconv.Itoa(i), "test", data)
+			require.NoError(t, err)
+		}
+
+		objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
+		require.NoError(t, err)
+		require.Len(t, objects, numberOfBuckets)
+
+		for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} {
+			collector := tally.NewBucketTallyCollector(zaptest.NewLogger(t), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), tally.Config{
+				Interval:           1 * time.Hour,
+				UseObjectsLoop:     false,
+				ListLimit:          batchSize,
+				AsOfSystemInterval: 1 * time.Microsecond,
+			})
+			err := collector.Run(ctx)
+			require.NoError(t, err)
+
+			require.Equal(t, numberOfBuckets, len(collector.Bucket))
+			for _, object := range objects {
+				bucket := collector.Bucket[metabase.BucketLocation{
+					ProjectID:  projectID,
+					BucketName: object.BucketName,
+				}]
+				require.Equal(t, object.TotalEncryptedSize, bucket.TotalBytes)
+				require.EqualValues(t, 1, bucket.ObjectCount)
+			}
+		}
+	})
+}
@@ -44,5 +44,5 @@ type DB interface {
 	// CountBuckets returns the number of buckets a project currently has
 	CountBuckets(ctx context.Context, projectID uuid.UUID) (int, error)
 	// IterateBucketLocations iterates through all buckets from some point with limit.
-	IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (err error)
+	IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (more bool, err error)
 }
@@ -269,10 +269,10 @@ func TestBatchBuckets(t *testing.T) {
 
 	sortBucketLocations(expectedBucketLocations)
 
-	testLimits := []int{1, 3, 30, 1000}
+	testLimits := []int{1, 3, 30, 1000, len(expectedBucketLocations)}
 
 	for _, testLimit := range testLimits {
-		err := db.Buckets().IterateBucketLocations(ctx, uuid.UUID{}, "", testLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
+		more, err := db.Buckets().IterateBucketLocations(ctx, uuid.UUID{}, "", testLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
 			if testLimit > len(expectedBucketLocations) {
 				testLimit = len(expectedBucketLocations)
 			}
@@ -282,7 +282,11 @@ func TestBatchBuckets(t *testing.T) {
 			return nil
 		})
 		require.NoError(t, err)
+
+		if testLimit < len(expectedBucketLocations) {
+			require.True(t, more)
+		} else {
+			require.False(t, more)
+		}
 	}
 })
}
@@ -325,20 +325,21 @@ func convertDBXtoBucket(dbxBucket *dbx.BucketMetainfo) (bucket storj.Bucket, err
 }
 
 // IterateBucketLocations iterates through all buckets from some point with limit.
-func (db *bucketsDB) IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (err error) {
+func (db *bucketsDB) IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (more bool, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	var result []metabase.BucketLocation
 
+	moreLimit := limit + 1
 	rows, err := db.db.QueryContext(ctx, `
 		SELECT project_id, name
 		FROM bucket_metainfos
 		WHERE (project_id, name) > ($1, $2)
 		GROUP BY (project_id, name)
 		ORDER BY (project_id, name) ASC LIMIT $3
-	`, projectID, bucketName, limit)
+	`, projectID, bucketName, moreLimit)
 	if err != nil {
-		return storj.ErrBucket.New("BatchBuckets query error: %s", err)
+		return false, storj.ErrBucket.New("BatchBuckets query error: %s", err)
 	}
 	defer func() {
 		err = errs.Combine(err, Error.Wrap(rows.Close()))
@@ -348,15 +349,23 @@ func (db *bucketsDB) IterateBucketLocations(ctx context.Context, projectID uuid.
 		var bucketLocation metabase.BucketLocation
 
 		if err = rows.Scan(&bucketLocation.ProjectID, &bucketLocation.BucketName); err != nil {
-			return storj.ErrBucket.New("bucket location scan error: %s", err)
+			return false, storj.ErrBucket.New("bucket location scan error: %s", err)
 		}
 
 		result = append(result, bucketLocation)
 	}
 
 	if err = rows.Err(); err != nil {
-		return storj.ErrBucket.Wrap(err)
+		return false, storj.ErrBucket.Wrap(err)
 	}
 
-	return Error.Wrap(fn(result))
+	if len(result) == 0 {
+		return false, nil
+	}
+
+	if len(result) > limit {
+		return true, Error.Wrap(fn(result[:len(result)-1]))
+	}
+
+	return false, Error.Wrap(fn(result))
 }
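As a standalone illustration of the limit+1 pattern in the hunk above: the query asks for one row beyond the requested limit, and receiving that extra row proves another page exists; the extra row is then trimmed before the batch reaches the callback, so the caller can rely on the returned more flag instead of guessing from the batch size. Below is a minimal sketch of the same idea over an in-memory sorted slice; fetchPage, store, and cursor are invented names for this demo, not part of the storj codebase.

	package main

	import "fmt"

	// fetchPage returns up to limit names after cursor, plus a flag telling
	// the caller whether another page exists. It collects limit+1 entries:
	// if the extra entry shows up, there is more to read, and the extra
	// entry is trimmed before the batch is handed to the callback.
	func fetchPage(store []string, cursor string, limit int, fn func([]string) error) (more bool, err error) {
		var result []string
		for _, name := range store {
			if name > cursor && len(result) < limit+1 {
				result = append(result, name)
			}
		}
		if len(result) == 0 {
			return false, nil
		}
		if len(result) > limit {
			return true, fn(result[:limit])
		}
		return false, fn(result)
	}

	func main() {
		store := []string{"a", "b", "c", "d", "e", "f"} // must be sorted
		cursor, limit := "", 3                          // 6 items, exact multiple of 3
		for {
			more, err := fetchPage(store, cursor, limit, func(batch []string) error {
				fmt.Println("batch:", batch)
				cursor = batch[len(batch)-1] // advance the cursor past this batch
				return nil
			})
			if err != nil {
				panic(err)
			}
			if !more {
				break
			}
		}
	}

Running this prints two full batches and stops, even though the item count is an exact multiple of the limit, which is exactly the case the old size-based termination check got wrong.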