From c3d72a269e7f7bb8367ce9116a70cc8234dfff57 Mon Sep 17 00:00:00 2001 From: Jeremy Wharton Date: Fri, 19 May 2023 17:51:54 -0500 Subject: [PATCH] satellite/{accounting,satellitedb}: create tallies for empty buckets Tallies are now created for buckets with no objects. Previously, the bucket tally collector skipped empty buckets since it created tallies only using information from the objects table. Methods that used bucket tallies when calculating usage costs would return incorrect results because of this. Change-Id: I0b37fe7159a11cc02a51562000dad9258555d9f9 --- satellite/accounting/billing_test.go | 12 +++- satellite/accounting/db.go | 3 + satellite/accounting/tally/tally.go | 37 ++++++++---- satellite/accounting/tally/tally_test.go | 56 +++++++++++++++---- satellite/satellitedb/projectaccounting.go | 34 +++++++++++ .../satellitedb/projectaccounting_test.go | 46 +++++++++++++++ 6 files changed, 165 insertions(+), 23 deletions(-) diff --git a/satellite/accounting/billing_test.go b/satellite/accounting/billing_test.go index d2f8d387f..5738443a4 100644 --- a/satellite/accounting/billing_test.go +++ b/satellite/accounting/billing_test.go @@ -399,12 +399,18 @@ func TestBilling_ExpiredFiles(t *testing.T) { require.NotZero(t, len(tallies), "There should be at least one tally") // set the tally service to be in the future for the next get tallies call. it should - // not add any tallies. - planet.Satellites[0].Accounting.Tally.SetNow(func() time.Time { + // add an empty tally because the object we uploaded should have expired. 
+ satelliteSys.Accounting.Tally.SetNow(func() time.Time { return now.Add(2 * time.Hour) }) newTallies := getTallies(ctx, t, planet, 0) - require.Equal(t, tallies, newTallies) + tallies = append(tallies, accounting.BucketTally{ + BucketLocation: metabase.BucketLocation{ + ProjectID: planet.Uplinks[0].Projects[0].ID, + BucketName: bucketName, + }, + }) + require.ElementsMatch(t, tallies, newTallies) }) } diff --git a/satellite/accounting/db.go b/satellite/accounting/db.go index 3e457fd8b..290626dc6 100644 --- a/satellite/accounting/db.go +++ b/satellite/accounting/db.go @@ -212,6 +212,9 @@ type ProjectAccounting interface { GetTallies(ctx context.Context) ([]BucketTally, error) // CreateStorageTally creates a record for BucketStorageTally in the accounting DB table CreateStorageTally(ctx context.Context, tally BucketStorageTally) error + // GetNonEmptyTallyBucketsInRange returns a list of bucket locations within the given range + // whose most recent tally does not represent empty usage, including buckets that have no tally yet. + GetNonEmptyTallyBucketsInRange(ctx context.Context, from, to metabase.BucketLocation) ([]metabase.BucketLocation, error) // GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame. GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) // GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day. 
diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index a4f504bef..99d3c581d 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -193,7 +193,7 @@ func (service *Service) Tally(ctx context.Context) (err error) { } // add up all buckets - collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.bucketsDB, service.config) + collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.bucketsDB, service.projectAccountingDB, service.config) err = collector.Run(ctx) if err != nil { return Error.Wrap(err) @@ -244,22 +244,24 @@ type BucketTallyCollector struct { Log *zap.Logger Bucket map[metabase.BucketLocation]*accounting.BucketTally - metabase *metabase.DB - bucketsDB buckets.DB - config Config + metabase *metabase.DB + bucketsDB buckets.DB + projectAccountingDB accounting.ProjectAccounting + config Config } // NewBucketTallyCollector returns a collector that adds up totals for buckets. // The now argument controls when the collector considers objects to be expired. 
-func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, bucketsDB buckets.DB, config Config) *BucketTallyCollector { +func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, bucketsDB buckets.DB, projectAccountingDB accounting.ProjectAccounting, config Config) *BucketTallyCollector { return &BucketTallyCollector{ Now: now, Log: log, Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally), - metabase: db, - bucketsDB: bucketsDB, - config: config, + metabase: db, + bucketsDB: bucketsDB, + projectAccountingDB: projectAccountingDB, + config: config, } } @@ -282,9 +284,24 @@ func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, sta var lastBucketLocation metabase.BucketLocation for { more, err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) { + fromBucket := bucketLocations[0] + toBucket := bucketLocations[len(bucketLocations)-1] + + // Prepopulate the results with empty tallies. Otherwise, empty buckets will be unaccounted for + // since they're not reached when iterating over objects in the metainfo DB. + // We only do this for buckets whose last tally is non-empty because only one empty tally is + // required for us to know that a bucket was empty the last time we checked. 
+ locs, err := observer.projectAccountingDB.GetNonEmptyTallyBucketsInRange(ctx, fromBucket, toBucket) + if err != nil { + return err + } + for _, loc := range locs { + observer.Bucket[loc] = &accounting.BucketTally{BucketLocation: loc} + } + tallies, err := observer.metabase.CollectBucketTallies(ctx, metabase.CollectBucketTallies{ - From: bucketLocations[0], - To: bucketLocations[len(bucketLocations)-1], + From: fromBucket, + To: toBucket, AsOfSystemTime: startingTime, AsOfSystemInterval: observer.config.AsOfSystemInterval, Now: observer.Now, diff --git a/satellite/accounting/tally/tally_test.go b/satellite/accounting/tally/tally_test.go index 81c233b29..4a2c3e456 100644 --- a/satellite/accounting/tally/tally_test.go +++ b/satellite/accounting/tally/tally_test.go @@ -98,7 +98,14 @@ func TestOnlyInline(t *testing.T) { // run multiple times to ensure we add tallies for i := 0; i < 2; i++ { - collector := tally.NewBucketTallyCollector(planet.Satellites[0].Log.Named("bucket tally"), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally) + collector := tally.NewBucketTallyCollector( + planet.Satellites[0].Log.Named("bucket tally"), + time.Now(), + planet.Satellites[0].Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + planet.Satellites[0].Config.Tally, + ) err := collector.Run(ctx) require.NoError(t, err) @@ -173,7 +180,14 @@ func TestCalculateBucketAtRestData(t *testing.T) { } require.Len(t, expectedTotal, 3) - collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), time.Now(), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally) + collector := tally.NewBucketTallyCollector( + satellite.Log.Named("bucket tally"), + time.Now(), + satellite.Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + planet.Satellites[0].Config.Tally, + ) err = collector.Run(ctx) 
require.NoError(t, err) require.Equal(t, expectedTotal, collector.Bucket) @@ -186,16 +200,31 @@ func TestIgnoresExpiredPointers(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] + const bucketName = "bucket" + now := time.Now() err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour)) require.NoError(t, err) - collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), now.Add(24*time.Hour), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally) + collector := tally.NewBucketTallyCollector( + satellite.Log.Named("bucket tally"), + now.Add(24*time.Hour), + satellite.Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + planet.Satellites[0].Config.Tally, + ) err = collector.Run(ctx) require.NoError(t, err) - // there should be no observed buckets because all of the objects are expired - require.Equal(t, collector.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{}) + // there should be a single empty tally because all of the objects are expired + loc := metabase.BucketLocation{ + ProjectID: planet.Uplinks[0].Projects[0].ID, + BucketName: bucketName, + } + require.Equal(t, map[metabase.BucketLocation]*accounting.BucketTally{ + loc: {BucketLocation: loc}, + }, collector.Bucket) }) } @@ -398,11 +427,18 @@ func TestTallyBatchSize(t *testing.T) { require.Len(t, objects, numberOfBuckets) for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} { - collector := tally.NewBucketTallyCollector(zaptest.NewLogger(t), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), tally.Config{ - Interval: 1 * time.Hour, - ListLimit: batchSize, - AsOfSystemInterval: 1 * time.Microsecond, - }) + collector := tally.NewBucketTallyCollector( + zaptest.NewLogger(t), 
+ time.Now(), + planet.Satellites[0].Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + tally.Config{ + Interval: 1 * time.Hour, + ListLimit: batchSize, + AsOfSystemInterval: 1 * time.Microsecond, + }, + ) err := collector.Run(ctx) require.NoError(t, err) diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index ebca36656..dcf7a3b28 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -22,6 +22,7 @@ import ( "storj.io/private/dbutil/pgutil" "storj.io/private/dbutil/pgutil/pgerrcode" "storj.io/private/dbutil/pgxutil" + "storj.io/private/tagsql" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/orders" @@ -144,6 +145,39 @@ func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accou return Error.Wrap(err) } +// GetNonEmptyTallyBucketsInRange returns a list of bucket locations within the given range +// whose most recent tally does not represent empty usage, including buckets that have no tally yet. 
+func (db *ProjectAccounting) GetNonEmptyTallyBucketsInRange(ctx context.Context, from, to metabase.BucketLocation) (result []metabase.BucketLocation, err error) { + defer mon.Task()(&ctx)(&err) + + err = withRows(db.db.QueryContext(ctx, ` + SELECT project_id, name + FROM bucket_metainfos bm + WHERE (project_id, name) BETWEEN ($1, $2) AND ($3, $4) + AND NOT 0 IN ( + SELECT object_count FROM bucket_storage_tallies + WHERE (project_id, bucket_name) = (bm.project_id, bm.name) + ORDER BY interval_start DESC + LIMIT 1 + ) + `, from.ProjectID, from.BucketName, to.ProjectID, to.BucketName), + )(func(r tagsql.Rows) error { + for r.Next() { + loc := metabase.BucketLocation{} + if err := r.Scan(&loc.ProjectID, &loc.BucketName); err != nil { + return err + } + result = append(result, loc) + } + return nil + }) + if err != nil { + return nil, Error.Wrap(err) + } + + return result, nil +} + // GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame. 
func (db *ProjectAccounting) GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/satellitedb/projectaccounting_test.go b/satellite/satellitedb/projectaccounting_test.go index a6f483ffa..17b9b41b8 100644 --- a/satellite/satellitedb/projectaccounting_test.go +++ b/satellite/satellitedb/projectaccounting_test.go @@ -452,3 +452,49 @@ func Test_GetProjectObjectsSegments(t *testing.T) { require.Zero(t, projectStats.SegmentCount) }) } + +func TestProjectUsageGap(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + sat := planet.Satellites[0] + uplink := planet.Uplinks[0] + tally := sat.Accounting.Tally + + tally.Loop.Pause() + + now := time.Time{} + tally.SetNow(func() time.Time { + return now + }) + + const ( + bucketName = "testbucket" + objectPath = "test/path" + ) + + data := testrand.Bytes(10) + require.NoError(t, uplink.Upload(ctx, sat, bucketName, objectPath, data)) + tally.Loop.TriggerWait() + + objs, err := sat.Metabase.DB.TestingAllObjects(ctx) + require.NoError(t, err) + require.Len(t, objs, 1) + expectedStorage := objs[0].TotalEncryptedSize + + now = now.Add(time.Hour) + require.NoError(t, uplink.DeleteObject(ctx, sat, bucketName, objectPath)) + tally.Loop.TriggerWait() + + // This object is only uploaded and tallied so that the usage calculator knows + // how long it's been since the previous tally. + now = now.Add(time.Hour) + require.NoError(t, uplink.Upload(ctx, sat, bucketName, objectPath, data)) + tally.Loop.TriggerWait() + + // The bucket was full for only 1 hour, so expect `expectedStorage` byte-hours of storage usage. 
+ usage, err := sat.DB.ProjectAccounting().GetProjectTotal(ctx, uplink.Projects[0].ID, time.Time{}, now.Add(time.Second)) + require.NoError(t, err) + require.EqualValues(t, expectedStorage, usage.Storage) + }) +}