diff --git a/satellite/accounting/billing_test.go b/satellite/accounting/billing_test.go index d2f8d387f..5738443a4 100644 --- a/satellite/accounting/billing_test.go +++ b/satellite/accounting/billing_test.go @@ -399,12 +399,18 @@ func TestBilling_ExpiredFiles(t *testing.T) { require.NotZero(t, len(tallies), "There should be at least one tally") // set the tally service to be in the future for the next get tallies call. it should - // not add any tallies. - planet.Satellites[0].Accounting.Tally.SetNow(func() time.Time { + // add an empty tally because the object we uploaded should have expired. + satelliteSys.Accounting.Tally.SetNow(func() time.Time { return now.Add(2 * time.Hour) }) newTallies := getTallies(ctx, t, planet, 0) - require.Equal(t, tallies, newTallies) + tallies = append(tallies, accounting.BucketTally{ + BucketLocation: metabase.BucketLocation{ + ProjectID: planet.Uplinks[0].Projects[0].ID, + BucketName: bucketName, + }, + }) + require.ElementsMatch(t, tallies, newTallies) }) } diff --git a/satellite/accounting/db.go b/satellite/accounting/db.go index 3e457fd8b..290626dc6 100644 --- a/satellite/accounting/db.go +++ b/satellite/accounting/db.go @@ -212,6 +212,9 @@ type ProjectAccounting interface { GetTallies(ctx context.Context) ([]BucketTally, error) // CreateStorageTally creates a record for BucketStorageTally in the accounting DB table CreateStorageTally(ctx context.Context, tally BucketStorageTally) error + // GetNonEmptyTallyBucketsInRange returns a list of bucket locations within the given range + // whose most recent tally does not represent empty usage. + GetNonEmptyTallyBucketsInRange(ctx context.Context, from, to metabase.BucketLocation) ([]metabase.BucketLocation, error) // GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame. GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) // GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day. diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index a4f504bef..99d3c581d 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -193,7 +193,7 @@ func (service *Service) Tally(ctx context.Context) (err error) { } // add up all buckets - collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.bucketsDB, service.config) + collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.bucketsDB, service.projectAccountingDB, service.config) err = collector.Run(ctx) if err != nil { return Error.Wrap(err) @@ -244,22 +244,24 @@ type BucketTallyCollector struct { Log *zap.Logger Bucket map[metabase.BucketLocation]*accounting.BucketTally - metabase *metabase.DB - bucketsDB buckets.DB - config Config + metabase *metabase.DB + bucketsDB buckets.DB + projectAccountingDB accounting.ProjectAccounting + config Config } // NewBucketTallyCollector returns a collector that adds up totals for buckets. // The now argument controls when the collector considers objects to be expired. -func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, bucketsDB buckets.DB, config Config) *BucketTallyCollector { +func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, bucketsDB buckets.DB, projectAccountingDB accounting.ProjectAccounting, config Config) *BucketTallyCollector { return &BucketTallyCollector{ Now: now, Log: log, Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally), - metabase: db, - bucketsDB: bucketsDB, - config: config, + metabase: db, + bucketsDB: bucketsDB, + projectAccountingDB: projectAccountingDB, + config: config, } } @@ -282,9 +284,24 @@ func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, sta var lastBucketLocation metabase.BucketLocation for { more, err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) { + fromBucket := bucketLocations[0] + toBucket := bucketLocations[len(bucketLocations)-1] + + // Prepopulate the results with empty tallies. Otherwise, empty buckets will be unaccounted for + // since they're not reached when iterating over objects in the metainfo DB. + // We only do this for buckets whose last tally is non-empty because only one empty tally is + // required for us to know that a bucket was empty the last time we checked. + locs, err := observer.projectAccountingDB.GetNonEmptyTallyBucketsInRange(ctx, fromBucket, toBucket) + if err != nil { + return err + } + for _, loc := range locs { + observer.Bucket[loc] = &accounting.BucketTally{BucketLocation: loc} + } + tallies, err := observer.metabase.CollectBucketTallies(ctx, metabase.CollectBucketTallies{ - From: bucketLocations[0], - To: bucketLocations[len(bucketLocations)-1], + From: fromBucket, + To: toBucket, AsOfSystemTime: startingTime, AsOfSystemInterval: observer.config.AsOfSystemInterval, Now: observer.Now, diff --git a/satellite/accounting/tally/tally_test.go b/satellite/accounting/tally/tally_test.go index 81c233b29..4a2c3e456 100644 --- a/satellite/accounting/tally/tally_test.go +++ b/satellite/accounting/tally/tally_test.go @@ -98,7 +98,14 @@ func TestOnlyInline(t *testing.T) { // run multiple times to ensure we add tallies for i := 0; i < 2; i++ { - collector := tally.NewBucketTallyCollector(planet.Satellites[0].Log.Named("bucket tally"), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally) + collector := tally.NewBucketTallyCollector( + planet.Satellites[0].Log.Named("bucket tally"), + time.Now(), + planet.Satellites[0].Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + planet.Satellites[0].Config.Tally, + ) err := collector.Run(ctx) require.NoError(t, err) @@ -173,7 +180,14 @@ func TestCalculateBucketAtRestData(t *testing.T) { } require.Len(t, expectedTotal, 3) - collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), time.Now(), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally) + collector := tally.NewBucketTallyCollector( + satellite.Log.Named("bucket tally"), + time.Now(), + satellite.Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + planet.Satellites[0].Config.Tally, + ) err = collector.Run(ctx) require.NoError(t, err) require.Equal(t, expectedTotal, collector.Bucket) @@ -186,16 +200,31 @@ func TestIgnoresExpiredPointers(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] + const bucketName = "bucket" + now := time.Now() err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour)) require.NoError(t, err) - collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), now.Add(24*time.Hour), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally) + collector := tally.NewBucketTallyCollector( + satellite.Log.Named("bucket tally"), + now.Add(24*time.Hour), + satellite.Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + planet.Satellites[0].Config.Tally, + ) err = collector.Run(ctx) require.NoError(t, err) - // there should be no observed buckets because all of the objects are expired - require.Equal(t, collector.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{}) + // there should be a single empty tally because all of the objects are expired + loc := metabase.BucketLocation{ + ProjectID: planet.Uplinks[0].Projects[0].ID, + BucketName: bucketName, + } + require.Equal(t, map[metabase.BucketLocation]*accounting.BucketTally{ + loc: {BucketLocation: loc}, + }, collector.Bucket) }) } @@ -398,11 +427,18 @@ func TestTallyBatchSize(t *testing.T) { require.Len(t, objects, numberOfBuckets) for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} { - collector := tally.NewBucketTallyCollector(zaptest.NewLogger(t), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), tally.Config{ - Interval: 1 * time.Hour, - ListLimit: batchSize, - AsOfSystemInterval: 1 * time.Microsecond, - }) + collector := tally.NewBucketTallyCollector( + zaptest.NewLogger(t), + time.Now(), + planet.Satellites[0].Metabase.DB, + planet.Satellites[0].DB.Buckets(), + planet.Satellites[0].DB.ProjectAccounting(), + tally.Config{ + Interval: 1 * time.Hour, + ListLimit: batchSize, + AsOfSystemInterval: 1 * time.Microsecond, + }, + ) err := collector.Run(ctx) require.NoError(t, err) diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index ebca36656..dcf7a3b28 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -22,6 +22,7 @@ import ( "storj.io/private/dbutil/pgutil" "storj.io/private/dbutil/pgutil/pgerrcode" "storj.io/private/dbutil/pgxutil" + "storj.io/private/tagsql" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/orders" @@ -144,6 +145,39 @@ func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accou return Error.Wrap(err) } +// GetNonEmptyTallyBucketsInRange returns a list of bucket locations within the given range +// whose most recent tally does not represent empty usage. +func (db *ProjectAccounting) GetNonEmptyTallyBucketsInRange(ctx context.Context, from, to metabase.BucketLocation) (result []metabase.BucketLocation, err error) { + defer mon.Task()(&ctx)(&err) + + err = withRows(db.db.QueryContext(ctx, ` + SELECT project_id, name + FROM bucket_metainfos bm + WHERE (project_id, name) BETWEEN ($1, $2) AND ($3, $4) + AND NOT 0 IN ( + SELECT object_count FROM bucket_storage_tallies + WHERE (project_id, bucket_name) = (bm.project_id, bm.name) + ORDER BY interval_start DESC + LIMIT 1 + ) + `, from.ProjectID, from.BucketName, to.ProjectID, to.BucketName), + )(func(r tagsql.Rows) error { + for r.Next() { + loc := metabase.BucketLocation{} + if err := r.Scan(&loc.ProjectID, &loc.BucketName); err != nil { + return err + } + result = append(result, loc) + } + return nil + }) + if err != nil { + return nil, Error.Wrap(err) + } + + return result, nil +} + // GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame. func (db *ProjectAccounting) GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/satellitedb/projectaccounting_test.go b/satellite/satellitedb/projectaccounting_test.go index a6f483ffa..17b9b41b8 100644 --- a/satellite/satellitedb/projectaccounting_test.go +++ b/satellite/satellitedb/projectaccounting_test.go @@ -452,3 +452,49 @@ func Test_GetProjectObjectsSegments(t *testing.T) { require.Zero(t, projectStats.SegmentCount) }) } + +func TestProjectUsageGap(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + sat := planet.Satellites[0] + uplink := planet.Uplinks[0] + tally := sat.Accounting.Tally + + tally.Loop.Pause() + + now := time.Time{} + tally.SetNow(func() time.Time { + return now + }) + + const ( + bucketName = "testbucket" + objectPath = "test/path" + ) + + data := testrand.Bytes(10) + require.NoError(t, uplink.Upload(ctx, sat, bucketName, objectPath, data)) + tally.Loop.TriggerWait() + + objs, err := sat.Metabase.DB.TestingAllObjects(ctx) + require.NoError(t, err) + require.Len(t, objs, 1) + expectedStorage := objs[0].TotalEncryptedSize + + now = now.Add(time.Hour) + require.NoError(t, uplink.DeleteObject(ctx, sat, bucketName, objectPath)) + tally.Loop.TriggerWait() + + // This object is only uploaded and tallied so that the usage calculator knows + // how long it's been since the previous tally. + now = now.Add(time.Hour) + require.NoError(t, uplink.Upload(ctx, sat, bucketName, objectPath, data)) + tally.Loop.TriggerWait() + + // The bucket was full for only 1 hour, so expect `expectedStorage` byte-hours of storage usage. + usage, err := sat.DB.ProjectAccounting().GetProjectTotal(ctx, uplink.Projects[0].ID, time.Time{}, now.Add(time.Second)) + require.NoError(t, err) + require.EqualValues(t, expectedStorage, usage.Storage) + }) +}