satellite/metabase: move CollectBucketTallies

Change-Id: Ia7a4bac91b02c006513f3cf9b9266053d60e90e4
This commit is contained in:
Egon Elbre 2023-10-17 19:49:12 +03:00
parent d98498d17f
commit 25c4e4eec1
4 changed files with 390 additions and 362 deletions

View File

@ -0,0 +1,92 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"time"
"storj.io/private/tagsql"
)
// BucketTally contains information about aggregate data stored in a bucket.
//
// DB.CollectBucketTallies fills one value per (project_id, bucket_name) group
// of the objects table; objects excluded by that query (outside the requested
// range, or already expired) do not contribute to any field.
type BucketTally struct {
	// BucketLocation is the project ID and bucket name the totals belong to.
	BucketLocation
	// ObjectCount is the number of object rows in the group (count(*)),
	// pending objects included.
	ObjectCount int64
	// PendingObjectCount is the number of rows in the group whose status is
	// the pending status.
	PendingObjectCount int64
	// TotalSegments is the sum of segment_count over the group.
	TotalSegments int64
	// TotalBytes is the sum of total_encrypted_size over the group.
	TotalBytes int64
	// MetadataSize is the summed length of encrypted_metadata over the group,
	// or 0 when that sum is NULL.
	MetadataSize int64
}
// CollectBucketTallies contains arguments necessary for looping through objects in metabase.
type CollectBucketTallies struct {
	// From is the lower bound of the (project ID, bucket name) range; the
	// query uses SQL BETWEEN, so the bound itself is part of the range.
	From BucketLocation
	// To is the upper bound of the range, also inclusive. Verify rejects a To
	// that orders before From.
	To BucketLocation
	// AsOfSystemTime and AsOfSystemInterval are handed to db.asOfTime, whose
	// result is spliced into the query right after "FROM objects".
	AsOfSystemTime     time.Time
	AsOfSystemInterval time.Duration
	// Now is the instant expires_at is compared against; objects are kept only
	// when expires_at is NULL or later than Now. A zero value is replaced by
	// time.Now() inside DB.CollectBucketTallies.
	Now time.Time
}
// Verify reports an ErrInvalidRequest error when the To location orders before
// the From location: project IDs are compared first, and bucket names are only
// compared when both project IDs are equal. It returns nil otherwise.
func (opts *CollectBucketTallies) Verify() error {
	lower, upper := opts.From, opts.To

	switch {
	case upper.ProjectID.Less(lower.ProjectID):
		return ErrInvalidRequest.New("project ID To is before project ID From")
	case upper.ProjectID == lower.ProjectID && upper.BucketName < lower.BucketName:
		return ErrInvalidRequest.New("bucket name To is before bucket name From")
	default:
		return nil
	}
}
// CollectBucketTallies collects one tally per bucket for every bucket whose
// (project ID, bucket name) lies between opts.From and opts.To, both bounds
// included, ordered by project ID and bucket name.
//
// Objects are counted only when expires_at is NULL or later than opts.Now; a
// zero opts.Now is replaced with the current time. If opts fails Verify, or the
// query or a row scan fails, an empty non-nil slice is returned with the error.
func (db *DB) CollectBucketTallies(ctx context.Context, opts CollectBucketTallies) (result []BucketTally, err error) {
	defer mon.Task()(&ctx)(&err)

	if verifyErr := opts.Verify(); verifyErr != nil {
		return []BucketTally{}, verifyErr
	}

	if opts.Now.IsZero() {
		opts.Now = time.Now()
	}

	// The SQL text is kept flush-left so the string sent to the database stays
	// exactly as it was.
	query := `
SELECT
project_id, bucket_name,
SUM(total_encrypted_size), SUM(segment_count), COALESCE(SUM(length(encrypted_metadata)), 0),
count(*), count(*) FILTER (WHERE status = ` + statusPending + `)
FROM objects
` + db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval) + `
WHERE (project_id, bucket_name) BETWEEN ($1, $2) AND ($3, $4) AND
(expires_at IS NULL OR expires_at > $5)
GROUP BY (project_id, bucket_name)
ORDER BY (project_id, bucket_name) ASC
`

	// collect appends one BucketTally per returned row to result.
	collect := func(rows tagsql.Rows) error {
		for rows.Next() {
			var tally BucketTally
			scanErr := rows.Scan(
				&tally.ProjectID, &tally.BucketName,
				&tally.TotalBytes, &tally.TotalSegments,
				&tally.MetadataSize, &tally.ObjectCount,
				&tally.PendingObjectCount,
			)
			if scanErr != nil {
				return Error.New("unable to query bucket tally: %w", scanErr)
			}
			result = append(result, tally)
		}
		return nil
	}

	err = withRows(db.db.QueryContext(ctx, query,
		opts.From.ProjectID, []byte(opts.From.BucketName),
		opts.To.ProjectID, []byte(opts.To.BucketName),
		opts.Now,
	))(collect)
	if err != nil {
		// Rows gathered before the failure are deliberately not returned.
		return []BucketTally{}, err
	}

	return result, nil
}

