satellite/metabase: disallow moving delete markers

Change-Id: I5427ce2cf6783f83e77fcd8ca2fe7b98f78a945b
This commit is contained in:
Egon Elbre 2023-11-17 15:45:00 +02:00 committed by Storj Robot
parent 26a04a5929
commit 13d02d9d11
5 changed files with 33 additions and 27 deletions

View File

@ -26,6 +26,8 @@ var (
ErrPendingObjectMissing = errs.Class("pending object missing")
// ErrPermissionDenied general error for denying permission.
ErrPermissionDenied = errs.Class("permission denied")
// ErrMethodNotAllowed general error when operation is not allowed.
ErrMethodNotAllowed = errs.Class("method not allowed")
)
// Common constants for segment keys.

View File

@ -122,7 +122,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
var precommit precommitConstraintResult
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
sourceObject, err := getObjectExactVersion(ctx, tx, opts)
sourceObject, err := getObjectNonPendingExactVersion(ctx, tx, opts)
if err != nil {
if ErrObjectNotFound.Has(err) {
return ErrObjectNotFound.New("source object not found")
@ -133,6 +133,9 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
// TODO(versioning): should we report it as "not found" instead?
return ErrObjectNotFound.New("object was changed during copy")
}
if sourceObject.Status.IsDeleteMarker() {
return ErrMethodNotAllowed.New("copying delete marker is not allowed")
}
if opts.VerifyLimits != nil {
err := opts.VerifyLimits(sourceObject.TotalEncryptedSize, int64(sourceObject.SegmentCount))
@ -335,15 +338,16 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
return newObject, nil
}
// getObjectExactVersion returns object information for exact version.
func getObjectExactVersion(ctx context.Context, tx tagsql.Tx, opts FinishCopyObject) (_ Object, err error) {
// getObjectNonPendingExactVersion returns object information for exact version.
//
// Note: this returns both committed objects and delete markers.
func getObjectNonPendingExactVersion(ctx context.Context, tx tagsql.Tx, opts FinishCopyObject) (_ Object, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return Object{}, err
}
// TODO(ver): should we allow copying delete markers?
object := Object{}
err = tx.QueryRowContext(ctx, `
SELECT
@ -356,7 +360,7 @@ func getObjectExactVersion(ctx context.Context, tx tagsql.Tx, opts FinishCopyObj
FROM objects
WHERE
(project_id, bucket_name, object_key, version) = ($1, $2, $3, $4) AND
status IN `+statusesCommitted+` AND
status <> `+statusPending+` AND
(expires_at IS NULL OR expires_at > now())`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
Scan(

View File

@ -1381,8 +1381,8 @@ func TestFinishCopyObject(t *testing.T) {
NewVersioned: false,
},
ErrClass: &metabase.ErrObjectNotFound,
ErrText: "source object not found",
ErrClass: &metabase.ErrMethodNotAllowed,
ErrText: "copying delete marker is not allowed",
}.Check(ctx, t, db)
metabasetest.Verify{
@ -1440,8 +1440,8 @@ func TestFinishCopyObject(t *testing.T) {
NewVersioned: true,
},
ErrClass: &metabase.ErrObjectNotFound,
ErrText: "source object not found",
ErrClass: &metabase.ErrMethodNotAllowed,
ErrText: "copying delete marker is not allowed",
}.Check(ctx, t, db)
metabasetest.Verify{

View File

@ -177,6 +177,7 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
return Error.Wrap(err)
}
var oldStatus ObjectStatus
var segmentsCount int
var hasMetadata bool
var streamID uuid.UUID
@ -202,13 +203,18 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
WHERE
(project_id, bucket_name, object_key, version) = ($5, $6, $7, $8)
RETURNING
(
SELECT status
FROM objects
WHERE (project_id, bucket_name, object_key, version) = ($5, $6, $7, $8)
),
segment_count,
objects.encrypted_metadata IS NOT NULL AND LENGTH(objects.encrypted_metadata) > 0 AS has_metadata,
stream_id
`, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, opts.NewEncryptedMetadataKey,
opts.NewEncryptedMetadataKeyNonce, opts.ProjectID, []byte(opts.BucketName),
opts.ObjectKey, opts.Version, newStatus, precommit.HighestVersion+1).
Scan(&segmentsCount, &hasMetadata, &streamID)
Scan(&oldStatus, &segmentsCount, &hasMetadata, &streamID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
@ -222,6 +228,9 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
if segmentsCount != len(opts.NewSegmentKeys) {
return ErrInvalidRequest.New("wrong number of segments keys received")
}
if oldStatus.IsDeleteMarker() {
return ErrMethodNotAllowed.New("moving delete marker is not allowed")
}
if hasMetadata {
switch {
case opts.NewEncryptedMetadataKeyNonce.IsZero() && len(opts.NewEncryptedMetadataKey) != 0:

View File

@ -723,7 +723,7 @@ func TestFinishMoveObject(t *testing.T) {
obj := metabasetest.RandObjectStream()
obj.Version = 12000
_ = metabasetest.CreateObject(ctx, t, db, obj, 0)
unversionedObject := metabasetest.CreateObject(ctx, t, db, obj, 0)
obj.Version = 13000
versionedObject := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)
@ -754,13 +754,6 @@ func TestFinishMoveObject(t *testing.T) {
},
}.Check(ctx, t, db)
movedObject := deletionResult.Markers[0]
movedObject.ObjectStream.ProjectID = obj.ProjectID
movedObject.ObjectStream.BucketName = obj.BucketName
movedObject.ObjectStream.ObjectKey = obj.ObjectKey
movedObject.ObjectStream.Version = 13001
movedObject.Status = metabase.CommittedUnversioned
// move of delete marker should fail
metabasetest.FinishMoveObject{
Opts: metabase.FinishMoveObject{
@ -770,12 +763,15 @@ func TestFinishMoveObject(t *testing.T) {
NewVersioned: false,
},
ErrClass: &metabase.ErrMethodNotAllowed,
ErrText: "moving delete marker is not allowed",
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(unversionedObject),
metabase.RawObject(versionedObject),
metabase.RawObject(movedObject),
metabase.RawObject(deletionResult.Markers[0]),
},
}.Check(ctx, t, db)
})
@ -816,13 +812,6 @@ func TestFinishMoveObject(t *testing.T) {
},
}.Check(ctx, t, db)
movedObject := deletionResult.Markers[0]
movedObject.ObjectStream.ProjectID = obj.ProjectID
movedObject.ObjectStream.BucketName = obj.BucketName
movedObject.ObjectStream.ObjectKey = obj.ObjectKey
movedObject.ObjectStream.Version = 13001
movedObject.Status = metabase.CommittedVersioned
// copy of delete marker should fail
metabasetest.FinishMoveObject{
Opts: metabase.FinishMoveObject{
@ -832,11 +821,13 @@ func TestFinishMoveObject(t *testing.T) {
NewVersioned: true,
},
ErrClass: &metabase.ErrMethodNotAllowed,
ErrText: "moving delete marker is not allowed",
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(movedObject),
metabase.RawObject(deletionResult.Markers[0]),
metabase.RawObject(sourceObject),
metabase.RawObject(unversionedObject),
metabase.RawObject(versionedObject),