From 8e030f4e307fe39d721c67b6ca6ba3b504511974 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Tue, 6 Dec 2022 12:16:55 +0100 Subject: [PATCH] satellite/accounting/tally: fix looping over all buckets We have a bug where if number of buckets in the system will be multiplication of batch size (2500) then loop that is going over all buckets can run indefinitely. Fixes https://github.com/storj/storj/issues/5374 Change-Id: Idd4d97c638db83e46528acb9abf223c98ad46223 --- satellite/accounting/tally/tally.go | 13 +------ satellite/accounting/tally/tally_test.go | 49 ++++++++++++++++++++++++ satellite/buckets/db.go | 2 +- satellite/buckets/db_test.go | 10 +++-- satellite/satellitedb/bucketsdb.go | 21 +++++++--- 5 files changed, 74 insertions(+), 21 deletions(-) diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index 4a77dc914..f7c2565e7 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -300,14 +300,8 @@ func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, sta defer mon.Task()(&ctx)(&err) var lastBucketLocation metabase.BucketLocation - var bucketLocationsSize int - for { - err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) { - if len(bucketLocations) < 1 { - return nil - } - + more, err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) { tallies, err := observer.metabase.CollectBucketTallies(ctx, metabase.CollectBucketTallies{ From: bucketLocations[0], To: bucketLocations[len(bucketLocations)-1], @@ -330,16 +324,13 @@ func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, sta bucket.ObjectCount = tally.ObjectCount } - bucketLocationsSize = len(bucketLocations) - lastBucketLocation = bucketLocations[len(bucketLocations)-1] return nil }) if err != nil { return err } - - if bucketLocationsSize < observer.config.ListLimit { + if !more { break } } diff --git a/satellite/accounting/tally/tally_test.go b/satellite/accounting/tally/tally_test.go index 2d6a702d5..cdaba0c17 100644 --- a/satellite/accounting/tally/tally_test.go +++ b/satellite/accounting/tally/tally_test.go @@ -5,12 +5,14 @@ package tally_test import ( "fmt" + "strconv" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" "storj.io/common/memory" "storj.io/common/storj" @@ -422,3 +424,50 @@ func TestTallyOnCopiedObject(t *testing.T) { } }) } + +func TestTallyBatchSize(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Metainfo.ProjectLimits.MaxBuckets = 100 + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + planet.Satellites[0].Accounting.Tally.Loop.Pause() + + projectID := planet.Uplinks[0].Projects[0].ID + + numberOfBuckets := 13 + for i := 0; i < numberOfBuckets; i++ { + data := testrand.Bytes(1*memory.KiB + memory.Size(i)) + err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket"+strconv.Itoa(i), "test", data) + require.NoError(t, err) + } + + objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx) + require.NoError(t, err) + require.Len(t, objects, numberOfBuckets) + + for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} { + collector := tally.NewBucketTallyCollector(zaptest.NewLogger(t), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), tally.Config{ + Interval: 1 * time.Hour, + UseObjectsLoop: false, + ListLimit: batchSize, + AsOfSystemInterval: 1 * time.Microsecond, + }) + err := collector.Run(ctx) + require.NoError(t, err) + + require.Equal(t, numberOfBuckets, len(collector.Bucket)) + for _, object := range objects { + bucket := collector.Bucket[metabase.BucketLocation{ + ProjectID: projectID, + BucketName: object.BucketName, + }] + require.Equal(t, object.TotalEncryptedSize, bucket.TotalBytes) + require.EqualValues(t, 1, bucket.ObjectCount) + } + } + }) +} diff --git a/satellite/buckets/db.go b/satellite/buckets/db.go index 72c77bcba..42665d922 100644 --- a/satellite/buckets/db.go +++ b/satellite/buckets/db.go @@ -44,5 +44,5 @@ type DB interface { // CountBuckets returns the number of buckets a project currently has CountBuckets(ctx context.Context, projectID uuid.UUID) (int, error) // IterateBucketLocations iterates through all buckets from some point with limit. - IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (err error) + IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (more bool, err error) } diff --git a/satellite/buckets/db_test.go b/satellite/buckets/db_test.go index aea49fc74..6866257b1 100644 --- a/satellite/buckets/db_test.go +++ b/satellite/buckets/db_test.go @@ -269,10 +269,10 @@ func TestBatchBuckets(t *testing.T) { sortBucketLocations(expectedBucketLocations) - testLimits := []int{1, 3, 30, 1000} + testLimits := []int{1, 3, 30, 1000, len(expectedBucketLocations)} for _, testLimit := range testLimits { - err := db.Buckets().IterateBucketLocations(ctx, uuid.UUID{}, "", testLimit, func(bucketLocations []metabase.BucketLocation) (err error) { + more, err := db.Buckets().IterateBucketLocations(ctx, uuid.UUID{}, "", testLimit, func(bucketLocations []metabase.BucketLocation) (err error) { if testLimit > len(expectedBucketLocations) { testLimit = len(expectedBucketLocations) } @@ -282,7 +282,11 @@ func TestBatchBuckets(t *testing.T) { return nil }) require.NoError(t, err) - + if testLimit < len(expectedBucketLocations) { + require.True(t, more) + } else { + require.False(t, more) + } } }) } diff --git a/satellite/satellitedb/bucketsdb.go b/satellite/satellitedb/bucketsdb.go index 17c7248f8..685ec634e 100644 --- a/satellite/satellitedb/bucketsdb.go +++ b/satellite/satellitedb/bucketsdb.go @@ -325,20 +325,21 @@ func convertDBXtoBucket(dbxBucket *dbx.BucketMetainfo) (bucket storj.Bucket, err } // IterateBucketLocations iterates through all buckets from some point with limit. -func (db *bucketsDB) IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (err error) { +func (db *bucketsDB) IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (more bool, err error) { defer mon.Task()(&ctx)(&err) var result []metabase.BucketLocation + moreLimit := limit + 1 rows, err := db.db.QueryContext(ctx, ` SELECT project_id, name FROM bucket_metainfos WHERE (project_id, name) > ($1, $2) GROUP BY (project_id, name) ORDER BY (project_id, name) ASC LIMIT $3 - `, projectID, bucketName, limit) + `, projectID, bucketName, moreLimit) if err != nil { - return storj.ErrBucket.New("BatchBuckets query error: %s", err) + return false, storj.ErrBucket.New("BatchBuckets query error: %s", err) } defer func() { err = errs.Combine(err, Error.Wrap(rows.Close())) @@ -348,15 +349,23 @@ func (db *bucketsDB) IterateBucketLocations(ctx context.Context, projectID uuid. var bucketLocation metabase.BucketLocation if err = rows.Scan(&bucketLocation.ProjectID, &bucketLocation.BucketName); err != nil { - return storj.ErrBucket.New("bucket location scan error: %s", err) + return false, storj.ErrBucket.New("bucket location scan error: %s", err) } result = append(result, bucketLocation) } if err = rows.Err(); err != nil { - return storj.ErrBucket.Wrap(err) + return false, storj.ErrBucket.Wrap(err) } - return Error.Wrap(fn(result)) + if len(result) == 0 { + return false, nil + } + + if len(result) > limit { + return true, Error.Wrap(fn(result[:len(result)-1])) + } + + return false, Error.Wrap(fn(result)) }