satellite/metabase: adjust ListObjects
Change-Id: Id435388cd1447cacccff7897c4b0d1a58cd67591
This commit is contained in:
parent
c79629e4da
commit
7de1178836
@ -264,6 +264,23 @@ type ObjectStream struct {
|
||||
StreamID uuid.UUID
|
||||
}
|
||||
|
||||
// Less implements sorting on object streams.
|
||||
func (obj ObjectStream) Less(b ObjectStream) bool {
|
||||
if obj.ProjectID != b.ProjectID {
|
||||
return obj.ProjectID.Less(b.ProjectID)
|
||||
}
|
||||
if obj.BucketName != b.BucketName {
|
||||
return obj.BucketName < b.BucketName
|
||||
}
|
||||
if obj.ObjectKey != b.ObjectKey {
|
||||
return obj.ObjectKey < b.ObjectKey
|
||||
}
|
||||
if obj.Version != b.Version {
|
||||
return obj.Version > b.Version
|
||||
}
|
||||
return obj.StreamID.Less(b.StreamID)
|
||||
}
|
||||
|
||||
// Verify object stream fields.
|
||||
func (obj *ObjectStream) Verify() error {
|
||||
switch {
|
||||
|
@ -24,7 +24,7 @@ type ListObjects struct {
|
||||
Limit int
|
||||
Prefix ObjectKey
|
||||
Cursor ListObjectsCursor
|
||||
Status ObjectStatus
|
||||
Pending bool
|
||||
IncludeCustomMetadata bool
|
||||
IncludeSystemMetadata bool
|
||||
}
|
||||
@ -38,8 +38,6 @@ func (opts *ListObjects) Verify() error {
|
||||
return ErrInvalidRequest.New("BucketName missing")
|
||||
case opts.Limit < 0:
|
||||
return ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
|
||||
case !(opts.Status == Pending || opts.Status == CommittedUnversioned):
|
||||
return ErrInvalidRequest.New("Status is invalid")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -62,9 +60,10 @@ func (db *DB) ListObjects(ctx context.Context, opts ListObjects) (result ListObj
|
||||
|
||||
var entries []ObjectEntry
|
||||
err = withRows(db.db.QueryContext(ctx, opts.getSQLQuery(),
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.startKey(), opts.Cursor.Version,
|
||||
opts.stopKey(), opts.Status,
|
||||
opts.Limit+1, len(opts.Prefix)+1))(func(rows tagsql.Rows) error {
|
||||
opts.ProjectID, []byte(opts.BucketName),
|
||||
opts.startKey(), opts.Cursor.Version, opts.stopKey(),
|
||||
opts.Limit+1, len(opts.Prefix)+1),
|
||||
)(func(rows tagsql.Rows) error {
|
||||
entries, err = scanListObjectsResult(rows, opts)
|
||||
return err
|
||||
})
|
||||
@ -87,16 +86,64 @@ func (db *DB) ListObjects(ctx context.Context, opts ListObjects) (result ListObj
|
||||
}
|
||||
|
||||
func (opts *ListObjects) getSQLQuery() string {
|
||||
return `
|
||||
SELECT ` + opts.selectedFields() + `
|
||||
FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
|
||||
AND ` + opts.stopCondition() + `
|
||||
AND status = $6
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY ` + opts.orderBy() + `
|
||||
LIMIT $7
|
||||
var indexFields string
|
||||
if opts.Recursive {
|
||||
indexFields = `
|
||||
substring(object_key from $7), FALSE as is_prefix`
|
||||
} else {
|
||||
indexFields = `
|
||||
DISTINCT ON (entry_key)
|
||||
CASE
|
||||
WHEN position('/' IN substring(object_key from $7)) <> 0
|
||||
THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1))
|
||||
ELSE substring(object_key from $7)
|
||||
END
|
||||
AS entry_key,
|
||||
position('/' IN substring(object_key from $7)) <> 0 AS is_prefix`
|
||||
}
|
||||
|
||||
if opts.Pending {
|
||||
return `SELECT ` + indexFields + opts.selectedFields() + `
|
||||
FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
|
||||
AND ` + opts.stopCondition() + `
|
||||
AND status = ` + statusPending + `
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY ` + opts.orderBy() + `
|
||||
LIMIT $6
|
||||
`
|
||||
}
|
||||
|
||||
// TODO(ver): using subquery the following subquery looks nicer, however CRDB has a bug related to it.
|
||||
//
|
||||
// SELECT MAX(sub.version)
|
||||
// FROM objects sub
|
||||
// WHERE
|
||||
// (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
|
||||
// AND status <> ` + statusPending + `
|
||||
// AND (expires_at IS NULL OR expires_at > now())
|
||||
|
||||
// query committed objects where the latest is not a delete marker
|
||||
return `SELECT ` + indexFields + opts.selectedFields() + `
|
||||
FROM objects main
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
|
||||
AND ` + opts.stopCondition() + `
|
||||
AND status IN ` + statusesCommitted + `
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
AND version = (
|
||||
SELECT sub.version
|
||||
FROM objects sub
|
||||
WHERE
|
||||
(sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
|
||||
AND status <> ` + statusPending + `
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY version DESC
|
||||
LIMIT 1
|
||||
)
|
||||
ORDER BY ` + opts.orderBy() + `
|
||||
LIMIT $6
|
||||
`
|
||||
}
|
||||
|
||||
@ -116,37 +163,20 @@ func (opts *ListObjects) stopCondition() string {
|
||||
|
||||
func (opts *ListObjects) orderBy() string {
|
||||
if !opts.Recursive {
|
||||
return "entry_key ASC"
|
||||
return "entry_key ASC, version DESC"
|
||||
}
|
||||
|
||||
return "(object_key, version) ASC"
|
||||
return "object_key ASC, version DESC"
|
||||
}
|
||||
|
||||
func (opts ListObjects) selectedFields() (selectedFields string) {
|
||||
|
||||
if opts.Recursive {
|
||||
selectedFields = `
|
||||
substring(object_key from $8), FALSE as is_prefix`
|
||||
} else {
|
||||
selectedFields = `
|
||||
DISTINCT ON (entry_key)
|
||||
CASE
|
||||
WHEN position('/' IN substring(object_key from $8)) <> 0
|
||||
THEN substring(substring(object_key from $8) from 0 for (position('/' IN substring(object_key from $8)) +1))
|
||||
ELSE substring(object_key from $8)
|
||||
END
|
||||
AS entry_key,
|
||||
position('/' IN substring(object_key from $8)) <> 0 AS is_prefix`
|
||||
}
|
||||
|
||||
selectedFields += `
|
||||
,stream_id
|
||||
,version
|
||||
,status
|
||||
,encryption`
|
||||
|
||||
if opts.IncludeSystemMetadata {
|
||||
selectedFields += `
|
||||
,status
|
||||
,created_at
|
||||
,expires_at
|
||||
,segment_count
|
||||
@ -205,12 +235,12 @@ func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []Object
|
||||
&item.IsPrefix,
|
||||
&item.StreamID,
|
||||
&item.Version,
|
||||
&item.Status,
|
||||
encryptionParameters{&item.Encryption},
|
||||
}
|
||||
|
||||
if opts.IncludeSystemMetadata {
|
||||
fields = append(fields,
|
||||
&item.Status,
|
||||
&item.CreatedAt,
|
||||
&item.ExpiresAt,
|
||||
&item.SegmentCount,
|
||||
@ -236,7 +266,12 @@ func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []Object
|
||||
item = ObjectEntry{
|
||||
IsPrefix: true,
|
||||
ObjectKey: item.ObjectKey,
|
||||
Status: opts.Status,
|
||||
}
|
||||
// TODO(ver): should we use `0` for prefixes instead?
|
||||
if opts.Pending {
|
||||
item.Status = Pending
|
||||
} else {
|
||||
item.Status = CommittedUnversioned
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
@ -63,22 +64,6 @@ func TestListObjects(t *testing.T) {
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Status is invalid", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.ListObjects{
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
Limit: 3,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "Status is invalid",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("no objects", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
@ -86,7 +71,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{},
|
||||
}.Check(ctx, t, db)
|
||||
@ -116,7 +101,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "mybucket",
|
||||
Recursive: false,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
Limit: limit,
|
||||
@ -143,7 +128,7 @@ func TestListObjects(t *testing.T) {
|
||||
BucketName: "mybucket",
|
||||
Recursive: true,
|
||||
Limit: limit,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -172,7 +157,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "bucket-a",
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -202,7 +187,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "mybucket",
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -236,7 +221,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -259,7 +244,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -283,7 +268,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -307,7 +292,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -326,7 +311,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -346,7 +331,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -365,7 +350,7 @@ func TestListObjects(t *testing.T) {
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Recursive: true,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -396,7 +381,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -414,7 +399,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -433,7 +418,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -452,7 +437,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -470,7 +455,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -489,7 +474,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -507,7 +492,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -521,7 +506,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -540,7 +525,7 @@ func TestListObjects(t *testing.T) {
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
|
||||
@ -579,7 +564,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("08/"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -600,7 +585,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("08"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{
|
||||
@ -621,7 +606,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("08/a/x"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{
|
||||
@ -651,7 +636,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{
|
||||
@ -672,7 +657,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{
|
||||
@ -692,7 +677,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/a/x"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{
|
||||
@ -738,7 +723,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08"),
|
||||
Version: objects["2017/05/08"].Version,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -762,7 +747,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -784,7 +769,7 @@ func TestListObjectsSkipCursor(t *testing.T) {
|
||||
Key: metabase.ObjectKey("2017/05/08/a/x"),
|
||||
Version: 1,
|
||||
},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
@ -829,7 +814,7 @@ func BenchmarkNonRecursiveObjectsListing(b *testing.B) {
|
||||
result, err := db.ListObjects(ctx, metabase.ListObjects{
|
||||
ProjectID: baseObj.ProjectID,
|
||||
BucketName: baseObj.BucketName,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
Limit: batchsize,
|
||||
})
|
||||
require.NoError(b, err)
|
||||
@ -838,7 +823,7 @@ func BenchmarkNonRecursiveObjectsListing(b *testing.B) {
|
||||
ProjectID: baseObj.ProjectID,
|
||||
BucketName: baseObj.BucketName,
|
||||
Cursor: metabase.ListObjectsCursor{Key: result.Objects[len(result.Objects)-1].ObjectKey},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
Limit: batchsize,
|
||||
})
|
||||
require.NoError(b, err)
|
||||
@ -852,7 +837,7 @@ func BenchmarkNonRecursiveObjectsListing(b *testing.B) {
|
||||
ProjectID: baseObj.ProjectID,
|
||||
BucketName: baseObj.BucketName,
|
||||
Prefix: "foo/",
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
Limit: batchsize,
|
||||
})
|
||||
require.NoError(b, err)
|
||||
@ -863,7 +848,7 @@ func BenchmarkNonRecursiveObjectsListing(b *testing.B) {
|
||||
BucketName: baseObj.BucketName,
|
||||
Prefix: "foo/",
|
||||
Cursor: metabase.ListObjectsCursor{Key: cursorKey},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
Limit: batchsize,
|
||||
})
|
||||
require.NoError(b, err)
|
||||
@ -877,7 +862,7 @@ func BenchmarkNonRecursiveObjectsListing(b *testing.B) {
|
||||
ProjectID: baseObj.ProjectID,
|
||||
BucketName: baseObj.BucketName,
|
||||
Prefix: "boo/",
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
Limit: batchsize,
|
||||
})
|
||||
require.NoError(b, err)
|
||||
@ -888,7 +873,7 @@ func BenchmarkNonRecursiveObjectsListing(b *testing.B) {
|
||||
BucketName: baseObj.BucketName,
|
||||
Prefix: "boo/",
|
||||
Cursor: metabase.ListObjectsCursor{Key: cursorKey},
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Pending: false,
|
||||
Limit: batchsize,
|
||||
})
|
||||
require.NoError(b, err)
|
||||
@ -898,3 +883,131 @@ func BenchmarkNonRecursiveObjectsListing(b *testing.B) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestListObjectsVersioned(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
t.Run("2 objects, each with 2 versions", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
a0 := metabasetest.RandObjectStream()
|
||||
a0.Version = 1000
|
||||
b0 := metabasetest.RandObjectStream()
|
||||
b0.ProjectID = a0.ProjectID
|
||||
b0.BucketName = a0.BucketName
|
||||
b0.Version = 1000
|
||||
|
||||
if a0.ObjectKey > b0.ObjectKey {
|
||||
b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey
|
||||
}
|
||||
|
||||
a1 := a0
|
||||
a1.Version = 1001
|
||||
b1 := a0
|
||||
b1.Version = 500
|
||||
|
||||
objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0)
|
||||
objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0)
|
||||
objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0)
|
||||
objB1 := metabasetest.CreateObjectVersioned(ctx, t, db, b1, 0)
|
||||
|
||||
metabasetest.ListObjects{
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: a0.ProjectID,
|
||||
BucketName: a0.BucketName,
|
||||
Recursive: true,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{
|
||||
Objects: []metabase.ObjectEntry{
|
||||
objectEntryFromRaw(metabase.RawObject(objA1)),
|
||||
objectEntryFromRaw(metabase.RawObject(objB0)),
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(objA0),
|
||||
metabase.RawObject(objA1),
|
||||
metabase.RawObject(objB0),
|
||||
metabase.RawObject(objB1),
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("2 objects, each with two versions and one with delete_marker", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
a0 := metabasetest.RandObjectStream()
|
||||
a0.Version = 1000
|
||||
b0 := metabasetest.RandObjectStream()
|
||||
b0.ProjectID = a0.ProjectID
|
||||
b0.BucketName = a0.BucketName
|
||||
b0.Version = 1000
|
||||
|
||||
if a0.ObjectKey > b0.ObjectKey {
|
||||
b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey
|
||||
}
|
||||
|
||||
a1 := a0
|
||||
a1.Version = 1001
|
||||
b1 := a0
|
||||
b1.Version = 500
|
||||
|
||||
objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0)
|
||||
objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0)
|
||||
objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0)
|
||||
objB1 := metabasetest.CreateObjectVersioned(ctx, t, db, b1, 0)
|
||||
|
||||
deletionResult := metabasetest.DeleteObjectLastCommitted{
|
||||
Opts: metabase.DeleteObjectLastCommitted{
|
||||
ObjectLocation: objA0.Location(),
|
||||
Versioned: true,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: objA0.ProjectID,
|
||||
BucketName: objA0.BucketName,
|
||||
ObjectKey: objA0.ObjectKey,
|
||||
Version: 1002,
|
||||
},
|
||||
Status: metabase.DeleteMarkerVersioned,
|
||||
CreatedAt: time.Now(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.ListObjects{
|
||||
Opts: metabase.ListObjects{
|
||||
ProjectID: a0.ProjectID,
|
||||
BucketName: a0.BucketName,
|
||||
Recursive: true,
|
||||
Pending: false,
|
||||
IncludeCustomMetadata: true,
|
||||
IncludeSystemMetadata: true,
|
||||
},
|
||||
Result: metabase.ListObjectsResult{
|
||||
Objects: []metabase.ObjectEntry{
|
||||
objectEntryFromRaw(metabase.RawObject(objB0)),
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(deletionResult.Objects[0]),
|
||||
metabase.RawObject(objA0),
|
||||
metabase.RawObject(objA1),
|
||||
metabase.RawObject(objB0),
|
||||
metabase.RawObject(objB1),
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
// TODO(ver): more exhaustive tests (committed/deletemarker, unversioned/versioned)
|
||||
// TODO(ver): test with non-recursive listing
|
||||
})
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ func sortBucketTallies(tallies []metabase.BucketTally) {
|
||||
|
||||
func sortRawObjects(objects []metabase.RawObject) {
|
||||
sort.Slice(objects, func(i, j int) bool {
|
||||
return objects[i].StreamID.Less(objects[j].StreamID)
|
||||
return objects[i].ObjectStream.Less(objects[j].ObjectStream)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -793,7 +793,7 @@ type DeleteObjectLastCommitted struct {
|
||||
}
|
||||
|
||||
// Check runs the test.
|
||||
func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) metabase.DeleteObjectResult {
|
||||
result, err := db.DeleteObjectLastCommitted(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
@ -802,6 +802,8 @@ func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing.
|
||||
|
||||
diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty())
|
||||
require.Zero(t, diff)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// CollectBucketTallies is for testing metabase.CollectBucketTallies.
|
||||
|
@ -933,7 +933,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
},
|
||||
Recursive: req.Recursive,
|
||||
Limit: limit,
|
||||
Status: status,
|
||||
Pending: status == metabase.Pending,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user