View File

@ -0,0 +1,298 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase_test
import (
"sort"
"strings"
"testing"
"time"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
)
// TestCollectBucketTallies exercises DB.CollectBucketTallies through the
// metabasetest.CollectBucketTallies step: open-ended and empty ranges, request
// validation errors, and the aggregated values for pending/committed objects
// and for many buckets spread over several projects.
func TestCollectBucketTallies(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		// From is left at its zero value and To names a random project with a
		// bucket that was never created: the call must succeed with no tallies.
		t.Run("empty from", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					To: metabase.BucketLocation{
						ProjectID:  testrand.UUID(),
						BucketName: "name does not exist 2",
					},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// To is left at its zero value while From carries a random project ID,
		// so request validation is expected to reject the range.
		t.Run("empty to", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  testrand.UUID(),
						BucketName: "name does not exist",
					},
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "project ID To is before project ID From",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// An object is created and then deleted again, so the bucket in the
		// requested range holds no objects and no tally is expected for it.
		t.Run("empty bucket", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			randStream := metabasetest.RandObjectStream()

			// NOTE(review): the trailing 0 is presumably the segment count of
			// the created object — confirm against metabasetest.CreateObject.
			obj := metabasetest.CreateObject(ctx, t, db, metabase.ObjectStream{
				ProjectID:  randStream.ProjectID,
				BucketName: randStream.BucketName,
				ObjectKey:  randStream.ObjectKey,
				Version:    randStream.Version,
				StreamID:   randStream.StreamID,
			}, 0)

			metabasetest.DeleteObjectExactVersion{
				Opts: metabase.DeleteObjectExactVersion{
					Version: randStream.Version,
					ObjectLocation: metabase.ObjectLocation{
						ProjectID:  randStream.ProjectID,
						BucketName: randStream.BucketName,
						ObjectKey:  randStream.ObjectKey,
					},
				},
				Result: metabase.DeleteObjectResult{Objects: []metabase.Object{obj}},
			}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					To: metabase.BucketLocation{
						ProjectID:  randStream.ProjectID,
						BucketName: randStream.BucketName,
					},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// Both bounds are zero values: no error and no tallies are expected.
		t.Run("empty request", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{},
					To:   metabase.BucketLocation{},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// Bucket names ending in a backslash are passed as range bounds; the
		// call is expected to return no error and no tallies.
		t.Run("invalid bucket name", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			projectA := uuid.UUID{1}
			projectB := uuid.UUID{2}

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  projectA,
						BucketName: "a\\",
					},
					To: metabase.BucketLocation{
						ProjectID:  projectB,
						BucketName: "b\\",
					},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// One pending object (with 1024 bytes of encrypted metadata) and one
		// committed object live in two buckets of the same project; each bucket
		// must get its own tally.
		t.Run("pending and committed", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			pending := metabasetest.RandObjectStream()
			committed := metabasetest.RandObjectStream()
			committed.ProjectID = pending.ProjectID
			// Appending a character makes the committed bucket name order after
			// the pending one, so From=pending, To=committed is a valid range.
			committed.BucketName = pending.BucketName + "q"

			encryptedMetadata := testrand.Bytes(1024)
			encryptedMetadataNonce := testrand.Nonce()
			encryptedMetadataKey := testrand.Bytes(265)

			metabasetest.BeginObjectExactVersion{
				Opts: metabase.BeginObjectExactVersion{
					ObjectStream:                  pending,
					Encryption:                    metabasetest.DefaultEncryption,
					EncryptedMetadata:             encryptedMetadata,
					EncryptedMetadataNonce:        encryptedMetadataNonce[:],
					EncryptedMetadataEncryptedKey: encryptedMetadataKey,
				},
			}.Check(ctx, t, db)

			// NOTE(review): the trailing 1 is presumably the segment count; the
			// expectation below relies on it yielding 1 segment and 1024 bytes.
			metabasetest.CreateObject(ctx, t, db, committed, 1)

			expected := []metabase.BucketTally{
				{
					BucketLocation: metabase.BucketLocation{
						ProjectID:  pending.ProjectID,
						BucketName: pending.BucketName,
					},
					ObjectCount:        1,
					PendingObjectCount: 1,
					TotalSegments:      0,
					TotalBytes:         0,
					MetadataSize:       1024,
				},
				{
					BucketLocation: metabase.BucketLocation{
						ProjectID:  committed.ProjectID,
						BucketName: committed.BucketName,
					},
					ObjectCount:        1,
					PendingObjectCount: 0,
					TotalSegments:      1,
					TotalBytes:         1024,
					MetadataSize:       0,
				},
			}

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  pending.ProjectID,
						BucketName: pending.BucketName,
					},
					To: metabase.BucketLocation{
						ProjectID:  committed.ProjectID,
						BucketName: committed.BucketName,
					},
				},
				Result: expected,
			}.Check(ctx, t, db)

			// Same range again, this time with AsOfSystemTime set.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  pending.ProjectID,
						BucketName: pending.BucketName,
					},
					To: metabase.BucketLocation{
						ProjectID:  committed.ProjectID,
						BucketName: committed.BucketName,
					},
					AsOfSystemTime: time.Now(),
				},
				Result: expected,
			}.Check(ctx, t, db)
		})

		// Ten projects with five buckets each (50 bucket locations) are
		// populated, then full and partial ranges are queried.
		t.Run("multiple projects", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			projects := []uuid.UUID{}
			for i := 0; i < 10; i++ {
				p := testrand.UUID()
				// NOTE(review): presumably forces the project IDs to be
				// generated in ascending order so that expected is built in
				// the same order the query returns — confirm.
				p[0] = byte(i)
				projects = append(projects, p)
			}

			// Five single-letter bucket names: "a" through "e".
			bucketNames := strings.Split("abcde", "")
			bucketLocations := make([]metabase.BucketLocation, 0, len(projects)*len(bucketNames))

			expected := make([]metabase.BucketTally, 0, len(projects)*len(bucketNames))
			for _, projectID := range projects {
				for _, bucketName := range bucketNames {
					bucketLocations = append(bucketLocations, metabase.BucketLocation{
						ProjectID:  projectID,
						BucketName: bucketName,
					})
					// NOTE(review): createObjects is defined elsewhere; the 1
					// is presumably the number of objects created per bucket.
					rawObjects := createObjects(ctx, t, db, 1, projectID, bucketName)
					for _, obj := range rawObjects {
						expected = append(expected, bucketTallyFromRaw(obj))
					}
				}
			}
			sortBucketLocations(bucketLocations)

			// Whole range.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: bucketLocations[0],
					To:   bucketLocations[len(bucketLocations)-1],
				},
				Result: expected,
			}.Check(ctx, t, db)

			// Whole range with AsOfSystemTime set.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[0],
					To:             bucketLocations[len(bucketLocations)-1],
					AsOfSystemTime: time.Now(),
				},
				Result: expected,
			}.Check(ctx, t, db)

			// First 16 locations; To is inclusive, hence expected[0:16].
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[0],
					To:             bucketLocations[15],
					AsOfSystemTime: time.Now(),
				},
				Result: expected[0:16],
			}.Check(ctx, t, db)

			// A range from the middle: locations 16 through 34 inclusive.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[16],
					To:             bucketLocations[34],
					AsOfSystemTime: time.Now(),
				},
				Result: expected[16:35],
			}.Check(ctx, t, db)

			// Reversed bounds: location 30 belongs to a later project than
			// location 10, so the project-ID validation error is expected.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[30],
					To:             bucketLocations[10],
					AsOfSystemTime: time.Now(),
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "project ID To is before project ID From",
			}.Check(ctx, t, db)
		})
	})
}
// bucketTallyFromRaw builds the tally expected for a bucket that holds exactly
// the single raw object m: an object count of one, with the segment count,
// encrypted size and metadata length taken from m. PendingObjectCount stays 0.
func bucketTallyFromRaw(m metabase.RawObject) metabase.BucketTally {
	var tally metabase.BucketTally

	tally.ProjectID = m.ProjectID
	tally.BucketName = m.BucketName

	tally.ObjectCount = 1
	tally.TotalSegments = int64(m.SegmentCount)
	tally.TotalBytes = m.TotalEncryptedSize
	tally.MetadataSize = int64(len(m.EncryptedMetadata))

	return tally
}
// sortBucketLocations sorts bc in place, ascending by project ID and, within
// the same project, ascending by bucket name.
func sortBucketLocations(bc []metabase.BucketLocation) {
	before := func(a, b metabase.BucketLocation) bool {
		if a.ProjectID != b.ProjectID {
			return a.ProjectID.Less(b.ProjectID)
		}
		return a.BucketName < b.BucketName
	}

	sort.Slice(bc, func(i, j int) bool {
		return before(bc[i], bc[j])
	})
}

