satellite/{accounting,satellitedb}: create tallies for empty buckets
Tallies are now created for buckets with no objects. Previously, the bucket tally collector skipped empty buckets since it created tallies only using information from the objects table. Methods that used bucket tallies when calculating usage costs would return incorrect results because of this. Change-Id: I0b37fe7159a11cc02a51562000dad9258555d9f9
This commit is contained in:
parent
ccfe5cae49
commit
c3d72a269e
@ -399,12 +399,18 @@ func TestBilling_ExpiredFiles(t *testing.T) {
|
|||||||
require.NotZero(t, len(tallies), "There should be at least one tally")
|
require.NotZero(t, len(tallies), "There should be at least one tally")
|
||||||
|
|
||||||
// set the tally service to be in the future for the next get tallies call. it should
|
// set the tally service to be in the future for the next get tallies call. it should
|
||||||
// not add any tallies.
|
// add an empty tally because the object we uploaded should have expired.
|
||||||
planet.Satellites[0].Accounting.Tally.SetNow(func() time.Time {
|
satelliteSys.Accounting.Tally.SetNow(func() time.Time {
|
||||||
return now.Add(2 * time.Hour)
|
return now.Add(2 * time.Hour)
|
||||||
})
|
})
|
||||||
newTallies := getTallies(ctx, t, planet, 0)
|
newTallies := getTallies(ctx, t, planet, 0)
|
||||||
require.Equal(t, tallies, newTallies)
|
tallies = append(tallies, accounting.BucketTally{
|
||||||
|
BucketLocation: metabase.BucketLocation{
|
||||||
|
ProjectID: planet.Uplinks[0].Projects[0].ID,
|
||||||
|
BucketName: bucketName,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.ElementsMatch(t, tallies, newTallies)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,6 +212,9 @@ type ProjectAccounting interface {
|
|||||||
GetTallies(ctx context.Context) ([]BucketTally, error)
|
GetTallies(ctx context.Context) ([]BucketTally, error)
|
||||||
// CreateStorageTally creates a record for BucketStorageTally in the accounting DB table
|
// CreateStorageTally creates a record for BucketStorageTally in the accounting DB table
|
||||||
CreateStorageTally(ctx context.Context, tally BucketStorageTally) error
|
CreateStorageTally(ctx context.Context, tally BucketStorageTally) error
|
||||||
|
// GetNonEmptyTallyBucketsInRange returns a list of bucket locations within the given range
|
||||||
|
// whose most recent tally does not represent empty usage.
|
||||||
|
GetNonEmptyTallyBucketsInRange(ctx context.Context, from, to metabase.BucketLocation) ([]metabase.BucketLocation, error)
|
||||||
// GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame.
|
// GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame.
|
||||||
GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error)
|
GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error)
|
||||||
// GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day.
|
// GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day.
|
||||||
|
@ -193,7 +193,7 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// add up all buckets
|
// add up all buckets
|
||||||
collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.bucketsDB, service.config)
|
collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.bucketsDB, service.projectAccountingDB, service.config)
|
||||||
err = collector.Run(ctx)
|
err = collector.Run(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
@ -244,22 +244,24 @@ type BucketTallyCollector struct {
|
|||||||
Log *zap.Logger
|
Log *zap.Logger
|
||||||
Bucket map[metabase.BucketLocation]*accounting.BucketTally
|
Bucket map[metabase.BucketLocation]*accounting.BucketTally
|
||||||
|
|
||||||
metabase *metabase.DB
|
metabase *metabase.DB
|
||||||
bucketsDB buckets.DB
|
bucketsDB buckets.DB
|
||||||
config Config
|
projectAccountingDB accounting.ProjectAccounting
|
||||||
|
config Config
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBucketTallyCollector returns a collector that adds up totals for buckets.
|
// NewBucketTallyCollector returns a collector that adds up totals for buckets.
|
||||||
// The now argument controls when the collector considers objects to be expired.
|
// The now argument controls when the collector considers objects to be expired.
|
||||||
func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, bucketsDB buckets.DB, config Config) *BucketTallyCollector {
|
func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, bucketsDB buckets.DB, projectAccountingDB accounting.ProjectAccounting, config Config) *BucketTallyCollector {
|
||||||
return &BucketTallyCollector{
|
return &BucketTallyCollector{
|
||||||
Now: now,
|
Now: now,
|
||||||
Log: log,
|
Log: log,
|
||||||
Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally),
|
Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally),
|
||||||
|
|
||||||
metabase: db,
|
metabase: db,
|
||||||
bucketsDB: bucketsDB,
|
bucketsDB: bucketsDB,
|
||||||
config: config,
|
projectAccountingDB: projectAccountingDB,
|
||||||
|
config: config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -282,9 +284,24 @@ func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, sta
|
|||||||
var lastBucketLocation metabase.BucketLocation
|
var lastBucketLocation metabase.BucketLocation
|
||||||
for {
|
for {
|
||||||
more, err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
|
more, err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
|
||||||
|
fromBucket := bucketLocations[0]
|
||||||
|
toBucket := bucketLocations[len(bucketLocations)-1]
|
||||||
|
|
||||||
|
// Prepopulate the results with empty tallies. Otherwise, empty buckets will be unaccounted for
|
||||||
|
// since they're not reached when iterating over objects in the metainfo DB.
|
||||||
|
// We only do this for buckets whose last tally is non-empty because only one empty tally is
|
||||||
|
// required for us to know that a bucket was empty the last time we checked.
|
||||||
|
locs, err := observer.projectAccountingDB.GetNonEmptyTallyBucketsInRange(ctx, fromBucket, toBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, loc := range locs {
|
||||||
|
observer.Bucket[loc] = &accounting.BucketTally{BucketLocation: loc}
|
||||||
|
}
|
||||||
|
|
||||||
tallies, err := observer.metabase.CollectBucketTallies(ctx, metabase.CollectBucketTallies{
|
tallies, err := observer.metabase.CollectBucketTallies(ctx, metabase.CollectBucketTallies{
|
||||||
From: bucketLocations[0],
|
From: fromBucket,
|
||||||
To: bucketLocations[len(bucketLocations)-1],
|
To: toBucket,
|
||||||
AsOfSystemTime: startingTime,
|
AsOfSystemTime: startingTime,
|
||||||
AsOfSystemInterval: observer.config.AsOfSystemInterval,
|
AsOfSystemInterval: observer.config.AsOfSystemInterval,
|
||||||
Now: observer.Now,
|
Now: observer.Now,
|
||||||
|
@ -98,7 +98,14 @@ func TestOnlyInline(t *testing.T) {
|
|||||||
|
|
||||||
// run multiple times to ensure we add tallies
|
// run multiple times to ensure we add tallies
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
collector := tally.NewBucketTallyCollector(planet.Satellites[0].Log.Named("bucket tally"), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally)
|
collector := tally.NewBucketTallyCollector(
|
||||||
|
planet.Satellites[0].Log.Named("bucket tally"),
|
||||||
|
time.Now(),
|
||||||
|
planet.Satellites[0].Metabase.DB,
|
||||||
|
planet.Satellites[0].DB.Buckets(),
|
||||||
|
planet.Satellites[0].DB.ProjectAccounting(),
|
||||||
|
planet.Satellites[0].Config.Tally,
|
||||||
|
)
|
||||||
err := collector.Run(ctx)
|
err := collector.Run(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -173,7 +180,14 @@ func TestCalculateBucketAtRestData(t *testing.T) {
|
|||||||
}
|
}
|
||||||
require.Len(t, expectedTotal, 3)
|
require.Len(t, expectedTotal, 3)
|
||||||
|
|
||||||
collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), time.Now(), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally)
|
collector := tally.NewBucketTallyCollector(
|
||||||
|
satellite.Log.Named("bucket tally"),
|
||||||
|
time.Now(),
|
||||||
|
satellite.Metabase.DB,
|
||||||
|
planet.Satellites[0].DB.Buckets(),
|
||||||
|
planet.Satellites[0].DB.ProjectAccounting(),
|
||||||
|
planet.Satellites[0].Config.Tally,
|
||||||
|
)
|
||||||
err = collector.Run(ctx)
|
err = collector.Run(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedTotal, collector.Bucket)
|
require.Equal(t, expectedTotal, collector.Bucket)
|
||||||
@ -186,16 +200,31 @@ func TestIgnoresExpiredPointers(t *testing.T) {
|
|||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
satellite := planet.Satellites[0]
|
satellite := planet.Satellites[0]
|
||||||
|
|
||||||
|
const bucketName = "bucket"
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour))
|
err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), now.Add(24*time.Hour), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally)
|
collector := tally.NewBucketTallyCollector(
|
||||||
|
satellite.Log.Named("bucket tally"),
|
||||||
|
now.Add(24*time.Hour),
|
||||||
|
satellite.Metabase.DB,
|
||||||
|
planet.Satellites[0].DB.Buckets(),
|
||||||
|
planet.Satellites[0].DB.ProjectAccounting(),
|
||||||
|
planet.Satellites[0].Config.Tally,
|
||||||
|
)
|
||||||
err = collector.Run(ctx)
|
err = collector.Run(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// there should be no observed buckets because all of the objects are expired
|
// there should be a single empty tally because all of the objects are expired
|
||||||
require.Equal(t, collector.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{})
|
loc := metabase.BucketLocation{
|
||||||
|
ProjectID: planet.Uplinks[0].Projects[0].ID,
|
||||||
|
BucketName: bucketName,
|
||||||
|
}
|
||||||
|
require.Equal(t, map[metabase.BucketLocation]*accounting.BucketTally{
|
||||||
|
loc: {BucketLocation: loc},
|
||||||
|
}, collector.Bucket)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -398,11 +427,18 @@ func TestTallyBatchSize(t *testing.T) {
|
|||||||
require.Len(t, objects, numberOfBuckets)
|
require.Len(t, objects, numberOfBuckets)
|
||||||
|
|
||||||
for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} {
|
for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} {
|
||||||
collector := tally.NewBucketTallyCollector(zaptest.NewLogger(t), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), tally.Config{
|
collector := tally.NewBucketTallyCollector(
|
||||||
Interval: 1 * time.Hour,
|
zaptest.NewLogger(t),
|
||||||
ListLimit: batchSize,
|
time.Now(),
|
||||||
AsOfSystemInterval: 1 * time.Microsecond,
|
planet.Satellites[0].Metabase.DB,
|
||||||
})
|
planet.Satellites[0].DB.Buckets(),
|
||||||
|
planet.Satellites[0].DB.ProjectAccounting(),
|
||||||
|
tally.Config{
|
||||||
|
Interval: 1 * time.Hour,
|
||||||
|
ListLimit: batchSize,
|
||||||
|
AsOfSystemInterval: 1 * time.Microsecond,
|
||||||
|
},
|
||||||
|
)
|
||||||
err := collector.Run(ctx)
|
err := collector.Run(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"storj.io/private/dbutil/pgutil"
|
"storj.io/private/dbutil/pgutil"
|
||||||
"storj.io/private/dbutil/pgutil/pgerrcode"
|
"storj.io/private/dbutil/pgutil/pgerrcode"
|
||||||
"storj.io/private/dbutil/pgxutil"
|
"storj.io/private/dbutil/pgxutil"
|
||||||
|
"storj.io/private/tagsql"
|
||||||
"storj.io/storj/satellite/accounting"
|
"storj.io/storj/satellite/accounting"
|
||||||
"storj.io/storj/satellite/metabase"
|
"storj.io/storj/satellite/metabase"
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
@ -144,6 +145,39 @@ func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accou
|
|||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNonEmptyTallyBucketsInRange returns a list of bucket locations within the given range
|
||||||
|
// whose most recent tally does not represent empty usage.
|
||||||
|
func (db *ProjectAccounting) GetNonEmptyTallyBucketsInRange(ctx context.Context, from, to metabase.BucketLocation) (result []metabase.BucketLocation, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
err = withRows(db.db.QueryContext(ctx, `
|
||||||
|
SELECT project_id, name
|
||||||
|
FROM bucket_metainfos bm
|
||||||
|
WHERE (project_id, name) BETWEEN ($1, $2) AND ($3, $4)
|
||||||
|
AND NOT 0 IN (
|
||||||
|
SELECT object_count FROM bucket_storage_tallies
|
||||||
|
WHERE (project_id, bucket_name) = (bm.project_id, bm.name)
|
||||||
|
ORDER BY interval_start DESC
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
`, from.ProjectID, from.BucketName, to.ProjectID, to.BucketName),
|
||||||
|
)(func(r tagsql.Rows) error {
|
||||||
|
for r.Next() {
|
||||||
|
loc := metabase.BucketLocation{}
|
||||||
|
if err := r.Scan(&loc.ProjectID, &loc.BucketName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
result = append(result, loc)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame.
|
// GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame.
|
||||||
func (db *ProjectAccounting) GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) {
|
func (db *ProjectAccounting) GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
@ -452,3 +452,49 @@ func Test_GetProjectObjectsSegments(t *testing.T) {
|
|||||||
require.Zero(t, projectStats.SegmentCount)
|
require.Zero(t, projectStats.SegmentCount)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProjectUsageGap(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, UplinkCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
sat := planet.Satellites[0]
|
||||||
|
uplink := planet.Uplinks[0]
|
||||||
|
tally := sat.Accounting.Tally
|
||||||
|
|
||||||
|
tally.Loop.Pause()
|
||||||
|
|
||||||
|
now := time.Time{}
|
||||||
|
tally.SetNow(func() time.Time {
|
||||||
|
return now
|
||||||
|
})
|
||||||
|
|
||||||
|
const (
|
||||||
|
bucketName = "testbucket"
|
||||||
|
objectPath = "test/path"
|
||||||
|
)
|
||||||
|
|
||||||
|
data := testrand.Bytes(10)
|
||||||
|
require.NoError(t, uplink.Upload(ctx, sat, bucketName, objectPath, data))
|
||||||
|
tally.Loop.TriggerWait()
|
||||||
|
|
||||||
|
objs, err := sat.Metabase.DB.TestingAllObjects(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, objs, 1)
|
||||||
|
expectedStorage := objs[0].TotalEncryptedSize
|
||||||
|
|
||||||
|
now = now.Add(time.Hour)
|
||||||
|
require.NoError(t, uplink.DeleteObject(ctx, sat, bucketName, objectPath))
|
||||||
|
tally.Loop.TriggerWait()
|
||||||
|
|
||||||
|
// This object is only uploaded and tallied so that the usage calculator knows
|
||||||
|
// how long it's been since the previous tally.
|
||||||
|
now = now.Add(time.Hour)
|
||||||
|
require.NoError(t, uplink.Upload(ctx, sat, bucketName, objectPath, data))
|
||||||
|
tally.Loop.TriggerWait()
|
||||||
|
|
||||||
|
// The bucket was full for only 1 hour, so expect `expectedStorage` byte-hours of storage usage.
|
||||||
|
usage, err := sat.DB.ProjectAccounting().GetProjectTotal(ctx, uplink.Projects[0].ID, time.Time{}, now.Add(time.Second))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, expectedStorage, usage.Storage)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user