satellite/metabase: copy handles version when overwriting target location

When doing server-side copy, deletes the committed version of the target location if it already exists. It does not touch pending versions. The version of the copy is set to the highest already existing version + 1.

Fixes: https://github.com/storj/storj/issues/5071

Change-Id: I1d91ac17054834b1f4f0970a9fa5d58198c58a37
This commit is contained in:
Fadila Khadar 2022-09-15 22:32:56 +02:00 committed by Michal Niewrzal
parent a97cd97789
commit f58129786b
3 changed files with 205 additions and 45 deletions

View File

@ -106,12 +106,14 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
var copyMetadata []byte
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
sourceObject, ancestorStreamID, objectAtDestination, err := getObjectAtCopySourceAndDestination(ctx, tx, opts)
sourceObject, ancestorStreamID, objectAtDestination, nextAvailableVersion, err := getObjectAtCopySourceAndDestination(ctx, tx, opts)
if err != nil {
return err
}
if !db.config.MultipleVersions {
nextAvailableVersion = opts.Version
}
if objectAtDestination != nil && objectAtDestination.StreamID == sourceObject.StreamID {
newObject = sourceObject
return nil
@ -210,9 +212,13 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
}
if objectAtDestination != nil {
version := opts.Version
if db.config.MultipleVersions {
version = objectAtDestination.Version
}
deletedObjects, err := db.deleteObjectExactVersionServerSideCopy(
ctx, DeleteObjectExactVersion{
Version: opts.Version,
Version: version,
ObjectLocation: ObjectLocation{
ProjectID: objectAtDestination.ProjectID,
BucketName: objectAtDestination.BucketName,
@ -253,7 +259,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
)
RETURNING
created_at`,
opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID,
opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, nextAvailableVersion, opts.NewStreamID,
sourceObject.ExpiresAt, sourceObject.SegmentCount,
encryptionParameters{&sourceObject.Encryption},
copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
@ -261,6 +267,8 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
)
newObject = sourceObject
newObject.Version = nextAvailableVersion
err = row.Scan(&newObject.CreatedAt)
if err != nil {
return Error.New("unable to copy object: %w", err)
@ -331,25 +339,43 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
// Fetch the following in a single query:
// - object at copy source location (error if it's not there)
// - source ancestor stream id (if any)
// - next version available
// - object at copy destination location (if any).
func getObjectAtCopySourceAndDestination(
ctx context.Context, tx tagsql.Tx, opts FinishCopyObject,
) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, err error) {
) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, nextAvailableVersion Version, err error) {
defer mon.Task()(&ctx)(&err)
var ancestorStreamIDBytes []byte
var highestVersion Version
sourceObject.ProjectID = opts.ProjectID
sourceObject.BucketName = opts.BucketName
sourceObject.ObjectKey = opts.ObjectKey
sourceObject.Version = opts.Version
sourceObject.Status = Committed
// get objects at source and destination (if any)
rows, err := tx.QueryContext(ctx, `
WITH destination_current_versions AS (
SELECT status, max(version) AS version
FROM objects
WHERE
project_id = $1 AND
bucket_name = $5 AND
object_key = $6
GROUP BY status
)
SELECT
objects.stream_id,
bucket_name,
object_key,
expires_at,
segment_count,
encrypted_metadata,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption,
segment_copies.ancestor_stream_id
segment_copies.ancestor_stream_id,
0,
coalesce((SELECT max(version) FROM destination_current_versions),0) AS highest_version
FROM objects
LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id
WHERE
@ -360,67 +386,59 @@ func getObjectAtCopySourceAndDestination(
status = `+committedStatus+`
UNION ALL
SELECT
objects.stream_id,
bucket_name,
object_key,
stream_id,
expires_at,
segment_count,
encrypted_metadata,
NULL,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption,
NULL
NULL,
version,
(SELECT max(version) FROM destination_current_versions) AS highest_version
FROM objects
WHERE
project_id = $1 AND
bucket_name = $5 AND
object_key = $6 AND
version = $2 AND
status = `+committedStatus,
opts.ProjectID, opts.Version,
[]byte(opts.BucketName), opts.ObjectKey,
project_id = $1 AND
bucket_name = $5 AND
object_key = $6 AND
version = (SELECT version FROM destination_current_versions
WHERE status = `+committedStatus+`)`,
sourceObject.ProjectID, sourceObject.Version,
[]byte(sourceObject.BucketName), sourceObject.ObjectKey,
opts.NewBucket, opts.NewEncryptedObjectKey)
if err != nil {
return Object{}, uuid.UUID{}, nil, err
return Object{}, uuid.UUID{}, nil, 0, err
}
defer func() {
err = errs.Combine(err, rows.Err())
err = errs.Combine(err, rows.Close())
err = errs.Combine(err, rows.Err(), rows.Close())
}()
if !rows.Next() {
return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("source object not found")
return Object{}, uuid.UUID{}, nil, 0, storj.ErrObjectNotFound.New("source object not found")
}
err = rows.Scan(
&sourceObject.StreamID,
&sourceObject.BucketName,
&sourceObject.ObjectKey,
&sourceObject.ExpiresAt,
&sourceObject.SegmentCount,
&sourceObject.EncryptedMetadata,
&sourceObject.TotalPlainSize, &sourceObject.TotalEncryptedSize, &sourceObject.FixedSegmentSize,
encryptionParameters{&sourceObject.Encryption},
&ancestorStreamIDBytes,
&highestVersion,
&highestVersion,
)
if err != nil {
return Object{}, uuid.UUID{}, nil, Error.New("unable to query object status: %w", err)
}
if sourceObject.BucketName != opts.BucketName || sourceObject.ObjectKey != opts.ObjectKey {
return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("source object is gone")
return Object{}, uuid.UUID{}, nil, 0, Error.New("unable to query object status: %w", err)
}
if sourceObject.StreamID != opts.StreamID {
return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("object was changed during copy")
return Object{}, uuid.UUID{}, nil, 0, storj.ErrObjectNotFound.New("object was changed during copy")
}
sourceObject.ProjectID = opts.ProjectID
sourceObject.Version = opts.Version
sourceObject.Status = Committed
if len(ancestorStreamIDBytes) != 0 {
// Source object already was a copy, the new copy becomes yet another copy of the existing ancestor
ancestorStreamID, err = uuid.FromBytes(ancestorStreamIDBytes)
if err != nil {
return Object{}, uuid.UUID{}, nil, err
return Object{}, uuid.UUID{}, nil, 0, err
}
} else {
// Source object was not a copy, it will now become an ancestor (unless it has only inline segments)
@ -437,27 +455,23 @@ func getObjectAtCopySourceAndDestination(
// We will delete it before doing the copy
err := rows.Scan(
&destinationObject.StreamID,
&destinationObject.BucketName,
&destinationObject.ObjectKey,
&destinationObject.ExpiresAt,
&destinationObject.SegmentCount,
&destinationObject.EncryptedMetadata,
&destinationObject.TotalPlainSize, &destinationObject.TotalEncryptedSize, &destinationObject.FixedSegmentSize,
encryptionParameters{&destinationObject.Encryption},
&_bogusBytes,
&destinationObject.Version,
&highestVersion,
)
if err != nil {
return Object{}, uuid.UUID{}, nil, Error.New("error while reading existing object at destination: %w", err)
}
if destinationObject.BucketName != opts.NewBucket || destinationObject.ObjectKey != opts.NewEncryptedObjectKey {
return Object{}, uuid.UUID{}, nil, Error.New("unexpected")
return Object{}, uuid.UUID{}, nil, 0, Error.New("error while reading existing object at destination: %w", err)
}
}
if rows.Next() {
return Object{}, uuid.UUID{}, nil, Error.New("expected 1 or 2 rows, got 3 or more")
return Object{}, uuid.UUID{}, nil, 0, Error.New("expected 1 or 2 rows, got 3 or more")
}
return sourceObject, ancestorStreamID, destinationObject, nil
return sourceObject, ancestorStreamID, destinationObject, highestVersion + 1, nil
}

View File

@ -1011,5 +1011,145 @@ func TestFinishCopyObject(t *testing.T) {
Copies: []metabase.RawCopy{},
}.Check(ctx, t, db)
})
t.Run("finish copy object to existing pending destination", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
sourceObjStream := metabasetest.RandObjectStream()
destinationObjStream := metabasetest.RandObjectStream()
destinationObjStream.ProjectID = sourceObjStream.ProjectID
// testcases:
// - versions of pending objects
// - version of committed object
// - expected copy version
testCases := []struct {
Bucket string
Key metabase.ObjectKey
NewBucket string
NewKey metabase.ObjectKey
sourcePendingVersions []metabase.Version
sourceCommittedVersion metabase.Version
destinationPendingVersions []metabase.Version
destinationCommittedVersion metabase.Version
expectedCopyVersion metabase.Version
}{
// the same bucket
{"testbucket", "object", "testbucket", "new-object",
[]metabase.Version{}, 2,
[]metabase.Version{}, 1,
2},
{"testbucket", "object", "testbucket", "new-object",
[]metabase.Version{}, 1,
[]metabase.Version{1}, 2,
3},
{"testbucket", "object", "testbucket", "new-object",
[]metabase.Version{}, 1,
[]metabase.Version{1, 3}, 2,
4},
{"testbucket", "object", "testbucket", "new-object",
[]metabase.Version{1, 5}, 2,
[]metabase.Version{1, 3}, 2,
4},
{"testbucket", "object", "newbucket", "object",
[]metabase.Version{2, 3}, 1,
[]metabase.Version{1, 5}, 2,
6},
}
for _, tc := range testCases {
metabasetest.DeleteAll{}.Check(ctx, t, db)
db.TestingEnableMultipleVersions(false)
sourceObjStream.BucketName = tc.Bucket
sourceObjStream.ObjectKey = tc.Key
destinationObjStream.BucketName = tc.NewBucket
destinationObjStream.ObjectKey = tc.NewKey
var rawObjects []metabase.RawObject
for _, version := range tc.sourcePendingVersions {
sourceObjStream.Version = version
sourceObjStream.StreamID = testrand.UUID()
metabasetest.CreatePendingObject(ctx, t, db, sourceObjStream, 0)
rawObjects = append(rawObjects, metabase.RawObject{
ObjectStream: sourceObjStream,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
})
}
sourceObjStream.Version = tc.sourceCommittedVersion
sourceObjStream.StreamID = testrand.UUID()
sourceObj, _ := metabasetest.CreateTestObject{
BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
ObjectStream: sourceObjStream,
Encryption: metabasetest.DefaultEncryption,
},
CommitObject: &metabase.CommitObject{
ObjectStream: sourceObjStream,
OverrideEncryptedMetadata: true,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, sourceObjStream, 0)
rawObjects = append(rawObjects, metabase.RawObject(sourceObj))
for _, version := range tc.destinationPendingVersions {
destinationObjStream.Version = version
destinationObjStream.StreamID = testrand.UUID()
metabasetest.CreatePendingObject(ctx, t, db, destinationObjStream, 0)
rawObjects = append(rawObjects, metabase.RawObject{
ObjectStream: destinationObjStream,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
})
}
if tc.destinationCommittedVersion != 0 {
destinationObjStream.StreamID = testrand.UUID()
destinationObjStream.Version = tc.destinationCommittedVersion
_, _ = metabasetest.CreateTestObject{
BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
ObjectStream: destinationObjStream,
Encryption: metabasetest.DefaultEncryption,
},
CommitObject: &metabase.CommitObject{
ObjectStream: destinationObjStream,
OverrideEncryptedMetadata: true,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, destinationObjStream, 0)
}
db.TestingEnableMultipleVersions(true)
copyObj, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
OriginalObject: sourceObj,
CopyObjectStream: &destinationObjStream,
}.Run(ctx, t, db)
require.Equal(t, tc.expectedCopyVersion, copyObj.Version)
rawObjects = append(rawObjects, metabase.RawObject(copyObj))
metabasetest.Verify{
Objects: rawObjects,
Segments: expectedOriginalSegments,
Copies: []metabase.RawCopy{},
}.Check(ctx, t, db)
}
})
})
}

View File

@ -558,3 +558,9 @@ func limitedAsOfSystemTime(impl dbutil.Implementation, now, baseline time.Time,
}
return impl.AsOfSystemTime(baseline)
}
// TestingEnableMultipleVersions enables or disables the use of multiple versions (for tests).
// Will be removed when multiple versions is enabled in production.
func (db *DB) TestingEnableMultipleVersions(enabled bool) {
db.config.MultipleVersions = enabled
}