satellite: omit system metadata fields if not requested

To reduce database load, add option to omit as many object properties
as possible when listing objects.

Change-Id: I817633801b00629a4042d1d1bd2389ee581953de
This commit is contained in:
Erik van Velzen 2021-09-28 14:36:10 +02:00
parent fb604be460
commit 91158fd3bf
7 changed files with 358 additions and 241 deletions

View File

@ -331,8 +331,9 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
Cursor: metabase.IterateCursor{
Key: object.ObjectKey,
},
Status: metabase.Committed,
IncludeMetadata: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
var item metabase.ObjectEntry
for it.Next(ctx, &item) {

View File

@ -476,11 +476,12 @@ func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID
err = db.IterateObjectsAllVersionsWithStatus(ctx,
IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: status,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: status,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, func(ctx context.Context, it ObjectsIterator) error {
entry := ObjectEntry{}
for it.Next(ctx, &entry) {

View File

@ -18,14 +18,15 @@ import (
type objectsIterator struct {
db *DB
projectID uuid.UUID
bucketName []byte
status ObjectStatus
prefix ObjectKey
prefixLimit ObjectKey
batchSize int
recursive bool
includeMetadata bool
projectID uuid.UUID
bucketName []byte
status ObjectStatus
prefix ObjectKey
prefixLimit ObjectKey
batchSize int
recursive bool
includeCustomMetadata bool
includeSystemMetadata bool
curIndex int
curRows tagsql.Rows
@ -51,13 +52,14 @@ func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn fun
it := &objectsIterator{
db: db,
projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize,
recursive: true,
includeMetadata: true,
projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize,
recursive: true,
includeCustomMetadata: true,
includeSystemMetadata: true,
curIndex: 0,
cursor: firstIterateCursor(true, opts.Cursor, opts.Prefix),
@ -80,14 +82,15 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec
it := &objectsIterator{
db: db,
projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
status: opts.Status,
prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize,
recursive: opts.Recursive,
includeMetadata: opts.IncludeMetadata,
projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
status: opts.Status,
prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize,
recursive: opts.Recursive,
includeCustomMetadata: opts.IncludeCustomMetadata,
includeSystemMetadata: opts.IncludeSystemMetadata,
curIndex: 0,
cursor: firstIterateCursor(opts.Recursive, opts.Cursor, opts.Prefix),
@ -117,13 +120,14 @@ func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePending
it := &objectsIterator{
db: db,
projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
prefix: "",
prefixLimit: "",
batchSize: opts.BatchSize,
recursive: true,
includeMetadata: true,
projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
prefix: "",
prefixLimit: "",
batchSize: opts.BatchSize,
recursive: true,
includeCustomMetadata: true,
includeSystemMetadata: true,
curIndex: 0,
cursor: iterateCursor{
@ -261,9 +265,9 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
object_key, stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
encryption,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
FROM objects
WHERE
project_id = $1 AND bucket_name = $2
@ -283,9 +287,9 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
object_key, stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
encryption,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
FROM objects
WHERE
project_id = $1 AND bucket_name = $2
@ -305,9 +309,29 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
metadataQuerySelectFields := ""
if it.includeMetadata {
metadataQuerySelectFields = "encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,"
// minimum needed for cursor
querySelectFields := `
object_key
,stream_id
,version`
if it.includeSystemMetadata {
querySelectFields += `
,status
,created_at
,expires_at
,segment_count
,total_plain_size
,total_encrypted_size
,fixed_segment_size
,encryption`
}
if it.includeCustomMetadata {
querySelectFields += `
,encrypted_metadata_nonce
,encrypted_metadata
,encrypted_metadata_encrypted_key`
}
cursorCompare := ">"
@ -318,12 +342,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
if it.prefixLimit == "" {
return it.db.db.QueryContext(ctx, `
SELECT
object_key, stream_id, version, status,
created_at, expires_at,
segment_count,
`+metadataQuerySelectFields+`
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
`+querySelectFields+`
FROM objects
WHERE
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
@ -343,12 +362,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
// works with CRDB.
return it.db.db.QueryContext(ctx, `
SELECT
object_key, stream_id, version, status,
created_at, expires_at,
segment_count,
`+metadataQuerySelectFields+`
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
`+querySelectFields+`
FROM objects
WHERE
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
@ -381,9 +395,9 @@ func doNextQueryStreamsByKey(ctx context.Context, it *objectsIterator) (_ tagsql
object_key, stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
encryption,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
FROM objects
WHERE
project_id = $1 AND bucket_name = $2
@ -403,25 +417,35 @@ func doNextQueryStreamsByKey(ctx context.Context, it *objectsIterator) (_ tagsql
func (it *objectsIterator) scanItem(item *ObjectEntry) (err error) {
item.IsPrefix = false
if it.includeMetadata {
err = it.curRows.Scan(
&item.ObjectKey, &item.StreamID, &item.Version, &item.Status,
&item.CreatedAt, &item.ExpiresAt,
fields := []interface{}{
&item.ObjectKey,
&item.StreamID,
&item.Version,
}
if it.includeSystemMetadata {
fields = append(fields,
&item.Status,
&item.CreatedAt,
&item.ExpiresAt,
&item.SegmentCount,
&item.EncryptedMetadataNonce, &item.EncryptedMetadata, &item.EncryptedMetadataEncryptedKey,
&item.TotalPlainSize, &item.TotalEncryptedSize, &item.FixedSegmentSize,
encryptionParameters{&item.Encryption},
)
} else {
err = it.curRows.Scan(
&item.ObjectKey, &item.StreamID, &item.Version, &item.Status,
&item.CreatedAt, &item.ExpiresAt,
&item.SegmentCount,
&item.TotalPlainSize, &item.TotalEncryptedSize, &item.FixedSegmentSize,
&item.TotalPlainSize,
&item.TotalEncryptedSize,
&item.FixedSegmentSize,
encryptionParameters{&item.Encryption},
)
}
if it.includeCustomMetadata {
fields = append(fields,
&item.EncryptedMetadataNonce,
&item.EncryptedMetadata,
&item.EncryptedMetadataEncryptedKey,
)
}
err = it.curRows.Scan(fields...)
if err != nil {
return err
}

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
@ -576,11 +577,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{{
ObjectKey: committed.ObjectKey,
@ -597,11 +599,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Pending,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Pending,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{{
ObjectKey: pending.ObjectKey,
@ -625,12 +628,13 @@ func TestIterateObjectsWithStatus(t *testing.T) {
}
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: uuid.UUID{1},
BucketName: "mybucket",
Recursive: true,
BatchSize: limit,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: uuid.UUID{1},
BucketName: "mybucket",
Recursive: true,
BatchSize: limit,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: expected,
}.Check(ctx, t, db)
@ -648,12 +652,13 @@ func TestIterateObjectsWithStatus(t *testing.T) {
}
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: uuid.UUID{1},
BucketName: "mybucket",
Recursive: true,
BatchSize: limit,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: uuid.UUID{1},
BucketName: "mybucket",
Recursive: true,
BatchSize: limit,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: expected,
}.Check(ctx, t, db)
@ -675,11 +680,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: uuid.UUID{1},
BucketName: "bucket-a",
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: uuid.UUID{1},
BucketName: "bucket-a",
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: expected,
}.Check(ctx, t, db)
@ -703,11 +709,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: uuid.UUID{1},
BucketName: "mybucket",
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: uuid.UUID{1},
BucketName: "mybucket",
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: expected,
}.Check(ctx, t, db)
@ -735,11 +742,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
objects["a"],
@ -756,11 +764,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Cursor: metabase.IterateCursor{Key: "a", Version: 10},
},
@ -778,11 +787,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Cursor: metabase.IterateCursor{Key: "b", Version: 0},
},
@ -800,11 +810,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
},
@ -817,11 +828,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "a"},
@ -835,11 +847,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "b/2", Version: -3},
@ -852,11 +865,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "c/"},
@ -883,10 +897,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
objects["a"],
@ -899,10 +914,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Cursor: metabase.IterateCursor{Key: "a", Version: 10},
},
@ -916,10 +932,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Cursor: metabase.IterateCursor{Key: "b", Version: 0},
},
@ -933,10 +950,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
},
@ -949,10 +967,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "a"},
@ -966,10 +985,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "b/2", Version: -3},
@ -982,10 +1002,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "c/"},
@ -995,10 +1016,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "c/",
Cursor: metabase.IterateCursor{Key: "c/"},
@ -1012,10 +1034,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
Prefix: "c//",
},
@ -1055,8 +1078,10 @@ func TestIterateObjectsWithStatus(t *testing.T) {
Key: cursor,
Version: -1,
},
Prefix: prefix,
Status: metabase.Committed,
Prefix: prefix,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, collector.Add)
require.NoError(t, err)
@ -1067,9 +1092,11 @@ func TestIterateObjectsWithStatus(t *testing.T) {
Key: cursor,
Version: -1,
},
Prefix: prefix,
Recursive: true,
Status: metabase.Committed,
Prefix: prefix,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, collector.Add)
require.NoError(t, err)
}
@ -1089,9 +1116,10 @@ func TestIterateObjectsWithStatus(t *testing.T) {
Key: metabase.ObjectKey([]byte{}),
Version: -1,
},
Prefix: metabase.ObjectKey([]byte{1}),
Status: metabase.Committed,
IncludeMetadata: true,
Prefix: metabase.ObjectKey([]byte{1}),
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, collector.Add)
require.NoError(t, err)
})
@ -1108,12 +1136,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
var collector metabasetest.IterateCollector
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Prefix: metabase.ObjectKey("a/"),
BatchSize: 1,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: projectID,
BucketName: bucketName,
Prefix: metabase.ObjectKey("a/"),
BatchSize: 1,
Status: metabase.Committed,
IncludeCustomMetadata: true,
}, collector.Add)
require.NoError(t, err)
})
@ -1134,11 +1162,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
var collector metabasetest.IterateCollector
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: obj1.ProjectID,
BucketName: obj1.BucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: true,
ProjectID: obj1.ProjectID,
BucketName: obj1.BucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, collector.Add)
require.NoError(t, err)
@ -1150,7 +1179,7 @@ func TestIterateObjectsWithStatus(t *testing.T) {
}
})
t.Run("exclude metadata", func(t *testing.T) {
t.Run("exclude custom metadata", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj1 := metabasetest.RandObjectStream()
@ -1166,11 +1195,12 @@ func TestIterateObjectsWithStatus(t *testing.T) {
var collector metabasetest.IterateCollector
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: obj1.ProjectID,
BucketName: obj1.BucketName,
Recursive: true,
Status: metabase.Committed,
IncludeMetadata: false,
ProjectID: obj1.ProjectID,
BucketName: obj1.BucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: false,
IncludeSystemMetadata: true,
}, collector.Add)
require.NoError(t, err)
@ -1181,6 +1211,48 @@ func TestIterateObjectsWithStatus(t *testing.T) {
require.Nil(t, entry.EncryptedMetadataEncryptedKey)
}
})
t.Run("exclude system metadata", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj1 := metabasetest.RandObjectStream()
metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: obj1,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: []byte{3},
EncryptedMetadataEncryptedKey: []byte{4},
EncryptedMetadataNonce: []byte{5},
},
}.Run(ctx, t, db, obj1, 4)
var collector metabasetest.IterateCollector
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: obj1.ProjectID,
BucketName: obj1.BucketName,
Recursive: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: false,
}, collector.Add)
require.NoError(t, err)
for _, entry := range collector {
require.True(t, entry.CreatedAt.IsZero())
require.Nil(t, entry.ExpiresAt)
require.Zero(t, entry.Status)
require.Zero(t, entry.SegmentCount)
require.Zero(t, entry.TotalPlainSize)
require.Zero(t, entry.TotalEncryptedSize)
require.Zero(t, entry.FixedSegmentSize)
require.Equal(t, storj.EncryptionParameters{}, entry.Encryption)
require.NotNil(t, entry.EncryptedMetadataNonce)
require.NotNil(t, entry.EncryptedMetadata)
require.NotNil(t, entry.EncryptedMetadataEncryptedKey)
}
})
})
}
@ -1207,8 +1279,9 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("08/"),
Version: 1,
},
Status: metabase.Committed,
IncludeMetadata: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
@ -1226,7 +1299,8 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("08"),
Version: 1,
},
Status: metabase.Committed,
Status: metabase.Committed,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("08/"), metabase.Committed),
@ -1245,7 +1319,8 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("08/a/x"),
Version: 1,
},
Status: metabase.Committed,
Status: metabase.Committed,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
@ -1273,7 +1348,8 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("2017/05/08"),
Version: 1,
},
Status: metabase.Committed,
Status: metabase.Committed,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("08/"), metabase.Committed),
@ -1292,7 +1368,8 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("2017/05/08/"),
Version: 1,
},
Status: metabase.Committed,
Status: metabase.Committed,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
@ -1310,7 +1387,8 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("2017/05/08/a/x"),
Version: 1,
},
Status: metabase.Committed,
Status: metabase.Committed,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
@ -1355,8 +1433,9 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("2017/05/08"),
Version: 1,
},
Status: metabase.Committed,
IncludeMetadata: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("08/"), metabase.Committed),
@ -1377,8 +1456,9 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("2017/05/08/"),
Version: 1,
},
Status: metabase.Committed,
IncludeMetadata: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
withoutPrefix1("2017/05/", objects["2017/05/08"+afterDelimiter]),
@ -1397,8 +1477,9 @@ func TestIterateObjectsSkipCursor(t *testing.T) {
Key: metabase.ObjectKey("2017/05/08/a/x"),
Version: 1,
},
Status: metabase.Committed,
IncludeMetadata: true,
Status: metabase.Committed,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
},
Result: []metabase.ObjectEntry{
withoutPrefix1("2017/05/", objects["2017/05/08"+afterDelimiter]),

View File

@ -94,14 +94,15 @@ type IteratePendingObjectsByKey struct {
// IterateObjectsWithStatus contains arguments necessary for listing objects in a bucket.
type IterateObjectsWithStatus struct {
ProjectID uuid.UUID
BucketName string
Recursive bool
BatchSize int
Prefix ObjectKey
Cursor IterateCursor
Status ObjectStatus
IncludeMetadata bool
ProjectID uuid.UUID
BucketName string
Recursive bool
BatchSize int
Prefix ObjectKey
Cursor IterateCursor
Status ObjectStatus
IncludeCustomMetadata bool
IncludeSystemMetadata bool
}
// IterateObjectsAllVersionsWithStatus iterates through all versions of all objects with specified status.

View File

@ -1230,9 +1230,11 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
cursor = string(prefix) + cursor
}
includeMetadata := true
includeCustomMetadata := true
includeSystemMetadata := true
if req.UseObjectIncludes {
includeMetadata = req.ObjectIncludes.Metadata
includeCustomMetadata = req.ObjectIncludes.Metadata
includeSystemMetadata = !req.ObjectIncludes.ExcludeSystemMetadata
}
resp = &pb.ObjectListResponse{}
@ -1246,14 +1248,15 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
Key: metabase.ObjectKey(cursor),
Version: 1, // TODO: set to a the version from the protobuf request when it supports this
},
Recursive: req.Recursive,
BatchSize: limit + 1,
Status: status,
IncludeMetadata: includeMetadata,
Recursive: req.Recursive,
BatchSize: limit + 1,
Status: status,
IncludeCustomMetadata: includeCustomMetadata,
IncludeSystemMetadata: includeSystemMetadata,
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
entry := metabase.ObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeMetadata)
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeCustomMetadata)
if err != nil {
return err
}