View File

@ -417,84 +417,3 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn
return nil
}
// BucketTally contains information about aggregate data stored in a bucket.
//
// DB.CollectBucketTallies fills one value per (project_id, bucket_name) group
// of the objects table; objects excluded by that query (outside the requested
// range, or already expired) do not contribute to any field.
type BucketTally struct {
	// BucketLocation is the project ID and bucket name the totals belong to.
	BucketLocation
	// ObjectCount is the number of object rows in the group (count(*)).
	ObjectCount int64
	// PendingObjectCount is the number of rows in the group selected by the
	// query's "status = 1" filter.
	PendingObjectCount int64
	// TotalSegments is the sum of segment_count over the group.
	TotalSegments int64
	// TotalBytes is the sum of total_encrypted_size over the group.
	TotalBytes int64
	// MetadataSize is the summed length of encrypted_metadata over the group,
	// or 0 when that sum is NULL.
	MetadataSize int64
}
// CollectBucketTallies contains arguments necessary for looping through objects in metabase.
type CollectBucketTallies struct {
	// From is the lower bound of the (project ID, bucket name) range; the
	// query uses SQL BETWEEN, so the bound itself is part of the range.
	From BucketLocation
	// To is the upper bound of the range, also inclusive. Verify rejects a To
	// that orders before From.
	To BucketLocation
	// AsOfSystemTime and AsOfSystemInterval are handed to db.asOfTime, whose
	// result is spliced into the query right after "FROM objects".
	AsOfSystemTime     time.Time
	AsOfSystemInterval time.Duration
	// Now is the instant expires_at is compared against; objects are kept only
	// when expires_at is NULL or later than Now. A zero value is replaced by
	// time.Now() inside DB.CollectBucketTallies.
	Now time.Time
}
// Verify reports an ErrInvalidRequest error when the To location orders before
// the From location: project IDs are compared first, and bucket names are only
// compared when both project IDs are equal. It returns nil otherwise.
func (opts *CollectBucketTallies) Verify() error {
	lower, upper := opts.From, opts.To

	switch {
	case upper.ProjectID.Less(lower.ProjectID):
		return ErrInvalidRequest.New("project ID To is before project ID From")
	case upper.ProjectID == lower.ProjectID && upper.BucketName < lower.BucketName:
		return ErrInvalidRequest.New("bucket name To is before bucket name From")
	default:
		return nil
	}
}
// CollectBucketTallies collects one tally per bucket for every bucket whose
// (project ID, bucket name) lies between opts.From and opts.To, both bounds
// included, ordered by project ID and bucket name.
//
// Objects are counted only when expires_at is NULL or later than opts.Now; a
// zero opts.Now is replaced with the current time. If opts fails Verify, or the
// query or a row scan fails, an empty non-nil slice is returned with the error.
func (db *DB) CollectBucketTallies(ctx context.Context, opts CollectBucketTallies) (result []BucketTally, err error) {
	defer mon.Task()(&ctx)(&err)

	if verifyErr := opts.Verify(); verifyErr != nil {
		return []BucketTally{}, verifyErr
	}

	if opts.Now.IsZero() {
		opts.Now = time.Now()
	}

	// The SQL text is kept flush-left so the string sent to the database stays
	// exactly as it was.
	query := `
SELECT
project_id, bucket_name,
SUM(total_encrypted_size), SUM(segment_count), COALESCE(SUM(length(encrypted_metadata)), 0),
count(*), count(*) FILTER (WHERE status = 1)
FROM objects
` + db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval) + `
WHERE (project_id, bucket_name) BETWEEN ($1, $2) AND ($3, $4) AND
(expires_at IS NULL OR expires_at > $5)
GROUP BY (project_id, bucket_name)
ORDER BY (project_id, bucket_name) ASC
`

	// collect appends one BucketTally per returned row to result.
	collect := func(rows tagsql.Rows) error {
		for rows.Next() {
			var tally BucketTally
			scanErr := rows.Scan(
				&tally.ProjectID, &tally.BucketName,
				&tally.TotalBytes, &tally.TotalSegments,
				&tally.MetadataSize, &tally.ObjectCount,
				&tally.PendingObjectCount,
			)
			if scanErr != nil {
				return Error.New("unable to query bucket tally: %w", scanErr)
			}
			result = append(result, tally)
		}
		return nil
	}

	err = withRows(db.db.QueryContext(ctx, query,
		opts.From.ProjectID, []byte(opts.From.BucketName),
		opts.To.ProjectID, []byte(opts.To.BucketName),
		opts.Now,
	))(collect)
	if err != nil {
		// Rows gathered before the failure are deliberately not returned.
		return []BucketTally{}, err
	}

	return result, nil
}

