satellite/metainfo/metabase: iterateObjectsAllVersions

Iterates through all objects in a bucket.
Recursive listing only; prefix filtering is not yet supported.

Change-Id: Ieaad9fbd0b5c1593554db9d543f4ee04851bac28
This commit is contained in:
Fadila Khadar 2020-10-29 19:10:46 +01:00
parent 8182d8a726
commit a749ac9f47
6 changed files with 407 additions and 2 deletions

View File

@ -24,6 +24,7 @@ const (
)
// maxListLimit is the maximum number of items a single listing request may return.
const maxListLimit = 1000
// batchsizeLimit caps how many rows a single iteration batch fetches from the database.
const batchsizeLimit = 1000
// BucketPrefix consists of <project id>/<bucket name>.
type BucketPrefix string

View File

@ -0,0 +1,126 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"github.com/zeebo/errs"
"storj.io/storj/private/tagsql"
)
// objectsIterator enables iteration on objects in a bucket.
//
// It pages through the objects table in batches, remembering the last
// returned (object key, version) in cursor so the next query can resume
// strictly after the previous batch.
type objectsIterator struct {
	opts *IterateObjects // iteration request: project, bucket, batch size, starting cursor
	db   *DB

	batchSize int           // maximum rows fetched per query
	curIndex  int           // rows consumed from the current batch so far
	curRows   tagsql.Rows   // current batch; swapped out when exhausted
	status    ObjectStatus  // only objects with this status are returned (set to Committed)
	cursor    IterateCursor // position just after the most recently returned entry
}
// iterateAllVersions invokes fn with an iterator over every committed object
// version in the bucket described by opts. The first batch is fetched before
// fn runs; the final batch's rows are closed (and any pending row error
// surfaced) after fn returns.
func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	iter := &objectsIterator{
		db:        db,
		opts:      &opts,
		batchSize: opts.BatchSize,
		curIndex:  0,
		status:    Committed,
		cursor:    opts.Cursor,
	}

	rows, queryErr := iter.doNextQuery(ctx)
	if queryErr != nil {
		return queryErr
	}
	iter.curRows = rows

	defer func() {
		// Surface any row error swallowed by Next, then close whatever
		// batch the iterator currently holds.
		if rowsErr := iter.curRows.Err(); rowsErr != nil {
			err = errs.Combine(err, rowsErr)
		}
		err = errs.Combine(err, iter.curRows.Close())
	}()

	return fn(ctx, iter)
}
// Next returns true if there was another item and copies it into item.
//
// When the current batch is exhausted it issues a follow-up query resuming
// after the cursor. Iteration stops when a batch came back shorter than
// batchSize (no more rows can exist), or on any row/query/scan error —
// errors are not returned here but are surfaced by the deferred
// curRows.Err/Close checks in iterateAllVersions.
func (it *objectsIterator) Next(ctx context.Context, item *ObjectEntry) bool {
	next := it.curRows.Next()
	if !next {
		// A short batch means the previous query already drained all matching rows.
		if it.curIndex < it.batchSize {
			return false
		}

		if it.curRows.Err() != nil {
			return false
		}

		rows, err := it.doNextQuery(ctx)
		if err != nil {
			return false
		}

		// Close the finished batch before adopting the new one; if closing
		// fails, also close the fresh rows so nothing leaks.
		if it.curRows.Close() != nil {
			_ = rows.Close()
			return false
		}

		it.curRows = rows
		it.curIndex = 0
		if !it.curRows.Next() {
			return false
		}
	}

	err := it.scanItem(item)
	if err != nil {
		return false
	}

	// project_id and bucket_name are not part of the SELECT list; fill them
	// in from the request options.
	item.ProjectID = it.opts.ProjectID
	item.BucketName = it.opts.BucketName

	it.curIndex++
	// Advance the cursor so the next batch query resumes after this entry.
	it.cursor.Key = item.ObjectKey
	it.cursor.Version = item.Version

	return true
}
// doNextQuery fetches the next batch of committed objects, resuming strictly
// after the iterator's cursor. The (object_key, version) tuple comparison
// keeps all versions of a key in order across batch boundaries.
func (it *objectsIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	return it.db.db.Query(ctx, `
		SELECT
			object_key, stream_id, version, status,
			created_at, expires_at,
			segment_count,
			encrypted_metadata_nonce, encrypted_metadata,
			total_encrypted_size, fixed_segment_size,
			encryption
		FROM objects
		WHERE
			project_id = $1 AND bucket_name = $2
			AND status = $3
			AND (object_key, version) > ($4, $5)
		ORDER BY object_key ASC, version ASC
		LIMIT $6
	`, it.opts.ProjectID, it.opts.BucketName, it.status,
		[]byte(it.cursor.Key), int(it.cursor.Version),
		// Use the iterator's own batch size: Next's end-of-iteration check
		// (curIndex < batchSize) must agree with the LIMIT used here.
		it.batchSize,
	)
}
// scanItem copies the current row into item. The destination order must match
// the SELECT column list in doNextQuery exactly.
func (it *objectsIterator) scanItem(item *ObjectEntry) error {
	return it.curRows.Scan(
		&item.ObjectKey, &item.StreamID, &item.Version, &item.Status,
		&item.CreatedAt, &item.ExpiresAt,
		&item.SegmentCount,
		&item.EncryptedMetadataNonce, &item.EncryptedMetadata,
		&item.TotalEncryptedSize, &item.FixedSegmentSize,
		// encryptionParameters is a custom scanner wrapper for the encryption column.
		encryptionParameters{&item.Encryption},
	)
}

