From 2d4760fd096ab3d7e08a7b6211425c2a6c770dac Mon Sep 17 00:00:00 2001
From: Fadila Khadar
Date: Fri, 28 Jan 2022 01:17:33 +0100
Subject: [PATCH] satellite/metainfo: BeginCopyObject and FinishCopyObject

Metainfo endpoints to support server-side copy.

Fixes https://github.com/storj/storj/issues/4475

Change-Id: Ided06aed9e6187d6d8f030e95dda019ba78fff95
---
 satellite/metainfo/endpoint_object.go      | 230 +++++++++++++++++++++
 satellite/metainfo/endpoint_object_test.go |  96 +++++++++
 2 files changed, 326 insertions(+)

diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go
index 3d49a3b16..c2037e8ff 100644
--- a/satellite/metainfo/endpoint_object.go
+++ b/satellite/metainfo/endpoint_object.go
@@ -1654,6 +1654,236 @@ func (endpoint *Endpoint) FinishMoveObject(ctx context.Context, req *pb.ObjectFi
 	return &pb.ObjectFinishMoveResponse{}, nil
 }
 
+// Server-side copy.
+
+// BeginCopyObject begins copying an object to a different key.
+func (endpoint *Endpoint) BeginCopyObject(ctx context.Context, req *pb.ObjectBeginCopyRequest) (resp *pb.ObjectBeginCopyResponse, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
+	if err != nil {
+		endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
+	}
+
+	now := time.Now()
+	keyInfo, err := endpoint.validateAuthN(ctx, req.Header,
+		verifyPermission{
+			action: macaroon.Action{
+				Op:            macaroon.ActionRead,
+				Bucket:        req.Bucket,
+				EncryptedPath: req.EncryptedObjectKey,
+				Time:          now,
+			},
+		},
+		verifyPermission{
+			action: macaroon.Action{
+				Op:            macaroon.ActionWrite,
+				Bucket:        req.NewBucket,
+				EncryptedPath: req.NewEncryptedObjectKey,
+				Time:          now,
+			},
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, bucket := range [][]byte{req.Bucket, req.NewBucket} {
+		err = endpoint.validateBucket(ctx, bucket)
+		if err != nil {
+			return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		}
+	}
+
+	// We only verify the existence of the target bucket here, because the
+	// source bucket is checked while querying the source object.
+	// TODO this needs to be optimized to avoid a DB call on each request
+	newBucketPlacement, err := endpoint.buckets.GetBucketPlacement(ctx, req.NewBucket, keyInfo.ProjectID)
+	if err != nil {
+		if storj.ErrBucketNotFound.Has(err) {
+			return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.NewBucket)
+		}
+		endpoint.log.Error("unable to check bucket", zap.Error(err))
+		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	// If the source and target buckets differ, their geofencing configs must match.
+	if !bytes.Equal(req.Bucket, req.NewBucket) {
+		oldBucketPlacement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID)
+		if err != nil {
+			if storj.ErrBucketNotFound.Has(err) {
+				return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket)
+			}
+			endpoint.log.Error("unable to check bucket", zap.Error(err))
+			return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		}
+		if oldBucketPlacement != newBucketPlacement {
+			return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "copying object to bucket with different placement policy is not (yet) supported")
+		}
+	}
+
+	result, err := endpoint.metabase.BeginCopyObject(ctx, metabase.BeginCopyObject{
+		ObjectLocation: metabase.ObjectLocation{
+			ProjectID:  keyInfo.ProjectID,
+			BucketName: string(req.Bucket),
+			ObjectKey:  metabase.ObjectKey(req.EncryptedObjectKey),
+		},
+		Version: metabase.DefaultVersion,
+	})
+	if err != nil {
+		return nil, endpoint.convertMetabaseErr(err)
+	}
+
+	response, err := convertBeginCopyObjectResults(result)
+	if err != nil {
+		endpoint.log.Error("internal", zap.Error(err))
+		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
+		Bucket:             req.Bucket,
+		EncryptedObjectKey: req.EncryptedObjectKey,
+		Version:            int32(metabase.DefaultVersion),
+		StreamId:           result.StreamID[:],
+		EncryptionParameters: &pb.EncryptionParameters{
+			CipherSuite: pb.CipherSuite(result.EncryptionParameters.CipherSuite),
+			BlockSize:   int64(result.EncryptionParameters.BlockSize),
+		},
+		Placement: int32(newBucketPlacement),
+	})
+	if err != nil {
+		endpoint.log.Error("internal", zap.Error(err))
+		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	response.StreamId = satStreamID
+	return response, nil
+}
+
+func convertBeginCopyObjectResults(result metabase.BeginCopyObjectResult) (*pb.ObjectBeginCopyResponse, error) {
+	keys := make([]*pb.EncryptedKeyAndNonce, len(result.EncryptedKeysNonces))
+	for i, key := range result.EncryptedKeysNonces {
+		var nonce storj.Nonce
+		var err error
+		if len(key.EncryptedKeyNonce) != 0 {
+			nonce, err = storj.NonceFromBytes(key.EncryptedKeyNonce)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		keys[i] = &pb.EncryptedKeyAndNonce{
+			Position: &pb.SegmentPosition{
+				PartNumber: int32(key.Position.Part),
+				Index:      int32(key.Position.Index),
+			},
+			EncryptedKey:      key.EncryptedKey,
+			EncryptedKeyNonce: nonce,
+		}
+	}
+
+	// TODO we need this because of an uplink issue with how we are storing the key and nonce
+	if result.EncryptedMetadataKey == nil {
+		streamMeta := &pb.StreamMeta{}
+		err := pb.Unmarshal(result.EncryptedMetadata, streamMeta)
+		if err != nil {
+			return nil, err
+		}
+		if streamMeta.LastSegmentMeta != nil {
+			result.EncryptedMetadataKey = streamMeta.LastSegmentMeta.EncryptedKey
+			result.EncryptedMetadataKeyNonce = streamMeta.LastSegmentMeta.KeyNonce
+		}
+	}
+
+	var metadataNonce storj.Nonce
+	var err error
+	if len(result.EncryptedMetadataKeyNonce) != 0 {
+		metadataNonce, err = storj.NonceFromBytes(result.EncryptedMetadataKeyNonce)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &pb.ObjectBeginCopyResponse{
+		EncryptedMetadataKey:      result.EncryptedMetadataKey,
+		EncryptedMetadataKeyNonce: metadataNonce,
+		EncryptionParameters: &pb.EncryptionParameters{
+			CipherSuite: pb.CipherSuite(result.EncryptionParameters.CipherSuite),
+			BlockSize:   int64(result.EncryptionParameters.BlockSize),
+		},
+		SegmentKeys: keys,
+	}, nil
+}
+
+// FinishCopyObject accepts new encryption keys for the copied object and updates the corresponding object ObjectKey and segments EncryptedKey.
+func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFinishCopyRequest) (resp *pb.ObjectFinishCopyResponse, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
+	if err != nil {
+		endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
+	}
+
+	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+	}
+
+	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
+		Op:            macaroon.ActionWrite,
+		Time:          time.Now(),
+		Bucket:        req.NewBucket,
+		EncryptedPath: req.NewEncryptedObjectKey,
+	})
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
+	}
+
+	err = endpoint.validateBucket(ctx, req.NewBucket)
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+	}
+
+	exists, err := endpoint.buckets.HasBucket(ctx, req.NewBucket, keyInfo.ProjectID)
+	if err != nil {
+		endpoint.log.Error("unable to check bucket", zap.Error(err))
+		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	} else if !exists {
+		return nil, rpcstatus.Errorf(rpcstatus.NotFound, "target bucket not found: %s", req.NewBucket)
+	}
+
+	streamUUID, err := uuid.FromBytes(streamID.StreamId)
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+	}
+
+	newStreamID, err := uuid.New()
+	if err != nil {
+		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	err = endpoint.metabase.FinishCopyObject(ctx, metabase.FinishCopyObject{
+		ObjectStream: metabase.ObjectStream{
+			ProjectID:  keyInfo.ProjectID,
+			BucketName: string(streamID.Bucket),
+			ObjectKey:  metabase.ObjectKey(streamID.EncryptedObjectKey),
+			Version:    metabase.DefaultVersion,
+			StreamID:   streamUUID,
+		},
+		NewStreamID:                  newStreamID,
+		NewSegmentKeys:               protobufkeysToMetabase(req.NewSegmentKeys),
+		NewBucket:                    string(req.NewBucket),
+		NewEncryptedObjectKey:        req.NewEncryptedObjectKey,
+		NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce[:],
+		NewEncryptedMetadataKey:      req.NewEncryptedMetadataKey,
+	})
+	if err != nil {
+		return nil, endpoint.convertMetabaseErr(err)
+	}
+
+	return &pb.ObjectFinishCopyResponse{}, nil
+}
+
+// protobufkeysToMetabase converts []*pb.EncryptedKeyAndNonce to []metabase.EncryptedKeyAndNonce.
 func protobufkeysToMetabase(protoKeys []*pb.EncryptedKeyAndNonce) []metabase.EncryptedKeyAndNonce {
 	keys := make([]metabase.EncryptedKeyAndNonce, len(protoKeys))
diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go
index 83f7b90fd..843fb1de6 100644
--- a/satellite/metainfo/endpoint_object_test.go
+++ b/satellite/metainfo/endpoint_object_test.go
@@ -1235,3 +1235,99 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
 		})
 	})
 }
+
+func TestEndpoint_CopyObject(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
+		satelliteSys := planet.Satellites[0]
+		uplnk := planet.Uplinks[0]
+
+		// upload a small inline object
+		err := uplnk.Upload(ctx, planet.Satellites[0], "testbucket", "testobject", testrand.Bytes(1*memory.KiB))
+		require.NoError(t, err)
+		objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
+		require.NoError(t, err)
+		require.Len(t, objects, 1)
+
+		getResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			Bucket:        []byte("testbucket"),
+			EncryptedPath: []byte(objects[0].ObjectKey),
+		})
+		require.NoError(t, err)
+
+		testEncryptedMetadataNonce := testrand.Nonce()
+		// begin copying the object to a new key in the same bucket
+		beginResp, err := satelliteSys.API.Metainfo.Endpoint.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			Bucket:                getResp.Object.Bucket,
+			EncryptedObjectKey:    getResp.Object.EncryptedPath,
+			NewBucket:             []byte("testbucket"),
+			NewEncryptedObjectKey: []byte("newencryptedkey"),
+		})
+		require.NoError(t, err)
+		assert.Len(t, beginResp.SegmentKeys, 1)
+		assert.Equal(t, objects[0].EncryptedMetadataEncryptedKey, beginResp.EncryptedMetadataKey)
+		assert.Equal(t, objects[0].EncryptedMetadataNonce, beginResp.EncryptedMetadataKeyNonce.Bytes())
+
+		segmentKeys := pb.EncryptedKeyAndNonce{
+			Position:          beginResp.SegmentKeys[0].Position,
+			EncryptedKeyNonce: testrand.Nonce(),
+			EncryptedKey:      []byte("newencryptedkey"),
+		}
+
+		_, err = satelliteSys.API.Metainfo.Endpoint.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			StreamId:                     getResp.Object.StreamId,
+			NewBucket:                    []byte("testbucket"),
+			NewEncryptedObjectKey:        []byte("newobjectkey"),
+			NewEncryptedMetadataKeyNonce: testEncryptedMetadataNonce,
+			NewEncryptedMetadataKey:      []byte("encryptedmetadatakey"),
+			NewSegmentKeys:               []*pb.EncryptedKeyAndNonce{&segmentKeys},
+		})
+		require.NoError(t, err)
+
+		objectsAfterCopy, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
+		require.NoError(t, err)
+		require.Len(t, objectsAfterCopy, 2)
+
+		getCopyResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			Bucket:        []byte("testbucket"),
+			EncryptedPath: []byte("newobjectkey"),
+		})
+		require.NoError(t, err, objectsAfterCopy[1])
+		require.NotEqual(t, getResp.Object.StreamId, getCopyResp.Object.StreamId)
+		require.NotZero(t, getCopyResp.Object.StreamId)
+		require.Equal(t, getResp.Object.InlineSize, getCopyResp.Object.InlineSize)
+
+		// compare segments
+		originalSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			StreamId:       getResp.Object.StreamId,
+			CursorPosition: segmentKeys.Position,
+		})
+		require.NoError(t, err)
+		copiedSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			StreamId:       getCopyResp.Object.StreamId,
+			CursorPosition: segmentKeys.Position,
+		})
+		require.NoError(t, err)
+		require.Equal(t, originalSegment.EncryptedInlineData, copiedSegment.EncryptedInlineData)
+	})
+}
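
For illustration, a minimal client-side sketch of the two-phase flow these endpoints implement, mirroring the test above. The copyObject helper and the reencryptSegmentKeys callback are hypothetical (reencryptSegmentKeys passes the keys through unchanged just so the sketch compiles); a real uplink would re-encrypt the segment and metadata keys returned by BeginCopyObject for the new object key before calling FinishCopyObject. Field names mirror the pb messages used above, and the DRPCMetainfoClient methods are assumed to exist once the matching protocol change lands in common/pb.

package copyexample

import (
	"context"

	"storj.io/common/pb"
	"storj.io/common/storj"
)

// copyObject drives the two-phase server-side copy against the metainfo
// endpoint. Sketch only, not part of this patch.
func copyObject(ctx context.Context, client pb.DRPCMetainfoClient, header *pb.RequestHeader,
	bucket, objectKey, newBucket, newObjectKey []byte) error {
	// Phase 1: fetch the source object's per-segment keys and metadata key.
	begin, err := client.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
		Header:                header,
		Bucket:                bucket,
		EncryptedObjectKey:    objectKey,
		NewBucket:             newBucket,
		NewEncryptedObjectKey: newObjectKey,
	})
	if err != nil {
		return err
	}

	// Re-encrypt the returned keys for the new object key (hypothetical helper).
	newKeys, newMetaKey, newMetaNonce, err := reencryptSegmentKeys(
		begin.SegmentKeys, begin.EncryptedMetadataKey, begin.EncryptedMetadataKeyNonce)
	if err != nil {
		return err
	}

	// Phase 2: commit the copy under the new bucket/key with the new keys.
	_, err = client.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
		Header:                       header,
		StreamId:                     begin.StreamId,
		NewBucket:                    newBucket,
		NewEncryptedObjectKey:        newObjectKey,
		NewSegmentKeys:               newKeys,
		NewEncryptedMetadataKey:      newMetaKey,
		NewEncryptedMetadataKeyNonce: newMetaNonce,
	})
	return err
}

// reencryptSegmentKeys is a pass-through stand-in: a real client decrypts each
// key with the source object's derived key and re-encrypts it for the new key.
func reencryptSegmentKeys(keys []*pb.EncryptedKeyAndNonce, metaKey []byte, metaNonce storj.Nonce,
) ([]*pb.EncryptedKeyAndNonce, []byte, storj.Nonce, error) {
	return keys, metaKey, metaNonce, nil
}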