satellite/metainfo: ensure list prefix is prepended in satStreamID

When listing pending objects with prefix, the prefix should be prepended
to the EncryptedPath in satStreamID. Otherwise, listing multipart
uploads may display different UploadID than expected.

Change-Id: I27e9f9af9348783e053ad123121b6ddd051739e4
This commit is contained in:
Kaloyan Raev 2021-02-19 16:32:55 +02:00
parent 8b9da01817
commit e0f15130a2
2 changed files with 36 additions and 10 deletions

View File

@ -976,7 +976,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
}, func(ctx context.Context, it metabase.ObjectsIterator) error { }, func(ctx context.Context, it metabase.ObjectsIterator) error {
entry := metabase.ObjectEntry{} entry := metabase.ObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) { for len(resp.Items) < limit && it.Next(ctx, &entry) {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry) item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix)
if err != nil { if err != nil {
return err return err
} }
@ -1076,7 +1076,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
}, func(ctx context.Context, it metabase.ObjectsIterator) error { }, func(ctx context.Context, it metabase.ObjectsIterator) error {
entry := metabase.ObjectEntry{} entry := metabase.ObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) { for len(resp.Items) < limit && it.Next(ctx, &entry) {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry) item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "")
if err != nil { if err != nil {
return err return err
} }
@ -2185,7 +2185,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj
return result, nil return result, nil
} }
func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket []byte, entry metabase.ObjectEntry) (item *pb.ObjectListItem, err error) { func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket []byte, entry metabase.ObjectEntry, prefixToPrependInSatStreamID metabase.ObjectKey) (item *pb.ObjectListItem, err error) {
expires := time.Time{} expires := time.Time{}
if entry.ExpiresAt != nil { if entry.ExpiresAt != nil {
expires = *entry.ExpiresAt expires = *entry.ExpiresAt
@ -2243,7 +2243,7 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket
if entry.Status == metabase.Pending { if entry.Status == metabase.Pending {
satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{ satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
Bucket: bucket, Bucket: bucket,
EncryptedPath: item.EncryptedPath, EncryptedPath: append([]byte(prefixToPrependInSatStreamID), item.EncryptedPath...),
Version: item.Version, Version: item.Version,
CreationDate: item.CreatedAt, CreationDate: item.CreatedAt,
ExpirationDate: item.ExpiresAt, ExpirationDate: item.ExpiresAt,

View File

@ -1841,7 +1841,7 @@ func TestStableUploadID(t *testing.T) {
beginResp, err := client.BeginObject(ctx, metainfo.BeginObjectParams{ beginResp, err := client.BeginObject(ctx, metainfo.BeginObjectParams{
Bucket: []byte("testbucket"), Bucket: []byte("testbucket"),
EncryptedPath: []byte("testobject"), EncryptedPath: []byte("a/b/testobject"),
EncryptionParameters: storj.EncryptionParameters{ EncryptionParameters: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM, CipherSuite: storj.EncAESGCM,
BlockSize: 256, BlockSize: 256,
@ -1849,22 +1849,48 @@ func TestStableUploadID(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
// List the root of the bucket recursively
listResp, _, err := client.ListObjects(ctx, metainfo.ListObjectsParams{ listResp, _, err := client.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte("testbucket"), Bucket: []byte("testbucket"),
Status: int32(metabase.Pending), Status: int32(metabase.Pending),
Recursive: true,
}) })
require.NoError(t, err) require.NoError(t, err)
require.Len(t, listResp, 1) require.Len(t, listResp, 1)
// check that BeginObject and ListObjects return the same StreamID. // check that BeginObject and ListObjects return the same StreamID.
assert.Equal(t, beginResp.StreamID, listResp[0].StreamID) assert.Equal(t, beginResp.StreamID, listResp[0].StreamID)
// List with prefix non-recursively
listResp2, _, err := client.ListObjects(ctx, metainfo.ListObjectsParams{ listResp2, _, err := client.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte("testbucket"), Bucket: []byte("testbucket"),
Status: int32(metabase.Pending), Status: int32(metabase.Pending),
EncryptedPrefix: []byte("a/b/"),
}) })
require.NoError(t, err) require.NoError(t, err)
require.Len(t, listResp2, 1) require.Len(t, listResp2, 1)
// check that the two list results return the same StreamID. // check that the StreamID is still the same.
assert.Equal(t, listResp[0].StreamID, listResp2[0].StreamID) assert.Equal(t, listResp[0].StreamID, listResp2[0].StreamID)
// List with prefix recursively
listResp3, _, err := client.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
EncryptedPrefix: []byte("a/b/"),
Recursive: true,
})
require.NoError(t, err)
require.Len(t, listResp3, 1)
// check that the StreamID is still the same.
assert.Equal(t, listResp[0].StreamID, listResp3[0].StreamID)
// List the pending object directly
listResp4, err := client.ListPendingObjectStreams(ctx, metainfo.ListPendingObjectStreamsParams{
Bucket: []byte("testbucket"),
EncryptedPath: []byte("a/b/testobject"),
})
require.NoError(t, err)
require.Len(t, listResp4.Items, 1)
// check that the StreamID is still the same.
assert.Equal(t, listResp[0].StreamID, listResp4.Items[0].StreamID)
}) })
} }