diff --git a/monkit.lock b/monkit.lock
index 8a4cc8b8b..6b26183e6 100644
--- a/monkit.lock
+++ b/monkit.lock
@@ -6,6 +6,7 @@ storj.io/storj/satellite/accounting."bucket_segments" IntVal
 storj.io/storj/satellite/accounting."total_bytes" IntVal
 storj.io/storj/satellite/accounting."total_objects" IntVal
 storj.io/storj/satellite/accounting."total_segments" IntVal
+storj.io/storj/satellite/accounting/tally."bucket_tally_error" Event
 storj.io/storj/satellite/accounting/tally."nodetallies.totalsum" IntVal
 storj.io/storj/satellite/audit."audit_contained_nodes" IntVal
 storj.io/storj/satellite/audit."audit_contained_nodes_global" Meter
diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go
index 07865119c..3cfcd0369 100644
--- a/satellite/accounting/tally/tally.go
+++ b/satellite/accounting/tally/tally.go
@@ -26,10 +26,11 @@ var (
 
 // Config contains configurable values for the tally service.
 type Config struct {
-	Interval            time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"`
-	SaveRollupBatchSize int           `help:"how large of batches SaveRollup should process at a time" default:"1000"`
-	ReadRollupBatchSize int           `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
-	UseRangedLoop       bool          `help:"whether to enable node tally with ranged loop" default:"true"`
+	Interval             time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"`
+	SaveRollupBatchSize  int           `help:"how large of batches SaveRollup should process at a time" default:"1000"`
+	ReadRollupBatchSize  int           `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
+	UseRangedLoop        bool          `help:"whether to enable node tally with ranged loop" default:"true"`
+	SaveTalliesBatchSize int           `help:"how large of batches should be inserted into tallies" default:"10000"`
 
 	ListLimit          int           `help:"how many buckets to query in a batch" default:"2500"`
 	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
@@ -75,6 +76,8 @@ func (service *Service) Run(ctx context.Context) (err error) {
 		err := service.Tally(ctx)
 		if err != nil {
 			service.log.Error("tally failed", zap.Error(err))
+
+			mon.Event("bucket_tally_error") //mon:locked
 		}
 		return nil
 	})
@@ -198,45 +201,65 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 	if err != nil {
 		return Error.Wrap(err)
 	}
-	finishTime := service.nowFn()
+
+	if len(collector.Bucket) == 0 {
+		return nil
+	}
 
 	// save the new results
-	var errAtRest error
-	if len(collector.Bucket) > 0 {
-		// record bucket tallies to DB
-		err = service.projectAccountingDB.SaveTallies(ctx, finishTime, collector.Bucket)
-		if err != nil {
-			errAtRest = Error.New("ProjectAccounting.SaveTallies failed: %v", err)
-		}
+	var errAtRest errs.Group
 
-		updateLiveAccountingTotals(projectTotalsFromBuckets(collector.Bucket))
+	// record bucket tallies to DB
+	// TODO we should be able to replace the map with just a slice
+	intervalStart := service.nowFn()
+	buffer := map[metabase.BucketLocation]*accounting.BucketTally{}
+	for location, tally := range collector.Bucket {
+		buffer[location] = tally
+
+		if len(buffer) >= service.config.SaveTalliesBatchSize {
+			// don't stop on error, we would like to store as much as possible
+			errAtRest.Add(service.flushTallies(ctx, intervalStart, buffer))
+
+			for key := range buffer {
+				delete(buffer, key)
+			}
+		}
 	}
 
-	if len(collector.Bucket) > 0 {
-		var total accounting.BucketTally
-		// TODO for now we don't have access to inline/remote stats per bucket
-		// but that may change in the future. To get back those stats we would
-		// most probably need to add inline/remote information to object in
-		// metabase. We didn't decide yet if that is really needed right now.
-		for _, bucket := range collector.Bucket {
-			monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked
-			monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked
-			// monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked
-			// monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked
+	errAtRest.Add(service.flushTallies(ctx, intervalStart, buffer))
 
-			monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked
-			// monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked
-			// monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked
-			total.Combine(bucket)
-		}
-		monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
-		monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
-		monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
-		monAccounting.IntVal("total_pending_objects").Observe(total.PendingObjectCount)
+	updateLiveAccountingTotals(projectTotalsFromBuckets(collector.Bucket))
+
+	var total accounting.BucketTally
+	// TODO for now we don't have access to inline/remote stats per bucket
+	// but that may change in the future. To get back those stats we would
+	// most probably need to add inline/remote information to object in
+	// metabase. We didn't decide yet if that is really needed right now.
+	for _, bucket := range collector.Bucket {
+		monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked
+		monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked
+		// monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked
+		// monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked
+
+		monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked
+		// monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked
+		// monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked
+		total.Combine(bucket)
 	}
+	monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
+	monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
+	monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
+	monAccounting.IntVal("total_pending_objects").Observe(total.PendingObjectCount)
 
-	// return errors if something went wrong.
-	return errAtRest
+	return errAtRest.Err()
+}
+
+func (service *Service) flushTallies(ctx context.Context, intervalStart time.Time, tallies map[metabase.BucketLocation]*accounting.BucketTally) error {
+	err := service.projectAccountingDB.SaveTallies(ctx, intervalStart, tallies)
+	if err != nil {
+		return Error.New("ProjectAccounting.SaveTallies failed: %v", err)
+	}
+	return nil
 }
 
 // BucketTallyCollector collects and adds up tallies for buckets.
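For readers skimming the patch, the tally.go hunk above boils down to one pattern: accumulate per-bucket tallies into a buffer and flush it to the database whenever the buffer reaches SaveTalliesBatchSize, collecting errors instead of aborting so that as much data as possible is stored. Below is a minimal, self-contained sketch of that pattern. It is not Storj code: bucketTally, flushInBatches, and saveBatch are hypothetical stand-ins for accounting.BucketTally, the new loop in Service.Tally, and Service.flushTallies.

package main

import "fmt"

// bucketTally is a simplified stand-in for accounting.BucketTally.
type bucketTally struct {
	bucket string
	bytes  int64
}

// flushInBatches buffers tallies and hands them to saveBatch whenever the
// buffer reaches batchSize, then flushes the remainder. Errors are collected
// rather than aborting, mirroring the errs.Group usage in the patch.
func flushInBatches(tallies map[string]*bucketTally, batchSize int, saveBatch func(map[string]*bucketTally) error) []error {
	var errList []error
	buffer := map[string]*bucketTally{}
	for bucket, tally := range tallies {
		buffer[bucket] = tally

		if len(buffer) >= batchSize {
			if err := saveBatch(buffer); err != nil {
				errList = append(errList, err)
			}
			// reuse the same map for the next batch
			for key := range buffer {
				delete(buffer, key)
			}
		}
	}
	// flush whatever is left over (possibly an empty map, like the final
	// flushTallies call in the patch)
	if err := saveBatch(buffer); err != nil {
		errList = append(errList, err)
	}
	return errList
}

func main() {
	tallies := map[string]*bucketTally{
		"alpha": {bucket: "alpha", bytes: 1},
		"beta":  {bucket: "beta", bytes: 2},
		"gamma": {bucket: "gamma", bytes: 3},
	}
	errList := flushInBatches(tallies, 2, func(batch map[string]*bucketTally) error {
		fmt.Println("saving batch of", len(batch), "tallies")
		return nil
	})
	fmt.Println("errors:", len(errList))
}

Clearing and reusing the buffer map between batches, as the patch does, avoids reallocating it on every flush; the trade-off is that the trailing flush may run with an empty buffer when the bucket count is an exact multiple of the batch size.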
diff --git a/satellite/accounting/tally/tally_test.go b/satellite/accounting/tally/tally_test.go
index 4a2c3e456..5833898f4 100644
--- a/satellite/accounting/tally/tally_test.go
+++ b/satellite/accounting/tally/tally_test.go
@@ -346,7 +346,7 @@ func TestTallyOnCopiedObject(t *testing.T) {
 		},
 	}
 
-	findTally := func(bucket string, tallies []accounting.BucketTally) accounting.BucketTally {
+	findTally := func(t *testing.T, bucket string, tallies []accounting.BucketTally) accounting.BucketTally {
 		for _, v := range tallies {
 			if v.BucketName == bucket {
 				return v
@@ -378,7 +378,7 @@ func TestTallyOnCopiedObject(t *testing.T) {
 
 			tallies, err := planet.Satellites[0].DB.ProjectAccounting().GetTallies(ctx)
 			require.NoError(t, err)
-			lastTally := findTally(tc.name, tallies)
+			lastTally := findTally(t, tc.name, tallies)
 			require.Equal(t, tc.name, lastTally.BucketName)
 			require.Equal(t, tc.expectedTallyAfterCopy.ObjectCount, lastTally.ObjectCount)
 			require.Equal(t, tc.expectedTallyAfterCopy.TotalBytes, lastTally.TotalBytes)
@@ -392,7 +392,7 @@ func TestTallyOnCopiedObject(t *testing.T) {
 
 			tallies, err = planet.Satellites[0].DB.ProjectAccounting().GetTallies(ctx)
 			require.NoError(t, err)
-			lastTally = findTally(tc.name, tallies)
+			lastTally = findTally(t, tc.name, tallies)
 			require.Equal(t, tc.name, lastTally.BucketName)
 			require.Equal(t, tc.expectedTallyAfterDelete.ObjectCount, lastTally.ObjectCount)
 			require.Equal(t, tc.expectedTallyAfterDelete.TotalBytes, lastTally.TotalBytes)
@@ -402,7 +402,7 @@
 	})
 }
 
-func TestTallyBatchSize(t *testing.T) {
+func TestBucketTallyCollectorListLimit(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
 		Reconfigure: testplanet.Reconfigure{
@@ -454,3 +454,58 @@
 		}
 	})
 }
+
+func TestTallySaveTalliesBatchSize(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Metainfo.ProjectLimits.MaxBuckets = 23
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		planet.Satellites[0].Accounting.Tally.Loop.Pause()
+
+		projectID := planet.Uplinks[0].Projects[0].ID
+
+		numberOfBuckets := 23
+		expectedBucketLocations := []metabase.BucketLocation{}
+		for i := 0; i < numberOfBuckets; i++ {
+			data := testrand.Bytes(1*memory.KiB + memory.Size(i))
+			err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket"+strconv.Itoa(i), "test", data)
+			require.NoError(t, err)
+
+			expectedBucketLocations = append(expectedBucketLocations, metabase.BucketLocation{
+				ProjectID:  projectID,
+				BucketName: "bucket" + strconv.Itoa(i),
+			})
+		}
+
+		satellite := planet.Satellites[0]
+		for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 29, planet.Satellites[0].Config.Tally.SaveTalliesBatchSize} {
+			config := satellite.Config.Tally
+			config.SaveTalliesBatchSize = batchSize
+
+			tally := tally.New(zaptest.NewLogger(t), satellite.DB.StoragenodeAccounting(), satellite.DB.ProjectAccounting(),
+				satellite.LiveAccounting.Cache, satellite.Metabase.DB, satellite.DB.Buckets(), config)
+
+			// collect and store tallies in DB
+			err := tally.Tally(ctx)
+			require.NoError(t, err)
+
+			// verify the DB contains the expected list of tallies
+			tallies, err := satellite.DB.ProjectAccounting().GetTallies(ctx)
+			require.NoError(t, err)
+
+			_, err = satellite.DB.Testing().RawDB().ExecContext(ctx, "DELETE FROM bucket_storage_tallies")
+			require.NoError(t, err)
+
+			bucketLocations := []metabase.BucketLocation{}
+			for _, tally := range tallies {
+				bucketLocations = append(bucketLocations, tally.BucketLocation)
+			}
+
+			require.ElementsMatch(t, expectedBucketLocations, bucketLocations)
+		}
+	})
+}
diff --git a/satellite/metabase/loop_test.go b/satellite/metabase/loop_test.go
index ded012c4f..be75bf7e0 100644
--- a/satellite/metabase/loop_test.go
+++ b/satellite/metabase/loop_test.go
@@ -822,14 +822,17 @@ func TestCollectBucketTallies(t *testing.T) {
 	t.Run("invalid bucket name", func(t *testing.T) {
 		defer metabasetest.DeleteAll{}.Check(ctx, t, db)
 
+		projectA := uuid.UUID{1}
+		projectB := uuid.UUID{2}
+
 		metabasetest.CollectBucketTallies{
 			Opts: metabase.CollectBucketTallies{
 				From: metabase.BucketLocation{
-					ProjectID:  testrand.UUID(),
+					ProjectID:  projectA,
 					BucketName: "a\\",
 				},
 				To: metabase.BucketLocation{
-					ProjectID:  testrand.UUID(),
+					ProjectID:  projectB,
 					BucketName: "b\\",
 				},
 			},
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 5a04bf45a..e2ba9245b 100755
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -1099,6 +1099,9 @@ server.private-address: 127.0.0.1:7778
 # how large of batches SaveRollup should process at a time
 # tally.save-rollup-batch-size: 1000
 
+# how large of batches should be inserted into tallies
+# tally.save-tallies-batch-size: 10000
+
 # whether to enable node tally with ranged loop
 # tally.use-ranged-loop: true
 
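The lock-file entry above records the new knob and its default of 10000. A hedged illustration of what overriding it could look like in a satellite config.yaml, assuming the same key: value form used throughout that file; 2500 is an arbitrary example value, not a recommendation:

  # flush bucket tallies to the database in batches of 2500 rows per insert
  tally.save-tallies-batch-size: 2500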