From 45cfaa87436b8835787be62b3f86eebcb460315e Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 4 Dec 2023 09:43:19 +0100 Subject: [PATCH] satellite/metainfo: set internalpb.StreamID Versioned field To have the same UploadID returned from BeginUpload, ListObjects and ListPendingObjectStreams we need to set StreamID.Versioned field everywhere. Change-Id: I1328d9c476767559b8feb7c5bcd5afb154f7cee3 --- satellite/metainfo/endpoint_object.go | 11 ++++--- satellite/metainfo/endpoint_object_test.go | 36 ++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 5ddfdaf5a..7d8fc8093 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -991,7 +991,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } for _, entry := range result.Objects { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement) + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) if err != nil { return nil, endpoint.convertMetabaseErr(err) } @@ -1016,7 +1016,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq }, func(ctx context.Context, it metabase.ObjectsIterator) error { entry := metabase.ObjectEntry{} for len(resp.Items) < limit && it.Next(ctx, &entry) { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement) + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) if err != nil { return err } @@ -1059,7 +1059,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } - placement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID) + bucket, err := endpoint.buckets.GetBucket(ctx, req.Bucket, keyInfo.ProjectID) if err != nil { if buckets.ErrBucketNotFound.Has(err) { return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket) @@ -1102,7 +1102,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. options, func(ctx context.Context, it metabase.ObjectsIterator) error { entry := metabase.ObjectEntry{} for it.Next(ctx, &entry) { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement) + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) if err != nil { return err } @@ -1505,7 +1505,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket []byte, entry metabase.ObjectEntry, prefixToPrependInSatStreamID metabase.ObjectKey, - includeSystem, includeMetadata bool, placement storj.PlacementConstraint) (item *pb.ObjectListItem, err error) { + includeSystem, includeMetadata bool, placement storj.PlacementConstraint, versioned bool) (item *pb.ObjectListItem, err error) { item = &pb.ObjectListItem{ EncryptedObjectKey: []byte(entry.ObjectKey), @@ -1582,6 +1582,7 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket BlockSize: int64(entry.Encryption.BlockSize), }, Placement: int32(placement), + Versioned: versioned, }) if err != nil { return nil, err diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index cf500e4dd..b3e887ee5 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -2903,5 +2903,41 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) { checkListing(2, false) checkListing(3, true) }) + + t.Run("check UploadID for versioned bucket", func(t *testing.T) { + defer ctx.Check(deleteBucket(bucketName)) + + require.NoError(t, createBucket(bucketName)) + require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID)) + + response, err := satelliteSys.API.Metainfo.Endpoint.BeginObject(ctx, &pb.BeginObjectRequest{ + Header: &pb.RequestHeader{ApiKey: apiKey}, + Bucket: []byte(bucketName), + EncryptedObjectKey: []byte(objectKey), + EncryptionParameters: &pb.EncryptionParameters{ + CipherSuite: pb.CipherSuite_ENC_AESGCM, + }, + }) + require.NoError(t, err) + + listResponse, err := satelliteSys.API.Metainfo.Endpoint.ListObjects(ctx, &pb.ListObjectsRequest{ + Header: &pb.RequestHeader{ApiKey: apiKey}, + Bucket: []byte(bucketName), + Status: pb.Object_UPLOADING, + }) + require.NoError(t, err) + require.Len(t, listResponse.Items, 1) + // StreamId is encoded into UploadID on libuplink side + // require.Equal(t, response.StreamId.Bytes(), listResponse.Items[0].StreamId.Bytes()) + + lposResponse, err := satelliteSys.API.Metainfo.Endpoint.ListPendingObjectStreams(ctx, &pb.ListPendingObjectStreamsRequest{ + Header: &pb.RequestHeader{ApiKey: apiKey}, + Bucket: []byte(bucketName), + EncryptedObjectKey: response.EncryptedObjectKey, + }) + require.NoError(t, err) + require.Len(t, lposResponse.Items, 1) + require.Equal(t, response.StreamId.Bytes(), lposResponse.Items[0].StreamId.Bytes()) + }) }) }