View File

@ -733,284 +733,3 @@ func loopPendingObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntr
SegmentCount: m.SegmentCount,
}
}
// TestCollectBucketTallies exercises DB.CollectBucketTallies through the
// metabasetest.CollectBucketTallies step: open-ended and empty ranges, request
// validation errors, and the aggregated values for pending/committed objects
// and for many buckets spread over several projects.
func TestCollectBucketTallies(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		// From is left at its zero value and To names a random project with a
		// bucket that was never created: the call must succeed with no tallies.
		t.Run("empty from", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					To: metabase.BucketLocation{
						ProjectID:  testrand.UUID(),
						BucketName: "name does not exist 2",
					},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// To is left at its zero value while From carries a random project ID,
		// so request validation is expected to reject the range.
		t.Run("empty to", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  testrand.UUID(),
						BucketName: "name does not exist",
					},
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "project ID To is before project ID From",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// An object is created and then deleted again, so the bucket in the
		// requested range holds no objects and no tally is expected for it.
		t.Run("empty bucket", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			randStream := metabasetest.RandObjectStream()

			// NOTE(review): the trailing 0 is presumably the segment count of
			// the created object — confirm against metabasetest.CreateObject.
			obj := metabasetest.CreateObject(ctx, t, db, metabase.ObjectStream{
				ProjectID:  randStream.ProjectID,
				BucketName: randStream.BucketName,
				ObjectKey:  randStream.ObjectKey,
				Version:    randStream.Version,
				StreamID:   randStream.StreamID,
			}, 0)

			metabasetest.DeleteObjectExactVersion{
				Opts: metabase.DeleteObjectExactVersion{
					Version: randStream.Version,
					ObjectLocation: metabase.ObjectLocation{
						ProjectID:  randStream.ProjectID,
						BucketName: randStream.BucketName,
						ObjectKey:  randStream.ObjectKey,
					},
				},
				Result: metabase.DeleteObjectResult{Objects: []metabase.Object{obj}},
			}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					To: metabase.BucketLocation{
						ProjectID:  randStream.ProjectID,
						BucketName: randStream.BucketName,
					},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// Both bounds are zero values: no error and no tallies are expected.
		t.Run("empty request", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{},
					To:   metabase.BucketLocation{},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// Bucket names ending in a backslash are passed as range bounds; the
		// call is expected to return no error and no tallies.
		t.Run("invalid bucket name", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			projectA := uuid.UUID{1}
			projectB := uuid.UUID{2}

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  projectA,
						BucketName: "a\\",
					},
					To: metabase.BucketLocation{
						ProjectID:  projectB,
						BucketName: "b\\",
					},
				},
				Result: nil,
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		// One pending object (with 1024 bytes of encrypted metadata) and one
		// committed object live in two buckets of the same project; each bucket
		// must get its own tally.
		t.Run("pending and committed", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			pending := metabasetest.RandObjectStream()
			committed := metabasetest.RandObjectStream()
			committed.ProjectID = pending.ProjectID
			// Appending a character makes the committed bucket name order after
			// the pending one, so From=pending, To=committed is a valid range.
			committed.BucketName = pending.BucketName + "q"

			encryptedMetadata := testrand.Bytes(1024)
			encryptedMetadataNonce := testrand.Nonce()
			encryptedMetadataKey := testrand.Bytes(265)

			metabasetest.BeginObjectExactVersion{
				Opts: metabase.BeginObjectExactVersion{
					ObjectStream:                  pending,
					Encryption:                    metabasetest.DefaultEncryption,
					EncryptedMetadata:             encryptedMetadata,
					EncryptedMetadataNonce:        encryptedMetadataNonce[:],
					EncryptedMetadataEncryptedKey: encryptedMetadataKey,
				},
			}.Check(ctx, t, db)

			// NOTE(review): the trailing 1 is presumably the segment count; the
			// expectation below relies on it yielding 1 segment and 1024 bytes.
			metabasetest.CreateObject(ctx, t, db, committed, 1)

			expected := []metabase.BucketTally{
				{
					BucketLocation: metabase.BucketLocation{
						ProjectID:  pending.ProjectID,
						BucketName: pending.BucketName,
					},
					ObjectCount:        1,
					PendingObjectCount: 1,
					TotalSegments:      0,
					TotalBytes:         0,
					MetadataSize:       1024,
				},
				{
					BucketLocation: metabase.BucketLocation{
						ProjectID:  committed.ProjectID,
						BucketName: committed.BucketName,
					},
					ObjectCount:        1,
					PendingObjectCount: 0,
					TotalSegments:      1,
					TotalBytes:         1024,
					MetadataSize:       0,
				},
			}

			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  pending.ProjectID,
						BucketName: pending.BucketName,
					},
					To: metabase.BucketLocation{
						ProjectID:  committed.ProjectID,
						BucketName: committed.BucketName,
					},
				},
				Result: expected,
			}.Check(ctx, t, db)

			// Same range again, this time with AsOfSystemTime set.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: metabase.BucketLocation{
						ProjectID:  pending.ProjectID,
						BucketName: pending.BucketName,
					},
					To: metabase.BucketLocation{
						ProjectID:  committed.ProjectID,
						BucketName: committed.BucketName,
					},
					AsOfSystemTime: time.Now(),
				},
				Result: expected,
			}.Check(ctx, t, db)
		})

		// Ten projects with five buckets each (50 bucket locations) are
		// populated, then full and partial ranges are queried.
		t.Run("multiple projects", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			projects := []uuid.UUID{}
			for i := 0; i < 10; i++ {
				p := testrand.UUID()
				// NOTE(review): presumably forces the project IDs to be
				// generated in ascending order so that expected is built in
				// the same order the query returns — confirm.
				p[0] = byte(i)
				projects = append(projects, p)
			}

			// Five single-letter bucket names: "a" through "e".
			bucketNames := strings.Split("abcde", "")
			bucketLocations := make([]metabase.BucketLocation, 0, len(projects)*len(bucketNames))

			expected := make([]metabase.BucketTally, 0, len(projects)*len(bucketNames))
			for _, projectID := range projects {
				for _, bucketName := range bucketNames {
					bucketLocations = append(bucketLocations, metabase.BucketLocation{
						ProjectID:  projectID,
						BucketName: bucketName,
					})
					// NOTE(review): createObjects is defined elsewhere; the 1
					// is presumably the number of objects created per bucket.
					rawObjects := createObjects(ctx, t, db, 1, projectID, bucketName)
					for _, obj := range rawObjects {
						expected = append(expected, bucketTallyFromRaw(obj))
					}
				}
			}
			sortBucketLocations(bucketLocations)

			// Whole range.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From: bucketLocations[0],
					To:   bucketLocations[len(bucketLocations)-1],
				},
				Result: expected,
			}.Check(ctx, t, db)

			// Whole range with AsOfSystemTime set.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[0],
					To:             bucketLocations[len(bucketLocations)-1],
					AsOfSystemTime: time.Now(),
				},
				Result: expected,
			}.Check(ctx, t, db)

			// First 16 locations; To is inclusive, hence expected[0:16].
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[0],
					To:             bucketLocations[15],
					AsOfSystemTime: time.Now(),
				},
				Result: expected[0:16],
			}.Check(ctx, t, db)

			// A range from the middle: locations 16 through 34 inclusive.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[16],
					To:             bucketLocations[34],
					AsOfSystemTime: time.Now(),
				},
				Result: expected[16:35],
			}.Check(ctx, t, db)

			// Reversed bounds: location 30 belongs to a later project than
			// location 10, so the project-ID validation error is expected.
			metabasetest.CollectBucketTallies{
				Opts: metabase.CollectBucketTallies{
					From:           bucketLocations[30],
					To:             bucketLocations[10],
					AsOfSystemTime: time.Now(),
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "project ID To is before project ID From",
			}.Check(ctx, t, db)
		})
	})
}
// bucketTallyFromRaw builds the tally expected for a bucket that holds exactly
// the single raw object m: an object count of one, with the segment count,
// encrypted size and metadata length taken from m. PendingObjectCount stays 0.
func bucketTallyFromRaw(m metabase.RawObject) metabase.BucketTally {
	var tally metabase.BucketTally

	tally.ProjectID = m.ProjectID
	tally.BucketName = m.BucketName

	tally.ObjectCount = 1
	tally.TotalSegments = int64(m.SegmentCount)
	tally.TotalBytes = m.TotalEncryptedSize
	tally.MetadataSize = int64(len(m.EncryptedMetadata))

	return tally
}
// sortBucketLocations sorts bc in place, ascending by project ID and, within
// the same project, ascending by bucket name.
func sortBucketLocations(bc []metabase.BucketLocation) {
	before := func(a, b metabase.BucketLocation) bool {
		if a.ProjectID != b.ProjectID {
			return a.ProjectID.Less(b.ProjectID)
		}
		return a.BucketName < b.BucketName
	}

	sort.Slice(bc, func(i, j int) bool {
		return before(bc[i], bc[j])
	})
}