View File

@ -0,0 +1,62 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"storj.io/common/uuid"
)
// ObjectEntry contains information about an item in a bucket.
type ObjectEntry Object

// ObjectsIterator iterates over a sequence of ObjectEntry items.
type ObjectsIterator interface {
	// Next copies the next entry into item and reports whether one was available.
	Next(ctx context.Context, item *ObjectEntry) bool
}

// IterateCursor is a cursor used during iteration.
// Iteration resumes strictly after the (Key, Version) pair.
type IterateCursor struct {
	Key     ObjectKey
	Version Version
}

// IterateObjects contains arguments necessary for listing objects in a bucket.
type IterateObjects struct {
	ProjectID  uuid.UUID
	BucketName string
	Recursive  bool      // must currently be true; non-recursive listing is not implemented yet (see Verify)
	BatchSize  int       // rows per query; 0 or values above batchsizeLimit are replaced by batchsizeLimit
	Prefix     ObjectKey // must currently be empty; prefixed listing is not implemented yet (see Verify)
	Cursor     IterateCursor
}
// IterateObjectsAllVersions iterates through all versions of all committed objects.
func (db *DB) IterateObjectsAllVersions(ctx context.Context, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Validate the request (this also normalizes BatchSize) before querying.
	if verifyErr := opts.Verify(); verifyErr != nil {
		return verifyErr
	}

	return iterateAllVersions(ctx, db, opts, fn)
}
// Verify verifies get object request fields.
// As a side effect it normalizes BatchSize: an unset (zero) or oversized
// value falls back to batchsizeLimit.
func (opts *IterateObjects) Verify() error {
	if opts.BucketName == "" {
		return ErrInvalidRequest.New("BucketName missing")
	}
	if opts.ProjectID.IsZero() {
		return ErrInvalidRequest.New("ProjectID missing")
	}
	if !opts.Recursive {
		return ErrInvalidRequest.New("non-recursive listing not implemented yet")
	}
	if opts.Prefix != "" {
		return ErrInvalidRequest.New("prefixed listing not implemented yet")
	}
	if opts.BatchSize < 0 {
		return ErrInvalidRequest.New("BatchSize is negative")
	}
	if opts.BatchSize == 0 || opts.BatchSize > batchsizeLimit {
		opts.BatchSize = batchsizeLimit
	}
	return nil
}

View File