View File

@ -561,7 +561,8 @@ func TestListGetObjects(t *testing.T) {
expectedBucketName := "testbucket"
items, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
Bucket: []byte(expectedBucketName),
Bucket: []byte(expectedBucketName),
IncludeSystemMetadata: true,
})
require.NoError(t, err)
require.Equal(t, len(files), len(items))
@ -714,7 +715,8 @@ func TestBeginCommit(t *testing.T) {
require.NoError(t, err)
objects, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
Bucket: []byte(bucket.Name),
Bucket: []byte(bucket.Name),
IncludeSystemMetadata: true,
})
require.NoError(t, err)
require.Len(t, objects, 1)
@ -815,7 +817,8 @@ func TestInlineSegment(t *testing.T) {
require.NoError(t, err)
objects, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
Bucket: []byte(bucket.Name),
Bucket: []byte(bucket.Name),
IncludeSystemMetadata: true,
})
require.NoError(t, err)
require.Len(t, objects, 1)
@ -1978,9 +1981,10 @@ func TestStableUploadID(t *testing.T) {
// List the root of the bucket recursively
listResp, _, err := client.ListObjects(ctx, metaclient.ListObjectsParams{
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
Recursive: true,
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
Recursive: true,
IncludeSystemMetadata: true,
})
require.NoError(t, err)
require.Len(t, listResp, 1)
@ -1989,9 +1993,10 @@ func TestStableUploadID(t *testing.T) {
// List with prefix non-recursively
listResp2, _, err := client.ListObjects(ctx, metaclient.ListObjectsParams{
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
EncryptedPrefix: []byte("a/b/"),
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
EncryptedPrefix: []byte("a/b/"),
IncludeSystemMetadata: true,
})
require.NoError(t, err)
require.Len(t, listResp2, 1)
@ -2000,10 +2005,11 @@ func TestStableUploadID(t *testing.T) {
// List with prefix recursively
listResp3, _, err := client.ListObjects(ctx, metaclient.ListObjectsParams{
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
EncryptedPrefix: []byte("a/b/"),
Recursive: true,
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
EncryptedPrefix: []byte("a/b/"),
Recursive: true,
IncludeSystemMetadata: true,
})
require.NoError(t, err)
require.Len(t, listResp3, 1)