satellite/metainfo: support downloading specific object version
Protobuf definition is ready to support downloading specific version of object so we just need to wire requested version into metainfo DownloadObject endpoint. https://github.com/storj/storj/issues/6221 Change-Id: I3ddc173beb6a6cf30d782dd65c6aa5f88f2cbd44
This commit is contained in:
parent
51fefb2882
commit
f0f73fc8ae
@ -490,6 +490,10 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
|||||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := validateObjectVersion(req.ObjectVersion); err != nil {
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
|
if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
|
||||||
if errs2.IsCanceled(err) {
|
if errs2.IsCanceled(err) {
|
||||||
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
|
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||||
@ -508,14 +512,30 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
|||||||
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
|
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the object information
|
var object metabase.Object
|
||||||
object, err := endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
if len(req.ObjectVersion) == 0 {
|
||||||
|
object, err = endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||||
ObjectLocation: metabase.ObjectLocation{
|
ObjectLocation: metabase.ObjectLocation{
|
||||||
ProjectID: keyInfo.ProjectID,
|
ProjectID: keyInfo.ProjectID,
|
||||||
BucketName: string(req.Bucket),
|
BucketName: string(req.Bucket),
|
||||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
var v metabase.Version
|
||||||
|
v, err = metabase.VersionFromBytes(req.ObjectVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, endpoint.convertMetabaseErr(err)
|
||||||
|
}
|
||||||
|
object, err = endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||||
|
ObjectLocation: metabase.ObjectLocation{
|
||||||
|
ProjectID: keyInfo.ProjectID,
|
||||||
|
BucketName: string(req.Bucket),
|
||||||
|
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
||||||
|
},
|
||||||
|
Version: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, endpoint.convertMetabaseErr(err)
|
return nil, endpoint.convertMetabaseErr(err)
|
||||||
}
|
}
|
||||||
|
@ -702,6 +702,53 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("download specific version", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket)
|
||||||
|
|
||||||
|
expectedData := testrand.Bytes(256)
|
||||||
|
err := planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "object", expectedData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objects, err := satellite.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, objects, 1)
|
||||||
|
|
||||||
|
committedObject := objects[0]
|
||||||
|
|
||||||
|
// download without specifying version
|
||||||
|
downloadObjectResponse, err := satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
|
||||||
|
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
|
||||||
|
Bucket: []byte("testbucket"),
|
||||||
|
EncryptedObjectKey: []byte(committedObject.ObjectKey),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket)
|
||||||
|
require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedObjectKey)
|
||||||
|
require.EqualValues(t, committedObject.Version.Encode(), downloadObjectResponse.Object.ObjectVersion)
|
||||||
|
|
||||||
|
// download using explicit version
|
||||||
|
downloadObjectResponse, err = satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
|
||||||
|
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
|
||||||
|
Bucket: []byte("testbucket"),
|
||||||
|
EncryptedObjectKey: []byte(committedObject.ObjectKey),
|
||||||
|
ObjectVersion: committedObject.Version.Encode(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket)
|
||||||
|
require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedObjectKey)
|
||||||
|
require.EqualValues(t, committedObject.Version.Encode(), downloadObjectResponse.Object.ObjectVersion)
|
||||||
|
|
||||||
|
// download using NON EXISTING version
|
||||||
|
nonExistingVersion := committedObject.Version + 1
|
||||||
|
_, err = satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
|
||||||
|
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
|
||||||
|
Bucket: []byte("testbucket"),
|
||||||
|
EncryptedObjectKey: []byte(committedObject.ObjectKey),
|
||||||
|
ObjectVersion: nonExistingVersion.Encode(),
|
||||||
|
})
|
||||||
|
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("delete specific version", func(t *testing.T) {
|
t.Run("delete specific version", func(t *testing.T) {
|
||||||
defer ctx.Check(deleteBucket)
|
defer ctx.Check(deleteBucket)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user