@ -0,0 +1,184 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase_test
import (
"sort"
"testing"
"time"
"storj.io/common/testcontext"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo/metabase"
)
// TestIterateObjects exercises IterateObjectsAllVersions: request validation,
// empty results, batching across multiple queries, and isolation between
// buckets and between projects.
func TestIterateObjects(t *testing.T) {
	All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		t.Run("BucketName missing", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{1},
					BucketName: "",
					Recursive:  true,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "BucketName missing",
			}.Check(ctx, t, db)
			Verify{}.Check(ctx, t, db)
		})

		t.Run("ProjectID missing", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{},
					BucketName: "sj://mybucket",
					Recursive:  true,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "ProjectID missing",
			}.Check(ctx, t, db)
			Verify{}.Check(ctx, t, db)
		})

		// Renamed from "Limit is negative" for consistency with the option
		// name and the expected error text.
		t.Run("BatchSize is negative", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{1},
					BucketName: "mybucket",
					BatchSize:  -1,
					Recursive:  true,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "BatchSize is negative",
			}.Check(ctx, t, db)
			Verify{}.Check(ctx, t, db)
		})

		t.Run("List empty bucket", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			// Objects exist only in "mybucket"; listing a different bucket
			// must return nothing.
			objects := createObjects(ctx, t, db, 2, uuid.UUID{1}, "mybucket")
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{1},
					BucketName: "myemptybucket",
					BatchSize:  10,
					Recursive:  true,
				},
				Result: nil,
			}.Check(ctx, t, db)
			Verify{Objects: objects}.Check(ctx, t, db)
		})

		t.Run("List less objects than limit", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			numberOfObjects := 3
			batchSize := 10
			expected := make([]metabase.ObjectEntry, numberOfObjects)
			objects := createObjects(ctx, t, db, numberOfObjects, uuid.UUID{1}, "mybucket")
			for i, obj := range objects {
				expected[i] = metabase.ObjectEntry(obj)
			}
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{1},
					BucketName: "mybucket",
					Recursive:  true,
					BatchSize:  batchSize,
				},
				Result: expected,
			}.Check(ctx, t, db)
			Verify{Objects: objects}.Check(ctx, t, db)
		})

		t.Run("List more objects than limit", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			// 10 objects with a batch size of 3 forces several follow-up queries.
			numberOfObjects := 10
			batchSize := 3
			expected := make([]metabase.ObjectEntry, numberOfObjects)
			objects := createObjects(ctx, t, db, numberOfObjects, uuid.UUID{1}, "mybucket")
			for i, obj := range objects {
				expected[i] = metabase.ObjectEntry(obj)
			}
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{1},
					BucketName: "mybucket",
					Recursive:  true,
					BatchSize:  batchSize,
				},
				Result: expected,
			}.Check(ctx, t, db)
			Verify{Objects: objects}.Check(ctx, t, db)
		})

		t.Run("List objects in one bucket in project with 2 buckets", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			numberOfObjectsPerBucket := 5
			batchSize := 10
			expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
			objectsBucketA := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-a")
			objectsBucketB := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-b")
			for i, obj := range objectsBucketA {
				expected[i] = metabase.ObjectEntry(obj)
			}
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{1},
					BucketName: "bucket-a",
					Recursive:  true,
					BatchSize:  batchSize,
				},
				Result: expected,
			}.Check(ctx, t, db)
			Verify{Objects: append(objectsBucketA, objectsBucketB...)}.Check(ctx, t, db)
		})

		t.Run("List objects in one bucket with same bucketName in another project", func(t *testing.T) {
			defer DeleteAll{}.Check(ctx, t, db)
			numberOfObjectsPerBucket := 5
			batchSize := 10
			expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
			objectsProject1 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "mybucket")
			objectsProject2 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{2}, "mybucket")
			for i, obj := range objectsProject1 {
				expected[i] = metabase.ObjectEntry(obj)
			}
			IterateObjects{
				Opts: metabase.IterateObjects{
					ProjectID:  uuid.UUID{1},
					BucketName: "mybucket",
					Recursive:  true,
					BatchSize:  batchSize,
				},
				Result: expected,
			}.Check(ctx, t, db)
			Verify{Objects: append(objectsProject1, objectsProject2...)}.Check(ctx, t, db)
		})
	})
}
// createObjects inserts numberOfObjects committed test objects into the given
// project and bucket, returning the expected raw representations sorted by
// object key (the order the iterator is expected to produce).
func createObjects(ctx *testcontext.Context, t *testing.T, db *metabase.DB, numberOfObjects int, projectID uuid.UUID, bucketName string) []metabase.RawObject {
	objects := make([]metabase.RawObject, 0, numberOfObjects)
	for i := 0; i < numberOfObjects; i++ {
		stream := randObjectStream()
		stream.ProjectID = projectID
		stream.BucketName = bucketName

		// Record the approximate creation time just before inserting; Check
		// compares CreatedAt with a tolerance.
		now := time.Now()
		createObject(ctx, t, db, stream, 0)

		objects = append(objects, metabase.RawObject{
			ObjectStream: stream,
			CreatedAt:    now,
			Status:       metabase.Committed,
			Encryption:   defaultTestEncryption,
		})
	}

	sort.SliceStable(objects, func(i, j int) bool {
		return objects[i].ObjectKey < objects[j].ObjectKey
	})
	return objects
}

