satellite/metainfo: BeginCopyObject and FinishCopyObject
Metainfo endpoints to use server-side copy.

Fixes https://github.com/storj/storj/issues/4475

Change-Id: Ided06aed9e6187d6d8f030e95dda019ba78fff95
parent fbe2680500
commit 2d4760fd09
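At a glance, the two new endpoints are meant to be driven as a pair: the client begins the copy, re-encrypts the returned per-segment keys for the new object key, and finishes the copy; segment content never leaves the storage nodes. A minimal client-side sketch of that flow (the reencryptKeys parameter is a hypothetical stand-in for uplink-side key re-encryption, and the wiring of client is assumed, not part of this change):

	// copyObject sketches the intended begin/finish flow for server-side copy.
	func copyObject(ctx context.Context, client pb.DRPCMetainfoClient, apiKey []byte,
		bucket, key, newBucket, newKey []byte,
		reencryptKeys func([]*pb.EncryptedKeyAndNonce) ([]*pb.EncryptedKeyAndNonce, error),
	) error {
		header := &pb.RequestHeader{ApiKey: apiKey}

		begin, err := client.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
			Header:                header,
			Bucket:                bucket,
			EncryptedObjectKey:    key,
			NewBucket:             newBucket,
			NewEncryptedObjectKey: newKey,
		})
		if err != nil {
			return err
		}

		// only metadata moves on the satellite; the object content is not re-uploaded
		newSegmentKeys, err := reencryptKeys(begin.SegmentKeys)
		if err != nil {
			return err
		}

		_, err = client.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
			Header:                header,
			StreamId:              begin.StreamId,
			NewBucket:             newBucket,
			NewEncryptedObjectKey: newKey,
			NewSegmentKeys:        newSegmentKeys,
		})
		return err
	}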
@@ -1654,6 +1654,244 @@ func (endpoint *Endpoint) FinishMoveObject(ctx context.Context, req *pb.ObjectFi
	return &pb.ObjectFinishMoveResponse{}, nil
}

// Server side copy.

// BeginCopyObject begins copying an object to a different key.
func (endpoint *Endpoint) BeginCopyObject(ctx context.Context, req *pb.ObjectBeginCopyRequest) (resp *pb.ObjectBeginCopyResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
	if err != nil {
		endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
	}

	// copying requires read permission on the source location and
	// write permission on the destination location
	now := time.Now()
	keyInfo, err := endpoint.validateAuthN(ctx, req.Header,
		verifyPermission{
			action: macaroon.Action{
				Op:            macaroon.ActionRead,
				Bucket:        req.Bucket,
				EncryptedPath: req.EncryptedObjectKey,
				Time:          now,
			},
		},
		verifyPermission{
			action: macaroon.Action{
				Op:            macaroon.ActionWrite,
				Bucket:        req.NewBucket,
				EncryptedPath: req.NewEncryptedObjectKey,
				Time:          now,
			},
		},
	)
	if err != nil {
		return nil, err
	}

	for _, bucket := range [][]byte{req.Bucket, req.NewBucket} {
		err = endpoint.validateBucket(ctx, bucket)
		if err != nil {
			return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
		}
	}

	// we are verifying existence of the target bucket only, because the source
	// bucket will be checked while querying the source object
	// TODO this needs to be optimized to avoid a DB call on each request
	newBucketPlacement, err := endpoint.buckets.GetBucketPlacement(ctx, req.NewBucket, keyInfo.ProjectID)
	if err != nil {
		if storj.ErrBucketNotFound.Has(err) {
			return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.NewBucket)
		}
		endpoint.log.Error("unable to check bucket", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	// if source and target buckets are different, we need to check their geofencing configs
	if !bytes.Equal(req.Bucket, req.NewBucket) {
		oldBucketPlacement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID)
		if err != nil {
			if storj.ErrBucketNotFound.Has(err) {
				return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket)
			}
			endpoint.log.Error("unable to check bucket", zap.Error(err))
			return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
		}
		if oldBucketPlacement != newBucketPlacement {
			return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "copying object to bucket with different placement policy is not (yet) supported")
		}
	}

	result, err := endpoint.metabase.BeginCopyObject(ctx, metabase.BeginCopyObject{
		ObjectLocation: metabase.ObjectLocation{
			ProjectID:  keyInfo.ProjectID,
			BucketName: string(req.Bucket),
			ObjectKey:  metabase.ObjectKey(req.EncryptedObjectKey),
		},
		Version: metabase.DefaultVersion,
	})
	if err != nil {
		return nil, endpoint.convertMetabaseErr(err)
	}

	response, err := convertBeginCopyObjectResults(result)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
		Bucket:             req.Bucket,
		EncryptedObjectKey: req.EncryptedObjectKey,
		Version:            int32(metabase.DefaultVersion),
		StreamId:           result.StreamID[:],
		EncryptionParameters: &pb.EncryptionParameters{
			CipherSuite: pb.CipherSuite(result.EncryptionParameters.CipherSuite),
			BlockSize:   int64(result.EncryptionParameters.BlockSize),
		},
		Placement: int32(newBucketPlacement),
	})
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	response.StreamId = satStreamID
	return response, nil
}

func convertBeginCopyObjectResults(result metabase.BeginCopyObjectResult) (*pb.ObjectBeginCopyResponse, error) {
	keys := make([]*pb.EncryptedKeyAndNonce, len(result.EncryptedKeysNonces))
	for i, key := range result.EncryptedKeysNonces {
		var nonce storj.Nonce
		var err error
		if len(key.EncryptedKeyNonce) != 0 {
			nonce, err = storj.NonceFromBytes(key.EncryptedKeyNonce)
			if err != nil {
				return nil, err
			}
		}

		keys[i] = &pb.EncryptedKeyAndNonce{
			Position: &pb.SegmentPosition{
				PartNumber: int32(key.Position.Part),
				Index:      int32(key.Position.Index),
			},
			EncryptedKey:      key.EncryptedKey,
			EncryptedKeyNonce: nonce,
		}
	}

	// TODO we need this because of an uplink issue with how we are storing key and nonce
	if result.EncryptedMetadataKey == nil {
		streamMeta := &pb.StreamMeta{}
		err := pb.Unmarshal(result.EncryptedMetadata, streamMeta)
		if err != nil {
			return nil, err
		}
		if streamMeta.LastSegmentMeta != nil {
			result.EncryptedMetadataKey = streamMeta.LastSegmentMeta.EncryptedKey
			result.EncryptedMetadataKeyNonce = streamMeta.LastSegmentMeta.KeyNonce
		}
	}

	var metadataNonce storj.Nonce
	var err error
	if len(result.EncryptedMetadataKeyNonce) != 0 {
		metadataNonce, err = storj.NonceFromBytes(result.EncryptedMetadataKeyNonce)
		if err != nil {
			return nil, err
		}
	}

	return &pb.ObjectBeginCopyResponse{
		EncryptedMetadataKey:      result.EncryptedMetadataKey,
		EncryptedMetadataKeyNonce: metadataNonce,
		EncryptionParameters: &pb.EncryptionParameters{
			CipherSuite: pb.CipherSuite(result.EncryptionParameters.CipherSuite),
			BlockSize:   int64(result.EncryptionParameters.BlockSize),
		},
		SegmentKeys: keys,
	}, nil
}

// FinishCopyObject accepts new encryption keys for the object copy and updates the corresponding object ObjectKey and segments' EncryptedKey.
func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFinishCopyRequest) (resp *pb.ObjectFinishCopyResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
	if err != nil {
		endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
	}

	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
		Op:            macaroon.ActionWrite,
		Time:          time.Now(),
		Bucket:        req.NewBucket,
		EncryptedPath: req.NewEncryptedObjectKey,
	})
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
	}

	err = endpoint.validateBucket(ctx, req.NewBucket)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	exists, err := endpoint.buckets.HasBucket(ctx, req.NewBucket, keyInfo.ProjectID)
	if err != nil {
		endpoint.log.Error("unable to check bucket", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	} else if !exists {
		return nil, rpcstatus.Errorf(rpcstatus.NotFound, "target bucket not found: %s", req.NewBucket)
	}

	streamUUID, err := uuid.FromBytes(streamID.StreamId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	newStreamID, err := uuid.New()
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	// record the copy in the metabase under a fresh stream ID;
	// the segment content itself is not touched
	err = endpoint.metabase.FinishCopyObject(ctx, metabase.FinishCopyObject{
		ObjectStream: metabase.ObjectStream{
			ProjectID:  keyInfo.ProjectID,
			BucketName: string(streamID.Bucket),
			ObjectKey:  metabase.ObjectKey(streamID.EncryptedObjectKey),
			Version:    metabase.DefaultVersion,
			StreamID:   streamUUID,
		},
		NewStreamID:                  newStreamID,
		NewSegmentKeys:               protobufkeysToMetabase(req.NewSegmentKeys),
		NewBucket:                    string(req.NewBucket),
		NewEncryptedObjectKey:        req.NewEncryptedObjectKey,
		NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce[:],
		NewEncryptedMetadataKey:      req.NewEncryptedMetadataKey,
	})
	if err != nil {
		return nil, endpoint.convertMetabaseErr(err)
	}

	return &pb.ObjectFinishCopyResponse{}, nil
}

// protobufkeysToMetabase converts []*pb.EncryptedKeyAndNonce to []metabase.EncryptedKeyAndNonce.
func protobufkeysToMetabase(protoKeys []*pb.EncryptedKeyAndNonce) []metabase.EncryptedKeyAndNonce {
	keys := make([]metabase.EncryptedKeyAndNonce, len(protoKeys))
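The hunk is truncated at this point. Mirroring the inverse conversion in convertBeginCopyObjectResults above, the remainder of protobufkeysToMetabase plausibly continues along these lines (a sketch under that assumption, not the verbatim diff):

	// sketch of the truncated remainder, mirroring convertBeginCopyObjectResults
	for i, key := range protoKeys {
		// guard against a nil position before converting to the metabase type
		position := metabase.SegmentPosition{}
		if key.Position != nil {
			position = metabase.SegmentPosition{
				Part:  uint32(key.Position.PartNumber),
				Index: uint32(key.Position.Index),
			}
		}
		keys[i] = metabase.EncryptedKeyAndNonce{
			Position:          position,
			EncryptedKeyNonce: key.EncryptedKeyNonce.Bytes(),
			EncryptedKey:      key.EncryptedKey,
		}
	}
	return keys
}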
@@ -1235,3 +1235,99 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
		})
	})
}

func TestEndpoint_CopyObject(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
		satelliteSys := planet.Satellites[0]
		uplnk := planet.Uplinks[0]

		// upload a small inline object
		err := uplnk.Upload(ctx, planet.Satellites[0], "testbucket", "testobject", testrand.Bytes(1*memory.KiB))
		require.NoError(t, err)
		objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
		require.NoError(t, err)
		require.Len(t, objects, 1)

		getResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
			Header: &pb.RequestHeader{
				ApiKey: apiKey.SerializeRaw(),
			},
			Bucket:        []byte("testbucket"),
			EncryptedPath: []byte(objects[0].ObjectKey),
		})
		require.NoError(t, err)

		testEncryptedMetadataNonce := testrand.Nonce()
		// begin copying the object
		beginResp, err := satelliteSys.API.Metainfo.Endpoint.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
			Header: &pb.RequestHeader{
				ApiKey: apiKey.SerializeRaw(),
			},
			Bucket:                getResp.Object.Bucket,
			EncryptedObjectKey:    getResp.Object.EncryptedPath,
			NewBucket:             []byte("testbucket"),
			NewEncryptedObjectKey: []byte("newencryptedkey"),
		})
		require.NoError(t, err)
		assert.Len(t, beginResp.SegmentKeys, 1)
		assert.Equal(t, beginResp.EncryptedMetadataKey, objects[0].EncryptedMetadataEncryptedKey)
		assert.Equal(t, beginResp.EncryptedMetadataKeyNonce.Bytes(), objects[0].EncryptedMetadataNonce)

		segmentKeys := pb.EncryptedKeyAndNonce{
			Position:          beginResp.SegmentKeys[0].Position,
			EncryptedKeyNonce: testrand.Nonce(),
			EncryptedKey:      []byte("newencryptedkey"),
		}

		_, err = satelliteSys.API.Metainfo.Endpoint.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
			Header: &pb.RequestHeader{
				ApiKey: apiKey.SerializeRaw(),
			},
			StreamId:                     getResp.Object.StreamId,
			NewBucket:                    []byte("testbucket"),
			NewEncryptedObjectKey:        []byte("newobjectkey"),
			NewEncryptedMetadataKeyNonce: testEncryptedMetadataNonce,
			NewEncryptedMetadataKey:      []byte("encryptedmetadatakey"),
			NewSegmentKeys:               []*pb.EncryptedKeyAndNonce{&segmentKeys},
		})
		require.NoError(t, err)

		objectsAfterCopy, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
		require.NoError(t, err)
		require.Len(t, objectsAfterCopy, 2)

		getCopyResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
			Header: &pb.RequestHeader{
				ApiKey: apiKey.SerializeRaw(),
			},
			Bucket:        []byte("testbucket"),
			EncryptedPath: []byte("newobjectkey"),
		})
		require.NoError(t, err, objectsAfterCopy[1])
		require.NotEqual(t, getResp.Object.StreamId, getCopyResp.Object.StreamId)
		require.NotZero(t, getCopyResp.Object.StreamId)
		require.Equal(t, getResp.Object.InlineSize, getCopyResp.Object.InlineSize)

		// compare segments
		originalSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
			Header: &pb.RequestHeader{
				ApiKey: apiKey.SerializeRaw(),
			},
			StreamId:       getResp.Object.StreamId,
			CursorPosition: segmentKeys.Position,
		})
		require.NoError(t, err)
		copiedSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
			Header: &pb.RequestHeader{
				ApiKey: apiKey.SerializeRaw(),
			},
			StreamId:       getCopyResp.Object.StreamId,
			CursorPosition: segmentKeys.Position,
		})
		require.NoError(t, err)
		require.Equal(t, originalSegment.EncryptedInlineData, copiedSegment.EncryptedInlineData)
	})
}
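Assuming the test file lives in the satellite/metainfo package tests alongside the endpoint (the path is an assumption, not stated in the diff), it can be run on its own with:

	go test ./satellite/metainfo/ -run TestEndpoint_CopyObject -v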