satellite/metabase: added CollectBucketTallies
One of two parts to stop using the objects loop for bucket accounting. This method collects bucket tallies for a range of bucket locations. Part 1 of: https://github.com/storj/team-metainfo/issues/125 Change-Id: Id2d492582453e28463cddf1245622fb7f191050c
This commit is contained in:
parent
22c0b0ac5c
commit
fa287b8206
@ -414,3 +414,74 @@ func maxUUID() (uuid.UUID, error) {
|
||||
maxUUID, err := uuid.FromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
|
||||
return maxUUID, err
|
||||
}
|
||||
|
||||
// BucketTally contains information about aggregate data stored in a bucket.
|
||||
type BucketTally struct {
|
||||
BucketLocation
|
||||
|
||||
ObjectCount int64
|
||||
|
||||
TotalSegments int64
|
||||
TotalBytes int64
|
||||
|
||||
MetadataSize int64
|
||||
}
|
||||
|
||||
// CollectBucketTallies contains arguments necessary for looping through objects in metabase.
|
||||
type CollectBucketTallies struct {
|
||||
From BucketLocation
|
||||
To BucketLocation
|
||||
AsOfSystemTime time.Time
|
||||
AsOfSystemInterval time.Duration
|
||||
}
|
||||
|
||||
// Verify verifies CollectBucketTallies request fields.
|
||||
func (opts *CollectBucketTallies) Verify() error {
|
||||
if opts.To.ProjectID.Less(opts.From.ProjectID) {
|
||||
return ErrInvalidRequest.New("project ID To is before project ID From")
|
||||
}
|
||||
if opts.To.ProjectID == opts.From.ProjectID && opts.To.BucketName < opts.From.BucketName {
|
||||
return ErrInvalidRequest.New("bucket name To is before bucket name From")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CollectBucketTallies collect limited bucket tallies from given bucket locations.
|
||||
func (db *DB) CollectBucketTallies(ctx context.Context, opts CollectBucketTallies) (result []BucketTally, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
return []BucketTally{}, err
|
||||
}
|
||||
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
SELECT project_id, bucket_name, SUM(total_encrypted_size), SUM(segment_count), COALESCE(SUM(length(encrypted_metadata)), 0), count(*)
|
||||
FROM objects
|
||||
`+db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)+`
|
||||
WHERE (project_id, bucket_name) BETWEEN ($1, $2) AND ($3, $4) AND
|
||||
(expires_at IS NULL OR expires_at > now())
|
||||
GROUP BY (project_id, bucket_name)
|
||||
ORDER BY (project_id, bucket_name) ASC
|
||||
`, opts.From.ProjectID, opts.From.BucketName, opts.To.ProjectID, opts.To.BucketName))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var bucketTally BucketTally
|
||||
|
||||
if err = rows.Scan(
|
||||
&bucketTally.ProjectID, &bucketTally.BucketName,
|
||||
&bucketTally.TotalBytes, &bucketTally.TotalSegments,
|
||||
&bucketTally.MetadataSize, &bucketTally.ObjectCount,
|
||||
); err != nil {
|
||||
return Error.New("unable to query bucket tally: %w", err)
|
||||
}
|
||||
|
||||
result = append(result, bucketTally)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return []BucketTally{}, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
@ -726,3 +726,261 @@ func loopPendingObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntr
|
||||
SegmentCount: m.SegmentCount,
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBucketTallies(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
t.Run("empty from", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
To: metabase.BucketLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "name does not exist 2",
|
||||
},
|
||||
},
|
||||
Result: nil,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("empty to", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: metabase.BucketLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "name does not exist",
|
||||
},
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "project ID To is before project ID From",
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("empty bucket", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
randStream := metabasetest.RandObjectStream()
|
||||
|
||||
obj := metabasetest.CreateObject(ctx, t, db, metabase.ObjectStream{
|
||||
ProjectID: randStream.ProjectID,
|
||||
BucketName: randStream.BucketName,
|
||||
ObjectKey: randStream.ObjectKey,
|
||||
Version: randStream.Version,
|
||||
StreamID: randStream.StreamID,
|
||||
}, 0)
|
||||
|
||||
metabasetest.DeleteObjectExactVersion{
|
||||
Opts: metabase.DeleteObjectExactVersion{
|
||||
Version: randStream.Version,
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: randStream.ProjectID,
|
||||
BucketName: randStream.BucketName,
|
||||
ObjectKey: randStream.ObjectKey,
|
||||
},
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{Objects: []metabase.Object{obj}},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
To: metabase.BucketLocation{
|
||||
ProjectID: randStream.ProjectID,
|
||||
BucketName: randStream.BucketName,
|
||||
},
|
||||
},
|
||||
Result: nil,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("empty request", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: metabase.BucketLocation{},
|
||||
To: metabase.BucketLocation{},
|
||||
},
|
||||
Result: nil,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("pending and committed", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
pending := metabasetest.RandObjectStream()
|
||||
committed := metabasetest.RandObjectStream()
|
||||
committed.ProjectID = pending.ProjectID
|
||||
committed.BucketName = pending.BucketName + "q"
|
||||
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: pending,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CreateObject(ctx, t, db, committed, 1)
|
||||
|
||||
expected := []metabase.BucketTally{
|
||||
{
|
||||
BucketLocation: metabase.BucketLocation{
|
||||
ProjectID: pending.ProjectID,
|
||||
BucketName: pending.BucketName,
|
||||
},
|
||||
ObjectCount: 1,
|
||||
TotalSegments: 0,
|
||||
TotalBytes: 0,
|
||||
MetadataSize: 1024,
|
||||
},
|
||||
{
|
||||
BucketLocation: metabase.BucketLocation{
|
||||
ProjectID: committed.ProjectID,
|
||||
BucketName: committed.BucketName,
|
||||
},
|
||||
ObjectCount: 1,
|
||||
TotalSegments: 1,
|
||||
TotalBytes: 1024,
|
||||
MetadataSize: 0,
|
||||
},
|
||||
}
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: metabase.BucketLocation{
|
||||
ProjectID: pending.ProjectID,
|
||||
BucketName: pending.BucketName,
|
||||
},
|
||||
To: metabase.BucketLocation{
|
||||
ProjectID: committed.ProjectID,
|
||||
BucketName: committed.BucketName,
|
||||
},
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: metabase.BucketLocation{
|
||||
ProjectID: pending.ProjectID,
|
||||
BucketName: pending.BucketName,
|
||||
},
|
||||
To: metabase.BucketLocation{
|
||||
ProjectID: committed.ProjectID,
|
||||
BucketName: committed.BucketName,
|
||||
},
|
||||
AsOfSystemTime: time.Now(),
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("multiple projects", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
projects := []uuid.UUID{}
|
||||
for i := 0; i < 10; i++ {
|
||||
p := testrand.UUID()
|
||||
p[0] = byte(i)
|
||||
projects = append(projects, p)
|
||||
}
|
||||
bucketNames := strings.Split("abcde", "")
|
||||
bucketLocations := make([]metabase.BucketLocation, 0, len(projects)*len(bucketNames))
|
||||
|
||||
expected := make([]metabase.BucketTally, 0, len(projects)*len(bucketNames))
|
||||
for _, projectID := range projects {
|
||||
for _, bucketName := range bucketNames {
|
||||
bucketLocations = append(bucketLocations, metabase.BucketLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
})
|
||||
rawObjects := createObjects(ctx, t, db, 1, projectID, bucketName)
|
||||
for _, obj := range rawObjects {
|
||||
expected = append(expected, bucketTallyFromRaw(obj))
|
||||
}
|
||||
}
|
||||
}
|
||||
sortBucketLocations(bucketLocations)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: bucketLocations[0],
|
||||
To: bucketLocations[len(bucketLocations)-1],
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: bucketLocations[0],
|
||||
To: bucketLocations[len(bucketLocations)-1],
|
||||
AsOfSystemTime: time.Now(),
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: bucketLocations[0],
|
||||
To: bucketLocations[15],
|
||||
AsOfSystemTime: time.Now(),
|
||||
},
|
||||
Result: expected[0:16],
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: bucketLocations[16],
|
||||
To: bucketLocations[34],
|
||||
AsOfSystemTime: time.Now(),
|
||||
},
|
||||
Result: expected[16:35],
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CollectBucketTallies{
|
||||
Opts: metabase.CollectBucketTallies{
|
||||
From: bucketLocations[30],
|
||||
To: bucketLocations[10],
|
||||
AsOfSystemTime: time.Now(),
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "project ID To is before project ID From",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func bucketTallyFromRaw(m metabase.RawObject) metabase.BucketTally {
|
||||
return metabase.BucketTally{
|
||||
BucketLocation: metabase.BucketLocation{
|
||||
ProjectID: m.ProjectID,
|
||||
BucketName: m.BucketName,
|
||||
},
|
||||
ObjectCount: 1,
|
||||
TotalSegments: int64(m.SegmentCount),
|
||||
TotalBytes: m.TotalEncryptedSize,
|
||||
MetadataSize: int64(len(m.EncryptedMetadata)),
|
||||
}
|
||||
}
|
||||
|
||||
func sortBucketLocations(bc []metabase.BucketLocation) {
|
||||
sort.Slice(bc, func(i, j int) bool {
|
||||
if bc[i].ProjectID == bc[j].ProjectID {
|
||||
return bc[i].BucketName < bc[j].BucketName
|
||||
}
|
||||
return bc[i].ProjectID.Less(bc[j].ProjectID)
|
||||
})
|
||||
}
|
||||
|
@ -54,6 +54,15 @@ func sortObjects(objects []metabase.Object) {
|
||||
})
|
||||
}
|
||||
|
||||
func sortBucketTallies(tallies []metabase.BucketTally) {
|
||||
sort.Slice(tallies, func(i, j int) bool {
|
||||
if tallies[i].ProjectID == tallies[j].ProjectID {
|
||||
return tallies[i].BucketName < tallies[j].BucketName
|
||||
}
|
||||
return tallies[i].ProjectID.Less(tallies[j].ProjectID)
|
||||
})
|
||||
}
|
||||
|
||||
func sortRawObjects(objects []metabase.RawObject) {
|
||||
sort.Slice(objects, func(i, j int) bool {
|
||||
return objects[i].StreamID.Less(objects[j].StreamID)
|
||||
|
@ -748,3 +748,23 @@ func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing.
|
||||
diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty())
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// CollectBucketTallies is for testing metabase.CollectBucketTallies.
|
||||
type CollectBucketTallies struct {
|
||||
Opts metabase.CollectBucketTallies
|
||||
Result []metabase.BucketTally
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
// Check runs the test.
|
||||
func (step CollectBucketTallies) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
result, err := db.CollectBucketTallies(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
sortBucketTallies(result)
|
||||
sortBucketTallies(step.Result)
|
||||
|
||||
diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty())
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user