satellite/metabase: adjust code for iteration
Change-Id: Id3d4efe228a6f2d3642a639ef66a30e178ca001a
This commit is contained in:
parent
988ebbaf8d
commit
97c98d72e4
@ -217,7 +217,7 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: "bucket",
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
var entry metabase.ObjectEntry
|
||||
for it.Next(ctx, &entry) {
|
||||
@ -241,7 +241,7 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
ProjectID: projectID,
|
||||
BucketName: "bucket",
|
||||
Prefix: metabase.ObjectKey(prefixes[i]),
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
var entry metabase.ObjectEntry
|
||||
for it.Next(ctx, &entry) {
|
||||
@ -333,7 +333,7 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: object.ObjectKey,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
|
@ -336,10 +336,10 @@ func (db *DB) BucketEmpty(ctx context.Context, opts BucketEmpty) (empty bool, er
|
||||
func (db *DB) TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []ObjectEntry, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, CommittedUnversioned)
|
||||
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, false)
|
||||
}
|
||||
|
||||
func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, status ObjectStatus) (objects []ObjectEntry, err error) {
|
||||
func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, pending bool) (objects []ObjectEntry, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = db.IterateObjectsAllVersionsWithStatus(ctx,
|
||||
@ -347,7 +347,7 @@ func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: status,
|
||||
Pending: pending,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
}, func(ctx context.Context, it ObjectsIterator) error {
|
||||
|
@ -20,7 +20,7 @@ type objectsIterator struct {
|
||||
|
||||
projectID uuid.UUID
|
||||
bucketName []byte
|
||||
status ObjectStatus
|
||||
pending bool
|
||||
prefix ObjectKey
|
||||
prefixLimit ObjectKey
|
||||
batchSize int
|
||||
@ -54,7 +54,7 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec
|
||||
|
||||
projectID: opts.ProjectID,
|
||||
bucketName: []byte(opts.BucketName),
|
||||
status: opts.Status,
|
||||
pending: opts.Pending,
|
||||
prefix: opts.Prefix,
|
||||
prefixLimit: prefixLimit(opts.Prefix),
|
||||
batchSize: opts.BatchSize,
|
||||
@ -98,7 +98,7 @@ func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePending
|
||||
recursive: true,
|
||||
includeCustomMetadata: true,
|
||||
includeSystemMetadata: true,
|
||||
status: Pending,
|
||||
pending: true,
|
||||
|
||||
curIndex: 0,
|
||||
cursor: iterateCursor{
|
||||
@ -162,7 +162,12 @@ func (it *objectsIterator) Next(ctx context.Context, item *ObjectEntry) bool {
|
||||
*item = ObjectEntry{
|
||||
IsPrefix: true,
|
||||
ObjectKey: item.ObjectKey[:p+1],
|
||||
Status: it.status,
|
||||
}
|
||||
// TODO(ver): should we return something else here?
|
||||
if it.pending {
|
||||
item.Status = Pending
|
||||
} else {
|
||||
item.Status = CommittedUnversioned
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,6 +236,11 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
cursorCompare = ">="
|
||||
}
|
||||
|
||||
statusFilter := `AND status <> ` + statusPending
|
||||
if it.pending {
|
||||
statusFilter = `AND status = ` + statusPending
|
||||
}
|
||||
|
||||
if it.prefixLimit == "" {
|
||||
querySelectFields := querySelectorFields("object_key", it)
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
@ -238,14 +248,13 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
`+querySelectFields+`
|
||||
FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
||||
AND (project_id, bucket_name) < ($1, $7)
|
||||
AND status = $3
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $3, $4)
|
||||
AND (project_id, bucket_name) < ($1, $6)
|
||||
`+statusFilter+`
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
||||
LIMIT $6
|
||||
LIMIT $5
|
||||
`, it.projectID, it.bucketName,
|
||||
it.status,
|
||||
[]byte(it.cursor.Key), int(it.cursor.Version),
|
||||
it.batchSize,
|
||||
nextBucket(it.bucketName),
|
||||
@ -257,20 +266,19 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
fromSubstring = len(it.prefix) + 1
|
||||
}
|
||||
|
||||
querySelectFields := querySelectorFields("SUBSTRING(object_key FROM $8)", it)
|
||||
querySelectFields := querySelectorFields("SUBSTRING(object_key FROM $7)", it)
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
`+querySelectFields+`
|
||||
FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
||||
AND (project_id, bucket_name, object_key) < ($1, $2, $6)
|
||||
AND status = $3
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $3, $4)
|
||||
AND (project_id, bucket_name, object_key) < ($1, $2, $5)
|
||||
`+statusFilter+`
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
||||
LIMIT $7
|
||||
LIMIT $6
|
||||
`, it.projectID, it.bucketName,
|
||||
it.status,
|
||||
[]byte(it.cursor.Key), int(it.cursor.Version),
|
||||
[]byte(it.prefixLimit),
|
||||
it.batchSize,
|
||||
@ -282,6 +290,7 @@ func querySelectorFields(objectKeyColumn string, it *objectsIterator) string {
|
||||
querySelectFields := objectKeyColumn + `
|
||||
,stream_id
|
||||
,version
|
||||
,status
|
||||
,encryption`
|
||||
|
||||
if it.includeSystemMetadata {
|
||||
@ -317,7 +326,7 @@ func doNextQueryPendingObjectsByKey(ctx context.Context, it *objectsIterator) (_
|
||||
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, encryption,
|
||||
object_key, stream_id, version, status, encryption,
|
||||
created_at, expires_at,
|
||||
segment_count,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
@ -339,12 +348,12 @@ func doNextQueryPendingObjectsByKey(ctx context.Context, it *objectsIterator) (_
|
||||
// scanItem scans doNextQuery results into ObjectEntry.
|
||||
func (it *objectsIterator) scanItem(item *ObjectEntry) (err error) {
|
||||
item.IsPrefix = false
|
||||
item.Status = it.status
|
||||
|
||||
fields := []interface{}{
|
||||
&item.ObjectKey,
|
||||
&item.StreamID,
|
||||
&item.Version,
|
||||
&item.Status,
|
||||
encryptionParameters{&item.Encryption},
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: uuid.UUID{},
|
||||
BucketName: "sj://mybucket",
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ProjectID missing",
|
||||
@ -41,7 +41,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "",
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "BucketName missing",
|
||||
@ -54,24 +54,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
BucketName: "mybucket",
|
||||
BatchSize: -1,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "BatchSize is negative",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
t.Run("Status is invalid", func(t *testing.T) {
|
||||
metabasetest.IterateObjectsWithStatus{
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "test",
|
||||
Recursive: true,
|
||||
Status: 255,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "Status 255 is not supported",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("empty bucket", func(t *testing.T) {
|
||||
@ -83,7 +71,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
BucketName: "myemptybucket",
|
||||
BatchSize: 10,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
},
|
||||
Result: nil,
|
||||
}.Check(ctx, t, db)
|
||||
@ -135,7 +123,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -157,7 +145,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.Pending,
|
||||
Pending: true,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -187,7 +175,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
BucketName: "mybucket",
|
||||
Recursive: true,
|
||||
BatchSize: limit,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -211,7 +199,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
BucketName: "mybucket",
|
||||
Recursive: true,
|
||||
BatchSize: limit,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -238,7 +226,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "bucket-a",
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -267,7 +255,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "mybucket",
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -300,7 +288,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -322,7 +310,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -345,7 +333,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -368,7 +356,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -386,7 +374,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -405,7 +393,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -423,7 +411,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -454,7 +442,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -471,7 +459,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -489,7 +477,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -507,7 +495,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -524,7 +512,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -542,7 +530,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -559,7 +547,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -573,7 +561,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -591,7 +579,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Opts: metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -635,7 +623,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Version: -1,
|
||||
},
|
||||
Prefix: prefix,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
@ -650,7 +638,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
},
|
||||
Prefix: prefix,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
@ -672,7 +660,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
Version: -1,
|
||||
},
|
||||
Prefix: metabase.ObjectKey([]byte{1}),
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
}, collector.Add)
|
||||
@ -695,7 +683,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
BucketName: bucketName,
|
||||
Prefix: metabase.ObjectKey("a/"),
|
||||
BatchSize: 1,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
@ -721,7 +709,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: obj1.ProjectID,
|
||||
BucketName: obj1.BucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
}, collector.Add)
|
||||
@ -754,7 +742,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: obj1.ProjectID,
|
||||
BucketName: obj1.BucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: false,
|
||||
IncludeSystemMetadata: true,
|
||||
}, collector.Add)
|
||||
@ -788,7 +776,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: obj1.ProjectID,
|
||||
BucketName: obj1.BucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: false,
|
||||
}, collector.Add)
|
||||
@ -830,7 +818,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Prefix: metabase.ObjectKey("a/"),
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
BatchSize: 1,
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
@ -891,7 +879,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: stream.ProjectID,
|
||||
BucketName: stream.BucketName,
|
||||
BatchSize: batchSize,
|
||||
Status: 3,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
}
|
||||
metabasetest.IterateObjectsWithStatus{
|
||||
@ -927,7 +915,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
BucketName: bucketName,
|
||||
Recursive: false,
|
||||
Prefix: "aaaa/",
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
BatchSize: 2,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -974,7 +962,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
BatchSize: 3,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -1010,7 +998,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("08/"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -1030,7 +1018,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("08"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
@ -1050,7 +1038,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("08/a/x"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
@ -1079,7 +1067,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
@ -1099,7 +1087,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
@ -1118,7 +1106,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/a/x"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: []metabase.ObjectEntry{
|
||||
@ -1164,7 +1152,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08"),
|
||||
Version: objects["2017/05/08"].Version,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -1187,7 +1175,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -1208,7 +1196,7 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/a/x"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -1466,6 +1454,8 @@ func TestIteratePendingObjectsWithObjectKey(t *testing.T) {
|
||||
|
||||
metabasetest.Verify{Objects: objects}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
// TODO(ver): add tests for delete markers and versioned/unversioned
|
||||
})
|
||||
}
|
||||
|
||||
@ -1584,7 +1574,7 @@ func BenchmarkNonRecursiveListing(b *testing.B) {
|
||||
ProjectID: baseObj.ProjectID,
|
||||
BucketName: baseObj.BucketName,
|
||||
BatchSize: 5,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
}, func(ctx context.Context, oi metabase.ObjectsIterator) error {
|
||||
entry := metabase.ObjectEntry{}
|
||||
for oi.Next(ctx, &entry) {
|
||||
@ -1602,7 +1592,7 @@ func BenchmarkNonRecursiveListing(b *testing.B) {
|
||||
BucketName: baseObj.BucketName,
|
||||
Prefix: "foo/",
|
||||
BatchSize: 5,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
}, func(ctx context.Context, oi metabase.ObjectsIterator) error {
|
||||
entry := metabase.ObjectEntry{}
|
||||
for oi.Next(ctx, &entry) {
|
||||
@ -1620,7 +1610,7 @@ func BenchmarkNonRecursiveListing(b *testing.B) {
|
||||
BucketName: baseObj.BucketName,
|
||||
Prefix: "boo/",
|
||||
BatchSize: 5,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
}, func(ctx context.Context, oi metabase.ObjectsIterator) error {
|
||||
entry := metabase.ObjectEntry{}
|
||||
for oi.Next(ctx, &entry) {
|
||||
|
@ -69,7 +69,7 @@ type IterateObjectsWithStatus struct {
|
||||
BatchSize int
|
||||
Prefix ObjectKey
|
||||
Cursor IterateCursor
|
||||
Status ObjectStatus
|
||||
Pending bool
|
||||
IncludeCustomMetadata bool
|
||||
IncludeSystemMetadata bool
|
||||
}
|
||||
@ -92,8 +92,6 @@ func (opts *IterateObjectsWithStatus) Verify() error {
|
||||
return ErrInvalidRequest.New("BucketName missing")
|
||||
case opts.BatchSize < 0:
|
||||
return ErrInvalidRequest.New("BatchSize is negative")
|
||||
case !(opts.Status == Pending || opts.Status == CommittedUnversioned):
|
||||
return ErrInvalidRequest.New("Status %v is not supported", opts.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1001,7 +1001,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
},
|
||||
Recursive: req.Recursive,
|
||||
BatchSize: limit + 1,
|
||||
Status: metabase.Pending,
|
||||
Pending: true,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
@ -1063,7 +1063,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
},
|
||||
Recursive: req.Recursive,
|
||||
BatchSize: limit + 1,
|
||||
Status: status,
|
||||
Pending: status == metabase.Pending,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
|
Loading…
Reference in New Issue
Block a user