satellite/metainfo: use protbuf ObjectVersion with GetObject request
We need to update metainfo GetObject endpoint to use ObjectVersion ([]byte) field instead of old Version field (int32). Change-Id: I61663ec8d9f5c731f91346a285048477fb493730
This commit is contained in:
parent
f7a95e0077
commit
67f32bd519
@ -368,6 +368,10 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
if err := validateObjectVersion(req.ObjectVersion); err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
objectLocation := metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
@ -375,20 +379,29 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
||||
}
|
||||
|
||||
var mbObject metabase.Object
|
||||
if req.Version == 0 {
|
||||
if len(req.ObjectVersion) == 0 {
|
||||
mbObject, err = endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: objectLocation,
|
||||
})
|
||||
} else {
|
||||
var v metabase.Version
|
||||
v, err = metabase.VersionFromBytes(req.ObjectVersion)
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
mbObject, err = endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
ObjectLocation: objectLocation,
|
||||
Version: metabase.Version(req.Version),
|
||||
Version: v,
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
|
||||
if mbObject.Status.IsDeleteMarker() {
|
||||
return nil, rpcstatus.Error(rpcstatus.NotFound, "object not found")
|
||||
}
|
||||
|
||||
{
|
||||
tags := []eventkit.Tag{
|
||||
eventkit.Bool("expires", mbObject.ExpiresAt != nil),
|
||||
|
@ -102,34 +102,47 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
|
||||
require.NotEmpty(t, item.EncryptedObjectKey)
|
||||
require.True(t, item.CreatedAt.Before(time.Now()))
|
||||
|
||||
object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
||||
response, err := satellite.Metainfo.Endpoint.GetObject(ctx, &pb.GetObjectRequest{
|
||||
Header: &pb.RequestHeader{
|
||||
ApiKey: apiKey.SerializeRaw(),
|
||||
},
|
||||
Bucket: []byte(expectedBucketName),
|
||||
EncryptedObjectKey: item.EncryptedObjectKey,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, item.EncryptedObjectKey, object.EncryptedObjectKey)
|
||||
|
||||
require.NotEmpty(t, object.StreamID)
|
||||
require.EqualValues(t, item.Version, object.Version)
|
||||
require.Equal(t, item.EncryptedObjectKey, response.Object.EncryptedObjectKey)
|
||||
require.NotEmpty(t, response.Object.StreamId)
|
||||
|
||||
// get with version
|
||||
object, err = metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
||||
response, err = satellite.Metainfo.Endpoint.GetObject(ctx, &pb.GetObjectRequest{
|
||||
Header: &pb.RequestHeader{
|
||||
ApiKey: apiKey.SerializeRaw(),
|
||||
},
|
||||
Bucket: []byte(expectedBucketName),
|
||||
EncryptedObjectKey: item.EncryptedObjectKey,
|
||||
Version: item.Version,
|
||||
ObjectVersion: response.Object.ObjectVersion,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, item.EncryptedObjectKey, object.EncryptedObjectKey)
|
||||
require.Equal(t, item.EncryptedObjectKey, response.Object.EncryptedObjectKey)
|
||||
|
||||
// get with WRONG version, should return error
|
||||
_, err = metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
||||
version, err := metabase.VersionFromBytes(response.Object.ObjectVersion)
|
||||
require.NoError(t, err)
|
||||
version++
|
||||
nonExistingVersion := version.Encode()
|
||||
_, err = satellite.Metainfo.Endpoint.GetObject(ctx, &pb.GetObjectRequest{
|
||||
Header: &pb.RequestHeader{
|
||||
ApiKey: apiKey.SerializeRaw(),
|
||||
},
|
||||
Bucket: []byte(expectedBucketName),
|
||||
EncryptedObjectKey: item.EncryptedObjectKey,
|
||||
Version: item.Version + 1,
|
||||
ObjectVersion: nonExistingVersion,
|
||||
})
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
|
||||
}
|
||||
|
||||
// TODO(ver): add tests with delete marker
|
||||
|
||||
items, _, err = metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
||||
Bucket: []byte(expectedBucketName),
|
||||
Limit: 3,
|
||||
|
Loading…
Reference in New Issue
Block a user