satellite/metabase: adjust FinishCopyObject for versioning

Change-Id: Ia2089238040624937b208c142d118dcbf5aa3432
This commit is contained in:
Egon Elbre 2023-10-17 18:19:44 +03:00 committed by Storj Robot
parent 97ac27942c
commit a1a8c258d5
4 changed files with 307 additions and 142 deletions

View File

@ -5,10 +5,10 @@ package metabase
import (
"context"
"database/sql"
"errors"
"time"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
@ -52,6 +52,12 @@ type FinishCopyObject struct {
NewSegmentKeys []EncryptedKeyAndNonce
// NewDisallowDelete indicates whether the user is allowed to delete an existing unversioned object.
NewDisallowDelete bool
// NewVersioned indicates that the object allows multiple versions.
NewVersioned bool
// VerifyLimits holds a callback by which the caller can interrupt the copy
// if it turns out completing the copy would exceed a limit.
// It will be called only once.
@ -106,15 +112,18 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
var copyMetadata []byte
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
sourceObject, ancestorStreamID, objectAtDestination, nextAvailableVersion, err := getObjectAtCopySourceAndDestination(ctx, tx, opts)
sourceObject, err := getObjectExactVersion(ctx, tx, opts)
if err != nil {
if ErrObjectNotFound.Has(err) {
return ErrObjectNotFound.New("source object not found")
}
return err
}
if objectAtDestination != nil && objectAtDestination.StreamID == sourceObject.StreamID {
newObject = sourceObject
return nil
if sourceObject.StreamID != opts.StreamID {
// TODO(versioning): should we report it as "not found" instead?
return ErrObjectNotFound.New("object was changed during copy")
}
if opts.VerifyLimits != nil {
err := opts.VerifyLimits(sourceObject.TotalEncryptedSize, int64(sourceObject.SegmentCount))
if err != nil {
@ -216,57 +225,66 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
copyMetadata = sourceObject.EncryptedMetadata
}
if objectAtDestination != nil {
version := objectAtDestination.Version
deletedObjects, err := db.deleteObjectExactVersion(
ctx, DeleteObjectExactVersion{
Version: version,
ObjectLocation: ObjectLocation{
ProjectID: objectAtDestination.ProjectID,
BucketName: objectAtDestination.BucketName,
ObjectKey: objectAtDestination.ObjectKey,
},
}, tx,
)
var highestVersion Version
if !opts.NewVersioned {
// TODO(ver): this logic can probably merged into update query
//
// Note, we are prematurely deleting the object without permissions
// and then rolling the action back, if we were not allowed to.
deleted, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.NewBucket,
ObjectKey: opts.NewEncryptedObjectKey,
}, tx)
if err != nil {
return Error.New("unable to delete existing object at copy destination: %w", err)
return Error.New("unable to delete object at target location: %w", err)
}
if deleted.DeletedObjectCount > 0 && opts.NewDisallowDelete {
return ErrPermissionDenied.New("no permissions to delete existing object")
}
// The object at the destination was the ancestor!
if ancestorStreamID == objectAtDestination.StreamID {
if len(deletedObjects.Objects) == 0 {
return Error.New("ancestor is gone, please retry operation")
}
highestVersion = deleted.MaxVersion
} else {
highestVersion, err = db.queryHighestVersion(ctx, ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.NewBucket,
ObjectKey: opts.NewEncryptedObjectKey,
}, tx)
if err != nil {
return Error.New("unable to query highest version: %w", err)
}
}
newStatus := committedWhereVersioned(opts.NewVersioned)
// TODO we need to handle metadata correctly (copy from original object or replace)
row := tx.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, status, segment_count,
status, expires_at, segment_count,
encryption,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
zombie_deletion_deadline
) VALUES (
$1, $2, $3, $4, $5,
$6,`+statusCommittedUnversioned+`, $7,
$8,
$9, $10, $11,
$12, $13, $14, null
$6, $7, $8,
$9,
$10, $11, $12,
$13, $14, $15, null
)
RETURNING
created_at`,
opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, nextAvailableVersion, opts.NewStreamID,
sourceObject.ExpiresAt, sourceObject.SegmentCount,
opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, highestVersion+1, opts.NewStreamID,
newStatus, sourceObject.ExpiresAt, sourceObject.SegmentCount,
encryptionParameters{&sourceObject.Encryption},
copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
sourceObject.TotalPlainSize, sourceObject.TotalEncryptedSize, sourceObject.FixedSegmentSize,
)
newObject = sourceObject
newObject.Version = nextAvailableVersion
newObject.Version = highestVersion + 1
newObject.Status = newStatus
err = row.Scan(&newObject.CreatedAt)
if err != nil {
@ -327,123 +345,49 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
return newObject, nil
}
// Fetch the following in a single query:
// - object at copy source location (error if it's not there)
// - next version available
// - object at copy destination location (if any).
func getObjectAtCopySourceAndDestination(
ctx context.Context, tx tagsql.Tx, opts FinishCopyObject,
) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, nextAvailableVersion Version, err error) {
// getObjectExactVersion returns object information for exact version.
func getObjectExactVersion(ctx context.Context, tx tagsql.Tx, opts FinishCopyObject) (_ Object, err error) {
defer mon.Task()(&ctx)(&err)
var highestVersion Version
if err := opts.Verify(); err != nil {
return Object{}, err
}
sourceObject.ProjectID = opts.ProjectID
sourceObject.BucketName = opts.BucketName
sourceObject.ObjectKey = opts.ObjectKey
sourceObject.Version = opts.Version
sourceObject.Status = CommittedUnversioned
// get objects at source and destination (if any)
rows, err := tx.QueryContext(ctx, `
WITH destination_current_versions AS (
SELECT status, max(version) AS version
FROM objects
WHERE
project_id = $1 AND
bucket_name = $5 AND
object_key = $6
GROUP BY status
)
// TODO(ver): should we allow copying delete markers?
object := Object{}
err = tx.QueryRowContext(ctx, `
SELECT
objects.stream_id,
expires_at,
stream_id, status,
created_at, expires_at,
segment_count,
encrypted_metadata,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption,
0,
coalesce((SELECT max(version) FROM destination_current_versions),0) AS highest_version
encryption
FROM objects
WHERE
project_id = $1 AND
bucket_name = $3 AND
object_key = $4 AND
version = $2 AND
status = `+statusCommittedUnversioned+`
UNION ALL
SELECT
stream_id,
expires_at,
segment_count,
NULL,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption,
version,
(SELECT max(version) FROM destination_current_versions) AS highest_version
FROM objects
WHERE
project_id = $1 AND
bucket_name = $5 AND
object_key = $6 AND
version = (SELECT version FROM destination_current_versions
WHERE status = `+statusCommittedUnversioned+`)`,
sourceObject.ProjectID, sourceObject.Version,
[]byte(sourceObject.BucketName), sourceObject.ObjectKey,
opts.NewBucket, opts.NewEncryptedObjectKey)
if err != nil {
return Object{}, uuid.UUID{}, nil, 0, err
}
defer func() {
err = errs.Combine(err, rows.Err(), rows.Close())
}()
if !rows.Next() {
return Object{}, uuid.UUID{}, nil, 0, ErrObjectNotFound.New("source object not found")
}
err = rows.Scan(
&sourceObject.StreamID,
&sourceObject.ExpiresAt,
&sourceObject.SegmentCount,
&sourceObject.EncryptedMetadata,
&sourceObject.TotalPlainSize, &sourceObject.TotalEncryptedSize, &sourceObject.FixedSegmentSize,
encryptionParameters{&sourceObject.Encryption},
&highestVersion,
&highestVersion,
(project_id, bucket_name, object_key, version) = ($1, $2, $3, $4) AND
status IN `+statusesCommitted+` AND
(expires_at IS NULL OR expires_at > now())`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
Scan(
&object.StreamID, &object.Status,
&object.CreatedAt, &object.ExpiresAt,
&object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
)
if err != nil {
return Object{}, uuid.UUID{}, nil, 0, Error.New("unable to query object status: %w", err)
if errors.Is(err, sql.ErrNoRows) {
return Object{}, ErrObjectNotFound.Wrap(Error.Wrap(err))
}
if sourceObject.StreamID != opts.StreamID {
return Object{}, uuid.UUID{}, nil, 0, ErrObjectNotFound.New("object was changed during copy")
return Object{}, Error.New("unable to query object status: %w", err)
}
if rows.Next() {
destinationObject = &Object{}
destinationObject.ProjectID = opts.ProjectID
destinationObject.BucketName = opts.NewBucket
destinationObject.ObjectKey = opts.NewEncryptedObjectKey
// There is an object at the destination.
// We will delete it before doing the copy
err := rows.Scan(
&destinationObject.StreamID,
&destinationObject.ExpiresAt,
&destinationObject.SegmentCount,
&destinationObject.EncryptedMetadata,
&destinationObject.TotalPlainSize, &destinationObject.TotalEncryptedSize, &destinationObject.FixedSegmentSize,
encryptionParameters{&destinationObject.Encryption},
&destinationObject.Version,
&highestVersion,
)
if err != nil {
return Object{}, uuid.UUID{}, nil, 0, Error.New("error while reading existing object at destination: %w", err)
}
}
object.ProjectID = opts.ProjectID
object.BucketName = opts.BucketName
object.ObjectKey = opts.ObjectKey
object.Version = opts.Version
if rows.Next() {
return Object{}, uuid.UUID{}, nil, 0, Error.New("expected 1 or 2 rows, got 3 or more")
}
return sourceObject, ancestorStreamID, destinationObject, highestVersion + 1, nil
return object, nil
}

View File

@ -967,16 +967,49 @@ func TestFinishCopyObject(t *testing.T) {
}.Run(ctx, t, db, obj, byte(numberOfSegments))
obj.StreamID = testrand.UUID()
_, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
expectedCopy, _, expectedCopySegments := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
CopyObjectStream: &obj,
}.Run(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(originalObj),
metabase.RawObject(expectedCopy),
},
Segments: expectedOriginalSegments,
Segments: expectedCopySegments,
}.Check(ctx, t, db)
})
t.Run("finish copy object versioned to same destination", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
// both should be preserved
obj := metabasetest.RandObjectStream()
numberOfSegments := 10
originalObj, _ := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: obj,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, obj, byte(numberOfSegments))
obj.StreamID = testrand.UUID()
expectedCopy, expectedOriginalSegments, expectedCopySegments := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
CopyObjectStream: &obj,
NewVersioned: true,
}.Run(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(originalObj),
metabase.RawObject(expectedCopy),
},
Segments: append(expectedCopySegments, expectedOriginalSegments...),
}.Check(ctx, t, db)
})
@ -1116,5 +1149,184 @@ func TestFinishCopyObject(t *testing.T) {
}.Check(ctx, t, db)
}
})
t.Run("existing object is overwritten", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
initialStream := metabasetest.RandObjectStream()
initialObject := metabasetest.CreateObject(ctx, t, db, initialStream, 0)
conflictObjStream := metabasetest.RandObjectStream()
conflictObjStream.ProjectID = initialStream.ProjectID
metabasetest.CreateObject(ctx, t, db, conflictObjStream, 0)
newNonce := testrand.Nonce()
newMetadataKey := testrand.Bytes(265)
newUUID := testrand.UUID()
now := time.Now()
copiedObject := metabase.Object{
ObjectStream: metabase.ObjectStream{
ProjectID: conflictObjStream.ProjectID,
BucketName: conflictObjStream.BucketName,
ObjectKey: conflictObjStream.ObjectKey,
StreamID: newUUID,
Version: conflictObjStream.Version + 1,
},
CreatedAt: now,
Status: metabase.CommittedUnversioned,
Encryption: initialObject.Encryption,
EncryptedMetadataNonce: newNonce[:],
EncryptedMetadataEncryptedKey: newMetadataKey,
}
metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
NewBucket: conflictObjStream.BucketName,
NewStreamID: newUUID,
ObjectStream: initialStream,
NewEncryptedObjectKey: conflictObjStream.ObjectKey,
NewEncryptedMetadataKeyNonce: newNonce,
NewEncryptedMetadataKey: newMetadataKey,
},
Result: copiedObject,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(initialObject),
metabase.RawObject(copiedObject),
},
}.Check(ctx, t, db)
})
t.Run("existing object is not overwritten, permission denied", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
initialStream := metabasetest.RandObjectStream()
initialObject := metabasetest.CreateObject(ctx, t, db, initialStream, 0)
conflictObjStream := metabasetest.RandObjectStream()
conflictObjStream.ProjectID = initialStream.ProjectID
conflictObject := metabasetest.CreateObject(ctx, t, db, conflictObjStream, 0)
newNonce := testrand.Nonce()
newMetadataKey := testrand.Bytes(265)
newUUID := testrand.UUID()
metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
NewBucket: conflictObjStream.BucketName,
ObjectStream: initialStream,
NewStreamID: newUUID,
NewEncryptedObjectKey: conflictObjStream.ObjectKey,
NewEncryptedMetadataKeyNonce: newNonce,
NewEncryptedMetadataKey: newMetadataKey,
NewDisallowDelete: true,
},
ErrClass: &metabase.ErrPermissionDenied,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(conflictObject),
metabase.RawObject(initialObject),
},
}.Check(ctx, t, db)
})
t.Run("versioned targets unversioned and versioned", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj := metabasetest.RandObjectStream()
obj.Version = 12000
unversionedObject := metabasetest.CreateObject(ctx, t, db, obj, 0)
obj.Version = 13000
versionedObject := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)
sourceStream := metabasetest.RandObjectStream()
sourceStream.ProjectID = obj.ProjectID
sourceObject := metabasetest.CreateObject(ctx, t, db, sourceStream, 0)
newStreamID := testrand.UUID()
copiedObject := sourceObject
copiedObject.ObjectStream.ProjectID = obj.ProjectID
copiedObject.ObjectStream.BucketName = obj.BucketName
copiedObject.ObjectStream.ObjectKey = obj.ObjectKey
copiedObject.ObjectStream.Version = 13001
copiedObject.ObjectStream.StreamID = newStreamID
copiedObject.Status = metabase.CommittedVersioned
// versioned copy should leave everything else as is
metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
ObjectStream: sourceStream,
NewBucket: obj.BucketName,
NewStreamID: newStreamID,
NewEncryptedObjectKey: obj.ObjectKey,
NewVersioned: true,
},
Result: copiedObject,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(unversionedObject),
metabase.RawObject(versionedObject),
metabase.RawObject(sourceObject),
metabase.RawObject(copiedObject),
},
}.Check(ctx, t, db)
})
t.Run("unversioned targets unversioned and versioned", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj := metabasetest.RandObjectStream()
obj.Version = 12000
metabasetest.CreateObject(ctx, t, db, obj, 0)
obj.Version = 13000
versionedObject := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)
sourceStream := metabasetest.RandObjectStream()
sourceStream.ProjectID = obj.ProjectID
sourceObject := metabasetest.CreateObject(ctx, t, db, sourceStream, 0)
newStreamID := testrand.UUID()
copiedObject := sourceObject
copiedObject.ObjectStream.ProjectID = obj.ProjectID
copiedObject.ObjectStream.BucketName = obj.BucketName
copiedObject.ObjectStream.ObjectKey = obj.ObjectKey
copiedObject.ObjectStream.Version = 13001
copiedObject.ObjectStream.StreamID = newStreamID
copiedObject.Status = metabase.CommittedUnversioned
// unversioned copy should only delete the unversioned object
metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
ObjectStream: sourceStream,
NewBucket: obj.BucketName,
NewStreamID: newStreamID,
NewEncryptedObjectKey: obj.ObjectKey,
NewVersioned: false,
},
Result: copiedObject,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(versionedObject),
metabase.RawObject(sourceObject),
metabase.RawObject(copiedObject),
},
}.Check(ctx, t, db)
})
})
}

View File

@ -412,6 +412,9 @@ type CreateObjectCopy struct {
OriginalSegments []metabase.Segment
FinishObject *metabase.FinishCopyObject
CopyObjectStream *metabase.ObjectStream
NewDisallowDelete bool
NewVersioned bool
}
// Run creates the copy.
@ -469,6 +472,9 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
NewEncryptedObjectKey: copyStream.ObjectKey,
NewEncryptedMetadataKeyNonce: testrand.Nonce(),
NewEncryptedMetadataKey: testrand.Bytes(32),
NewDisallowDelete: cc.NewDisallowDelete,
NewVersioned: cc.NewVersioned,
}
}

View File

@ -2239,6 +2239,9 @@ func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFi
NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce,
NewEncryptedMetadataKey: req.NewEncryptedMetadataKey,
// TODO(ver): currently we always allow deletion, to not change behaviour.
NewDisallowDelete: false,
VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
return endpoint.addStorageUsageUpToLimit(ctx, keyInfo.ProjectID, encryptedObjectSize, nSegments)
},