satellite/metainfo: set internalpb.StreamID Versioned field

To have the same UploadID returned from BeginUpload, ListObjects
and ListPendingObjectStreams we need to set StreamID.Versioned field
everywhere.

Change-Id: I1328d9c476767559b8feb7c5bcd5afb154f7cee3
This commit is contained in:
Michal Niewrzal 2023-12-04 09:43:19 +01:00 committed by Storj Robot
parent c5e44072a8
commit 45cfaa8743
2 changed files with 42 additions and 5 deletions

View File

@ -991,7 +991,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
} }
for _, entry := range result.Objects { for _, entry := range result.Objects {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement) item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled)
if err != nil { if err != nil {
return nil, endpoint.convertMetabaseErr(err) return nil, endpoint.convertMetabaseErr(err)
} }
@ -1016,7 +1016,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
}, func(ctx context.Context, it metabase.ObjectsIterator) error { }, func(ctx context.Context, it metabase.ObjectsIterator) error {
entry := metabase.ObjectEntry{} entry := metabase.ObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) { for len(resp.Items) < limit && it.Next(ctx, &entry) {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement) item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled)
if err != nil { if err != nil {
return err return err
} }
@ -1059,7 +1059,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
} }
placement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID) bucket, err := endpoint.buckets.GetBucket(ctx, req.Bucket, keyInfo.ProjectID)
if err != nil { if err != nil {
if buckets.ErrBucketNotFound.Has(err) { if buckets.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket) return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket)
@ -1102,7 +1102,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
options, func(ctx context.Context, it metabase.ObjectsIterator) error { options, func(ctx context.Context, it metabase.ObjectsIterator) error {
entry := metabase.ObjectEntry{} entry := metabase.ObjectEntry{}
for it.Next(ctx, &entry) { for it.Next(ctx, &entry) {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement) item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled)
if err != nil { if err != nil {
return err return err
} }
@ -1505,7 +1505,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj
func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket []byte, func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket []byte,
entry metabase.ObjectEntry, prefixToPrependInSatStreamID metabase.ObjectKey, entry metabase.ObjectEntry, prefixToPrependInSatStreamID metabase.ObjectKey,
includeSystem, includeMetadata bool, placement storj.PlacementConstraint) (item *pb.ObjectListItem, err error) { includeSystem, includeMetadata bool, placement storj.PlacementConstraint, versioned bool) (item *pb.ObjectListItem, err error) {
item = &pb.ObjectListItem{ item = &pb.ObjectListItem{
EncryptedObjectKey: []byte(entry.ObjectKey), EncryptedObjectKey: []byte(entry.ObjectKey),
@ -1582,6 +1582,7 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket
BlockSize: int64(entry.Encryption.BlockSize), BlockSize: int64(entry.Encryption.BlockSize),
}, },
Placement: int32(placement), Placement: int32(placement),
Versioned: versioned,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -2903,5 +2903,41 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) {
checkListing(2, false) checkListing(2, false)
checkListing(3, true) checkListing(3, true)
}) })
t.Run("check UploadID for versioned bucket", func(t *testing.T) {
defer ctx.Check(deleteBucket(bucketName))
require.NoError(t, createBucket(bucketName))
require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID))
response, err := satelliteSys.API.Metainfo.Endpoint.BeginObject(ctx, &pb.BeginObjectRequest{
Header: &pb.RequestHeader{ApiKey: apiKey},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objectKey),
EncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite_ENC_AESGCM,
},
})
require.NoError(t, err)
listResponse, err := satelliteSys.API.Metainfo.Endpoint.ListObjects(ctx, &pb.ListObjectsRequest{
Header: &pb.RequestHeader{ApiKey: apiKey},
Bucket: []byte(bucketName),
Status: pb.Object_UPLOADING,
})
require.NoError(t, err)
require.Len(t, listResponse.Items, 1)
// StreamId is encoded into UploadID on libuplink side
// require.Equal(t, response.StreamId.Bytes(), listResponse.Items[0].StreamId.Bytes())
lposResponse, err := satelliteSys.API.Metainfo.Endpoint.ListPendingObjectStreams(ctx, &pb.ListPendingObjectStreamsRequest{
Header: &pb.RequestHeader{ApiKey: apiKey},
Bucket: []byte(bucketName),
EncryptedObjectKey: response.EncryptedObjectKey,
})
require.NoError(t, err)
require.Len(t, lposResponse.Items, 1)
require.Equal(t, response.StreamId.Bytes(), lposResponse.Items[0].StreamId.Bytes())
})
}) })
} }