satellite/metainfo: support deleting specific object version

Protobuf definition is ready to support deleting specific version of
object so we just need to wire requested version into metainfo
BeginDeleteObject endpoint.

Dependencies bumped to get latest metainfo protobuf definition.

https://github.com/storj/storj/issues/6221

Change-Id: Ifc3cc0b49d9acdf4f7e57e7184b0f2ae045f9113
This commit is contained in:
Michal Niewrzal 2023-09-25 12:52:28 +02:00 committed by Storj Robot
parent 9338f3f088
commit 988ebbaf8d
4 changed files with 107 additions and 8 deletions

View File

@ -5,6 +5,7 @@ package metabase
import (
"database/sql/driver"
"encoding/binary"
"math"
"sort"
"strconv"
@ -366,6 +367,23 @@ const PendingVersion = Version(0)
// Version in DB is represented as INT4.
const MaxVersion = Version(math.MaxInt32)
// Encode encodes version to bytes.
// TODO(ver): this is not final approach to version encoding. It's simplified
// version for internal testing purposes. Will be changed in future.
func (v Version) Encode() []byte {
var bytes [8]byte
binary.BigEndian.PutUint64(bytes[:], uint64(v))
return bytes[:]
}
// VersionFromBytes decodes version from bytes.
func VersionFromBytes(bytes []byte) (Version, error) {
if len(bytes) != 8 {
return Version(0), ErrInvalidRequest.New("invalid version")
}
return Version(binary.BigEndian.Uint64(bytes)), nil
}
// ObjectStatus defines the status that the object is in.
//
// There are two types of objects:

View File

@ -1258,6 +1258,10 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
if err := validateObjectVersion(req.ObjectVersion); err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
var deletedObjects []*pb.Object
if req.GetStatus() == int32(metabase.Pending) {
@ -1281,7 +1285,7 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
}
}
} else {
deletedObjects, err = endpoint.DeleteCommittedObject(ctx, keyInfo.ProjectID, string(req.Bucket), metabase.ObjectKey(req.EncryptedObjectKey))
deletedObjects, err = endpoint.DeleteCommittedObject(ctx, keyInfo.ProjectID, string(req.Bucket), metabase.ObjectKey(req.EncryptedObjectKey), req.ObjectVersion)
}
if err != nil {
if !canRead && !canList {
@ -1742,9 +1746,8 @@ func (endpoint *Endpoint) pendingObjectEntryToProtoListItem(ctx context.Context,
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
// TODO: see note on DeleteObjectAnyStatus.
func (endpoint *Endpoint) DeleteCommittedObject(
ctx context.Context, projectID uuid.UUID, bucket string, object metabase.ObjectKey,
ctx context.Context, projectID uuid.UUID, bucket string, object metabase.ObjectKey, version []byte,
) (deletedObjects []*pb.Object, err error) {
defer mon.Task()(&ctx, projectID.String(), bucket, object)(&err)
@ -1756,9 +1759,21 @@ func (endpoint *Endpoint) DeleteCommittedObject(
var result metabase.DeleteObjectResult
if endpoint.config.ServerSideCopy {
result, err = endpoint.metabase.DeleteObjectLastCommitted(ctx, metabase.DeleteObjectLastCommitted{
ObjectLocation: req,
})
if len(version) == 0 {
result, err = endpoint.metabase.DeleteObjectLastCommitted(ctx, metabase.DeleteObjectLastCommitted{
ObjectLocation: req,
})
} else {
var v metabase.Version
v, err = metabase.VersionFromBytes(version)
if err != nil {
return nil, err
}
result, err = endpoint.metabase.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
ObjectLocation: req,
Version: v,
})
}
} else {
result, err = endpoint.metabase.DeleteObjectsAllVersions(ctx, metabase.DeleteObjectsAllVersions{Locations: []metabase.ObjectLocation{req}})
}
@ -1785,7 +1800,6 @@ func (endpoint *Endpoint) DeleteCommittedObject(
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
// TODO: see note on DeleteObjectAnyStatus.
func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metabase.ObjectStream, usePendingObjectTable bool) (deletedObjects []*pb.Object, err error) {
req := metabase.DeletePendingObject{
ObjectStream: stream,

View File

@ -689,6 +689,66 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
}
})
t.Run("delete specific version", func(t *testing.T) {
defer ctx.Check(deleteBucket)
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "test-object", testrand.Bytes(100))
require.NoError(t, err)
// get encrypted object key and version
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
endpoint := planet.Satellites[0].Metainfo.Endpoint
// first try to delete not existing version
nonExistingVersion := objects[0].Version + 1
response, err := endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: nonExistingVersion.Encode(),
})
require.NoError(t, err)
require.Nil(t, response.Object)
// now delete using explicit version
response, err = endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: objects[0].Version.Encode(),
})
require.NoError(t, err)
require.NotNil(t, response.Object)
require.EqualValues(t, objects[0].ObjectKey, response.Object.EncryptedObjectKey)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "test-object", testrand.Bytes(100))
require.NoError(t, err)
// now delete using empty version (latest version)
response, err = endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: nil,
})
require.NoError(t, err)
require.NotNil(t, response.Object)
require.EqualValues(t, objects[0].ObjectKey, response.Object.EncryptedObjectKey)
objects, err = planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
require.Empty(t, objects)
})
})
}
@ -1973,7 +2033,7 @@ func TestEndpoint_DeleteCommittedObject(t *testing.T) {
deleteObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
projectID := planet.Uplinks[0].Projects[0].ID
_, err := planet.Satellites[0].Metainfo.Endpoint.DeleteCommittedObject(ctx, projectID, bucket, metabase.ObjectKey(encryptedKey))
_, err := planet.Satellites[0].Metainfo.Endpoint.DeleteCommittedObject(ctx, projectID, bucket, metabase.ObjectKey(encryptedKey), []byte{})
require.NoError(t, err)
}
testDeleteObject(t, createObject, deleteObject)

View File

@ -303,6 +303,13 @@ func validateBucketLabel(label []byte) error {
return nil
}
func validateObjectVersion(version []byte) error {
if len(version) != 0 && len(version) != 8 {
return Error.New("invalid object version")
}
return nil
}
func isLowerLetter(r byte) bool {
return r >= 'a' && r <= 'z'
}