View File

@ -94,7 +94,7 @@ func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err erro
objs := []RawObject{}
rows, err := db.db.Query(ctx, `
SELECT
SELECT
project_id, bucket_name, object_key, version, stream_id,
created_at, expires_at,
status, segment_count,
@ -103,6 +103,7 @@ func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err erro
encryption,
zombie_deletion_deadline
FROM objects
ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC
`)
if err != nil {
return nil, Error.New("testingGetAllObjects query: %w", err)
@ -152,7 +153,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
segs := []RawSegment{}
rows, err := db.db.Query(ctx, `
SELECT
SELECT
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size,
@ -160,6 +161,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
redundancy,
inline_data, remote_pieces
FROM segments
ORDER BY stream_id ASC, position ASC
`)
if err != nil {
return nil, Error.New("testingGetAllSegments query: %w", err)

View File

@ -4,6 +4,7 @@
package metabase_test
import (
"context"
"sort"
"testing"
"time"
@ -257,6 +258,35 @@ func (step DeleteObjectsAllVersions) Check(ctx *testcontext.Context, t *testing.
require.Zero(t, diff)
}
// IterateCollector accumulates every entry produced by an ObjectsIterator.
type IterateCollector []metabase.ObjectEntry

// Add drains the iterator, appending each entry to the collector. It always
// returns nil so it can be used directly as the iteration callback.
func (coll *IterateCollector) Add(ctx context.Context, it metabase.ObjectsIterator) error {
	var entry metabase.ObjectEntry
	for it.Next(ctx, &entry) {
		*coll = append(*coll, entry)
	}
	return nil
}
// IterateObjects is a test step that runs IterateObjectsAllVersions with Opts
// and checks both the collected entries and the resulting error.
type IterateObjects struct {
	Opts metabase.IterateObjects

	Result   []metabase.ObjectEntry // expected entries, in iteration order
	ErrClass *errs.Class            // expected error class; nil means no error is expected
	ErrText  string                 // expected error text, checked when ErrClass is set
}
// Check executes the iteration described by step.Opts against db and asserts
// that the collected entries and the returned error match expectations.
// CreatedAt timestamps are compared with a 5-second tolerance.
func (step IterateObjects) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
	var collected IterateCollector
	err := db.IterateObjectsAllVersions(ctx, step.Opts, collected.Add)
	checkError(t, err, step.ErrClass, step.ErrText)

	diff := cmp.Diff(step.Result, []metabase.ObjectEntry(collected), cmpopts.EquateApproxTime(5*time.Second))
	require.Zero(t, diff)
}
func checkError(t *testing.T, err error, errClass *errs.Class, errText string) {
if errClass != nil {
require.True(t, errClass.Has(err), "expected an error %v got %v", *errClass, err)