satellite/metabase: drop IterateObjectsAllVersions
We are not using this method and most probably we won't need to list objects with all statuses at once. Removing for now. Change-Id: I7aa0468c5f635ee2fb1fe51db382595c6343dd9c
This commit is contained in:
parent
4725a3878c
commit
c934f45bfc
@ -214,9 +214,10 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, projectID := range s.projectID {
|
||||
m.Record(func() {
|
||||
err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
|
||||
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: "bucket",
|
||||
Status: metabase.Committed,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
var entry metabase.ObjectEntry
|
||||
for it.Next(ctx, &entry) {
|
||||
@ -236,10 +237,11 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
for i := 0; i < b.N; i++ {
|
||||
for i, projectID := range s.projectID {
|
||||
m.Record(func() {
|
||||
err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
|
||||
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: "bucket",
|
||||
Prefix: metabase.ObjectKey(prefixes[i]),
|
||||
Status: metabase.Committed,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
var entry metabase.ObjectEntry
|
||||
for it.Next(ctx, &entry) {
|
||||
|
@ -46,36 +46,6 @@ type iterateCursor struct {
|
||||
Inclusive bool
|
||||
}
|
||||
|
||||
func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
it := &objectsIterator{
|
||||
db: db,
|
||||
|
||||
projectID: opts.ProjectID,
|
||||
bucketName: []byte(opts.BucketName),
|
||||
prefix: opts.Prefix,
|
||||
prefixLimit: prefixLimit(opts.Prefix),
|
||||
batchSize: opts.BatchSize,
|
||||
recursive: true,
|
||||
includeCustomMetadata: true,
|
||||
includeSystemMetadata: true,
|
||||
|
||||
curIndex: 0,
|
||||
cursor: firstIterateCursor(true, opts.Cursor, opts.Prefix),
|
||||
doNextQuery: doNextQueryAllVersionsWithoutStatus,
|
||||
}
|
||||
|
||||
// start from either the cursor or prefix, depending on which is larger
|
||||
if lessKey(it.cursor.Key, opts.Prefix) {
|
||||
it.cursor.Key = opts.Prefix
|
||||
it.cursor.Version = -1
|
||||
it.cursor.Inclusive = true
|
||||
}
|
||||
|
||||
return iterate(ctx, it, fn)
|
||||
}
|
||||
|
||||
func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -251,59 +221,6 @@ func (it *objectsIterator) next(ctx context.Context, item *ObjectEntry) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
cursorCompare := ">"
|
||||
if it.cursor.Inclusive {
|
||||
cursorCompare = ">="
|
||||
}
|
||||
|
||||
if it.prefixLimit == "" {
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, encryption, status,
|
||||
created_at, expires_at,
|
||||
segment_count,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND bucket_name = $2
|
||||
AND (object_key, version) `+cursorCompare+` ($3, $4)
|
||||
ORDER BY object_key ASC, version ASC
|
||||
LIMIT $5
|
||||
`, it.projectID, it.bucketName,
|
||||
[]byte(it.cursor.Key), int(it.cursor.Version),
|
||||
it.batchSize,
|
||||
)
|
||||
}
|
||||
|
||||
// TODO this query should use SUBSTRING(object_key from $8) but there is a problem how it
|
||||
// works with CRDB.
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, encryption, status,
|
||||
created_at, expires_at,
|
||||
segment_count,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND bucket_name = $2
|
||||
AND (object_key, version) `+cursorCompare+` ($3, $4)
|
||||
AND object_key < $5
|
||||
ORDER BY object_key ASC, version ASC
|
||||
LIMIT $6
|
||||
`, it.projectID, it.bucketName,
|
||||
[]byte(it.cursor.Key), int(it.cursor.Version),
|
||||
[]byte(it.prefixLimit),
|
||||
it.batchSize,
|
||||
|
||||
// len(it.prefix)+1, // TODO uncomment when CRDB issue will be fixed
|
||||
)
|
||||
}
|
||||
|
||||
func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
|
@ -17,453 +17,6 @@ import (
|
||||
"storj.io/storj/satellite/metabase/metabasetest"
|
||||
)
|
||||
|
||||
func TestIterateObjects(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
t.Run("invalid arguments", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
t.Run("ProjectID missing", func(t *testing.T) {
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{},
|
||||
BucketName: "sj://mybucket",
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ProjectID missing",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
t.Run("BucketName missing", func(t *testing.T) {
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "",
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "BucketName missing",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
t.Run("Limit is negative", func(t *testing.T) {
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "mybucket",
|
||||
BatchSize: -1,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "BatchSize is negative",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("empty bucket", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
objects := createObjects(ctx, t, db, 2, uuid.UUID{1}, "mybucket")
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "myemptybucket",
|
||||
BatchSize: 10,
|
||||
},
|
||||
Result: nil,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{Objects: objects}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("pending and committed", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
now := time.Now()
|
||||
|
||||
pending := metabasetest.RandObjectStream()
|
||||
pending.ObjectKey = metabase.ObjectKey("firstObject")
|
||||
committed := metabasetest.RandObjectStream()
|
||||
committed.ProjectID = pending.ProjectID
|
||||
committed.BucketName = pending.BucketName
|
||||
committed.ObjectKey = metabase.ObjectKey("secondObject")
|
||||
|
||||
projectID := pending.ProjectID
|
||||
bucketName := pending.BucketName
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: pending,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: committed,
|
||||
OverrideEncryptedMetadata: true,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
{
|
||||
ObjectKey: pending.ObjectKey,
|
||||
Version: pending.Version,
|
||||
StreamID: pending.StreamID,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
{
|
||||
ObjectKey: committed.ObjectKey,
|
||||
Version: committed.Version,
|
||||
StreamID: committed.StreamID,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("less objects than limit", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
numberOfObjects := 3
|
||||
limit := 10
|
||||
expected := make([]metabase.ObjectEntry, numberOfObjects)
|
||||
objects := createObjects(ctx, t, db, numberOfObjects, uuid.UUID{1}, "mybucket")
|
||||
for i, obj := range objects {
|
||||
expected[i] = objectEntryFromRaw(obj)
|
||||
}
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "mybucket",
|
||||
BatchSize: limit,
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{Objects: objects}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("more objects than limit", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
numberOfObjects := 10
|
||||
limit := 3
|
||||
expected := make([]metabase.ObjectEntry, numberOfObjects)
|
||||
objects := createObjects(ctx, t, db, numberOfObjects, uuid.UUID{1}, "mybucket")
|
||||
for i, obj := range objects {
|
||||
expected[i] = objectEntryFromRaw(obj)
|
||||
}
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "mybucket",
|
||||
BatchSize: limit,
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{Objects: objects}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("objects in one bucket in project with 2 buckets", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
numberOfObjectsPerBucket := 5
|
||||
expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
|
||||
objectsBucketA := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-a")
|
||||
objectsBucketB := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-b")
|
||||
for i, obj := range objectsBucketA {
|
||||
expected[i] = objectEntryFromRaw(obj)
|
||||
}
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "bucket-a",
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{Objects: append(objectsBucketA, objectsBucketB...)}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("objects in one bucket with same bucketName in another project", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
numberOfObjectsPerBucket := 5
|
||||
expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
|
||||
objectsProject1 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "mybucket")
|
||||
objectsProject2 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{2}, "mybucket")
|
||||
for i, obj := range objectsProject1 {
|
||||
expected[i] = objectEntryFromRaw(obj)
|
||||
}
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "mybucket",
|
||||
},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{Objects: append(objectsProject1, objectsProject2...)}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("recursive", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
projectID, bucketName := uuid.UUID{1}, "bucky"
|
||||
|
||||
objects := createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{
|
||||
"a",
|
||||
"b/1",
|
||||
"b/2",
|
||||
"b/3",
|
||||
"c",
|
||||
"c/",
|
||||
"c//",
|
||||
"c/1",
|
||||
"g",
|
||||
})
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
objects["a"],
|
||||
objects["b/1"],
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
objects["c"],
|
||||
objects["c/"],
|
||||
objects["c//"],
|
||||
objects["c/1"],
|
||||
objects["g"],
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Cursor: metabase.IterateCursor{Key: "a", Version: 0},
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
objects["a"],
|
||||
objects["b/1"],
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
objects["c"],
|
||||
objects["c/"],
|
||||
objects["c//"],
|
||||
objects["c/1"],
|
||||
objects["g"],
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Cursor: metabase.IterateCursor{Key: "a", Version: 1},
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
objects["b/1"],
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
objects["c"],
|
||||
objects["c/"],
|
||||
objects["c//"],
|
||||
objects["c/1"],
|
||||
objects["g"],
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Cursor: metabase.IterateCursor{Key: "a", Version: 10},
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
objects["b/1"],
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
objects["c"],
|
||||
objects["c/"],
|
||||
objects["c//"],
|
||||
objects["c/1"],
|
||||
objects["g"],
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Cursor: metabase.IterateCursor{Key: "b", Version: 0},
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
objects["b/1"],
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
objects["c"],
|
||||
objects["c/"],
|
||||
objects["c//"],
|
||||
objects["c/1"],
|
||||
objects["g"],
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Prefix: "b/",
|
||||
},
|
||||
Result: withoutPrefix("b/",
|
||||
objects["b/1"],
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
),
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Prefix: "b/",
|
||||
Cursor: metabase.IterateCursor{Key: "a"},
|
||||
},
|
||||
Result: withoutPrefix("b/",
|
||||
objects["b/1"],
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
),
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Prefix: "b/",
|
||||
Cursor: metabase.IterateCursor{Key: "b/2", Version: -3},
|
||||
},
|
||||
Result: withoutPrefix("b/",
|
||||
objects["b/2"],
|
||||
objects["b/3"],
|
||||
),
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.IterateObjects{
|
||||
Opts: metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
|
||||
Prefix: "b/",
|
||||
Cursor: metabase.IterateCursor{Key: "c/"},
|
||||
},
|
||||
Result: nil,
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("boundaries", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
projectID, bucketName := uuid.UUID{1}, "bucky"
|
||||
|
||||
queries := []metabase.ObjectKey{""}
|
||||
for a := 0; a <= 0xFF; a++ {
|
||||
if 4 < a && a < 251 {
|
||||
continue
|
||||
}
|
||||
queries = append(queries, metabase.ObjectKey([]byte{byte(a)}))
|
||||
for b := 0; b <= 0xFF; b++ {
|
||||
if 4 < b && b < 251 {
|
||||
continue
|
||||
}
|
||||
queries = append(queries, metabase.ObjectKey([]byte{byte(a), byte(b)}))
|
||||
}
|
||||
}
|
||||
|
||||
createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries[1:])
|
||||
|
||||
for _, cursor := range queries {
|
||||
for _, prefix := range queries {
|
||||
var collector metabasetest.IterateCollector
|
||||
err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: cursor,
|
||||
Version: -1,
|
||||
},
|
||||
Prefix: prefix,
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("verify-iterator-boundary", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
projectID, bucketName := uuid.UUID{1}, "bucky"
|
||||
queries := []metabase.ObjectKey{"\x00\xFF"}
|
||||
createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries)
|
||||
var collector metabasetest.IterateCollector
|
||||
err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: metabase.ObjectKey([]byte{}),
|
||||
Version: -1,
|
||||
},
|
||||
Prefix: metabase.ObjectKey([]byte{1}),
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("verify-cursor-continuation", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
projectID, bucketName := uuid.UUID{1}, "bucky"
|
||||
|
||||
createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{
|
||||
"1",
|
||||
"a/a",
|
||||
"a/0",
|
||||
})
|
||||
|
||||
var collector metabasetest.IterateCollector
|
||||
err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Prefix: metabase.ObjectKey("a/"),
|
||||
BatchSize: 1,
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
t.Run("invalid arguments", func(t *testing.T) {
|
||||
|
@ -54,37 +54,6 @@ type StreamIDCursor struct {
|
||||
StreamID uuid.UUID
|
||||
}
|
||||
|
||||
// IterateObjects contains arguments necessary for listing objects in a bucket.
|
||||
type IterateObjects struct {
|
||||
ProjectID uuid.UUID
|
||||
BucketName string
|
||||
BatchSize int
|
||||
Prefix ObjectKey
|
||||
Cursor IterateCursor
|
||||
}
|
||||
|
||||
// Verify verifies get object request fields.
|
||||
func (opts *IterateObjects) Verify() error {
|
||||
switch {
|
||||
case opts.ProjectID.IsZero():
|
||||
return ErrInvalidRequest.New("ProjectID missing")
|
||||
case opts.BucketName == "":
|
||||
return ErrInvalidRequest.New("BucketName missing")
|
||||
case opts.BatchSize < 0:
|
||||
return ErrInvalidRequest.New("BatchSize is negative")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterateObjectsAllVersions iterates through all versions of all objects.
|
||||
func (db *DB) IterateObjectsAllVersions(ctx context.Context, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if err = opts.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
return iterateAllVersions(ctx, db, opts, fn)
|
||||
}
|
||||
|
||||
// IteratePendingObjectsByKey contains arguments necessary for listing pending objects by ObjectKey.
|
||||
type IteratePendingObjectsByKey struct {
|
||||
ObjectLocation
|
||||
|
@ -480,30 +480,6 @@ func (coll *LoopIterateCollector) Add(ctx context.Context, it metabase.LoopObjec
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterateObjects is for testing metabase.IterateObjects.
|
||||
type IterateObjects struct {
|
||||
Opts metabase.IterateObjects
|
||||
|
||||
Result []metabase.ObjectEntry
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
// Check runs the test.
|
||||
func (step IterateObjects) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
var collector IterateCollector
|
||||
|
||||
err := db.IterateObjectsAllVersions(ctx, step.Opts, collector.Add)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
result := []metabase.ObjectEntry(collector)
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
return result[i].ObjectKey < result[j].ObjectKey
|
||||
})
|
||||
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// IteratePendingObjectsByKey is for testing metabase.IteratePendingObjectsByKey.
|
||||
type IteratePendingObjectsByKey struct {
|
||||
Opts metabase.IteratePendingObjectsByKey
|
||||
|
Loading…
Reference in New Issue
Block a user