From f58129786ba249f880dadeea6fae6ecfc0f43066 Mon Sep 17 00:00:00 2001 From: Fadila Khadar Date: Thu, 15 Sep 2022 22:32:56 +0200 Subject: [PATCH] satellite/metabase: copy handles version when overwriting target location When doing server-side copy, deletes the committed version of the target location if it already exists. It does not touch pending versions. The version of the copy is set to the highest already existing version + 1. Fixes: https://github.com/storj/storj/issues/5071 Change-Id: I1d91ac17054834b1f4f0970a9fa5d58198c58a37 --- satellite/metabase/copy_object.go | 104 ++++++++++-------- satellite/metabase/copy_object_test.go | 140 +++++++++++++++++++++++++ satellite/metabase/db.go | 6 ++ 3 files changed, 205 insertions(+), 45 deletions(-) diff --git a/satellite/metabase/copy_object.go b/satellite/metabase/copy_object.go index 5255143bd..1d3335e97 100644 --- a/satellite/metabase/copy_object.go +++ b/satellite/metabase/copy_object.go @@ -106,12 +106,14 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje var copyMetadata []byte err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { - - sourceObject, ancestorStreamID, objectAtDestination, err := getObjectAtCopySourceAndDestination(ctx, tx, opts) + sourceObject, ancestorStreamID, objectAtDestination, nextAvailableVersion, err := getObjectAtCopySourceAndDestination(ctx, tx, opts) if err != nil { return err } + if !db.config.MultipleVersions { + nextAvailableVersion = opts.Version + } if objectAtDestination != nil && objectAtDestination.StreamID == sourceObject.StreamID { newObject = sourceObject return nil @@ -210,9 +212,13 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje } if objectAtDestination != nil { + version := opts.Version + if db.config.MultipleVersions { + version = objectAtDestination.Version + } deletedObjects, err := db.deleteObjectExactVersionServerSideCopy( ctx, DeleteObjectExactVersion{ - Version: opts.Version, + Version: version, ObjectLocation: ObjectLocation{ ProjectID: objectAtDestination.ProjectID, BucketName: objectAtDestination.BucketName, @@ -253,7 +259,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje ) RETURNING created_at`, - opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID, + opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, nextAvailableVersion, opts.NewStreamID, sourceObject.ExpiresAt, sourceObject.SegmentCount, encryptionParameters{&sourceObject.Encryption}, copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey, @@ -261,6 +267,8 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje ) newObject = sourceObject + newObject.Version = nextAvailableVersion + err = row.Scan(&newObject.CreatedAt) if err != nil { return Error.New("unable to copy object: %w", err) @@ -331,25 +339,43 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje // Fetch the following in a single query: // - object at copy source location (error if it's not there) // - source ancestor stream id (if any) +// - next version available // - object at copy destination location (if any). func getObjectAtCopySourceAndDestination( ctx context.Context, tx tagsql.Tx, opts FinishCopyObject, -) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, err error) { +) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, nextAvailableVersion Version, err error) { defer mon.Task()(&ctx)(&err) var ancestorStreamIDBytes []byte + var highestVersion Version + + sourceObject.ProjectID = opts.ProjectID + sourceObject.BucketName = opts.BucketName + sourceObject.ObjectKey = opts.ObjectKey + sourceObject.Version = opts.Version + sourceObject.Status = Committed + // get objects at source and destination (if any) rows, err := tx.QueryContext(ctx, ` + WITH destination_current_versions AS ( + SELECT status, max(version) AS version + FROM objects + WHERE + project_id = $1 AND + bucket_name = $5 AND + object_key = $6 + GROUP BY status + ) SELECT objects.stream_id, - bucket_name, - object_key, expires_at, segment_count, encrypted_metadata, total_plain_size, total_encrypted_size, fixed_segment_size, encryption, - segment_copies.ancestor_stream_id + segment_copies.ancestor_stream_id, + 0, + coalesce((SELECT max(version) FROM destination_current_versions),0) AS highest_version FROM objects LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id WHERE @@ -360,67 +386,59 @@ func getObjectAtCopySourceAndDestination( status = `+committedStatus+` UNION ALL SELECT - objects.stream_id, - bucket_name, - object_key, + stream_id, expires_at, segment_count, - encrypted_metadata, + NULL, total_plain_size, total_encrypted_size, fixed_segment_size, encryption, - NULL + NULL, + version, + (SELECT max(version) FROM destination_current_versions) AS highest_version FROM objects WHERE - project_id = $1 AND - bucket_name = $5 AND - object_key = $6 AND - version = $2 AND - status = `+committedStatus, - opts.ProjectID, opts.Version, - []byte(opts.BucketName), opts.ObjectKey, + project_id = $1 AND + bucket_name = $5 AND + object_key = $6 AND + version = (SELECT version FROM destination_current_versions + WHERE status = `+committedStatus+`)`, + sourceObject.ProjectID, sourceObject.Version, + []byte(sourceObject.BucketName), sourceObject.ObjectKey, opts.NewBucket, opts.NewEncryptedObjectKey) if err != nil { - return Object{}, uuid.UUID{}, nil, err + return Object{}, uuid.UUID{}, nil, 0, err } defer func() { - err = errs.Combine(err, rows.Err()) - err = errs.Combine(err, rows.Close()) + err = errs.Combine(err, rows.Err(), rows.Close()) }() if !rows.Next() { - return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("source object not found") + return Object{}, uuid.UUID{}, nil, 0, storj.ErrObjectNotFound.New("source object not found") } err = rows.Scan( &sourceObject.StreamID, - &sourceObject.BucketName, - &sourceObject.ObjectKey, &sourceObject.ExpiresAt, &sourceObject.SegmentCount, &sourceObject.EncryptedMetadata, &sourceObject.TotalPlainSize, &sourceObject.TotalEncryptedSize, &sourceObject.FixedSegmentSize, encryptionParameters{&sourceObject.Encryption}, &ancestorStreamIDBytes, + &highestVersion, + &highestVersion, ) if err != nil { - return Object{}, uuid.UUID{}, nil, Error.New("unable to query object status: %w", err) - } - if sourceObject.BucketName != opts.BucketName || sourceObject.ObjectKey != opts.ObjectKey { - return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("source object is gone") + return Object{}, uuid.UUID{}, nil, 0, Error.New("unable to query object status: %w", err) } if sourceObject.StreamID != opts.StreamID { - return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("object was changed during copy") + return Object{}, uuid.UUID{}, nil, 0, storj.ErrObjectNotFound.New("object was changed during copy") } - sourceObject.ProjectID = opts.ProjectID - sourceObject.Version = opts.Version - sourceObject.Status = Committed - if len(ancestorStreamIDBytes) != 0 { // Source object already was a copy, the new copy becomes yet another copy of the existing ancestor ancestorStreamID, err = uuid.FromBytes(ancestorStreamIDBytes) if err != nil { - return Object{}, uuid.UUID{}, nil, err + return Object{}, uuid.UUID{}, nil, 0, err } } else { // Source object was not a copy, it will now become an ancestor (unless it has only inline segments) @@ -437,27 +455,23 @@ func getObjectAtCopySourceAndDestination( // We will delete it before doing the copy err := rows.Scan( &destinationObject.StreamID, - &destinationObject.BucketName, - &destinationObject.ObjectKey, &destinationObject.ExpiresAt, &destinationObject.SegmentCount, &destinationObject.EncryptedMetadata, &destinationObject.TotalPlainSize, &destinationObject.TotalEncryptedSize, &destinationObject.FixedSegmentSize, encryptionParameters{&destinationObject.Encryption}, &_bogusBytes, + &destinationObject.Version, + &highestVersion, ) if err != nil { - return Object{}, uuid.UUID{}, nil, Error.New("error while reading existing object at destination: %w", err) - } - - if destinationObject.BucketName != opts.NewBucket || destinationObject.ObjectKey != opts.NewEncryptedObjectKey { - return Object{}, uuid.UUID{}, nil, Error.New("unexpected") + return Object{}, uuid.UUID{}, nil, 0, Error.New("error while reading existing object at destination: %w", err) } } if rows.Next() { - return Object{}, uuid.UUID{}, nil, Error.New("expected 1 or 2 rows, got 3 or more") + return Object{}, uuid.UUID{}, nil, 0, Error.New("expected 1 or 2 rows, got 3 or more") } - return sourceObject, ancestorStreamID, destinationObject, nil + return sourceObject, ancestorStreamID, destinationObject, highestVersion + 1, nil } diff --git a/satellite/metabase/copy_object_test.go b/satellite/metabase/copy_object_test.go index 205b3e9a4..3796fd21e 100644 --- a/satellite/metabase/copy_object_test.go +++ b/satellite/metabase/copy_object_test.go @@ -1011,5 +1011,145 @@ func TestFinishCopyObject(t *testing.T) { Copies: []metabase.RawCopy{}, }.Check(ctx, t, db) }) + + t.Run("finish copy object to existing pending destination", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + sourceObjStream := metabasetest.RandObjectStream() + destinationObjStream := metabasetest.RandObjectStream() + destinationObjStream.ProjectID = sourceObjStream.ProjectID + // testcases: + // - versions of pending objects + // - version of committed object + // - expected copy version + + testCases := []struct { + Bucket string + Key metabase.ObjectKey + NewBucket string + NewKey metabase.ObjectKey + sourcePendingVersions []metabase.Version + sourceCommittedVersion metabase.Version + destinationPendingVersions []metabase.Version + destinationCommittedVersion metabase.Version + expectedCopyVersion metabase.Version + }{ + // the same bucket + {"testbucket", "object", "testbucket", "new-object", + []metabase.Version{}, 2, + []metabase.Version{}, 1, + 2}, + {"testbucket", "object", "testbucket", "new-object", + []metabase.Version{}, 1, + []metabase.Version{1}, 2, + 3}, + {"testbucket", "object", "testbucket", "new-object", + []metabase.Version{}, 1, + []metabase.Version{1, 3}, 2, + 4}, + {"testbucket", "object", "testbucket", "new-object", + []metabase.Version{1, 5}, 2, + []metabase.Version{1, 3}, 2, + 4}, + {"testbucket", "object", "newbucket", "object", + []metabase.Version{2, 3}, 1, + []metabase.Version{1, 5}, 2, + 6}, + } + + for _, tc := range testCases { + metabasetest.DeleteAll{}.Check(ctx, t, db) + db.TestingEnableMultipleVersions(false) + sourceObjStream.BucketName = tc.Bucket + sourceObjStream.ObjectKey = tc.Key + destinationObjStream.BucketName = tc.NewBucket + destinationObjStream.ObjectKey = tc.NewKey + + var rawObjects []metabase.RawObject + for _, version := range tc.sourcePendingVersions { + sourceObjStream.Version = version + sourceObjStream.StreamID = testrand.UUID() + metabasetest.CreatePendingObject(ctx, t, db, sourceObjStream, 0) + + rawObjects = append(rawObjects, metabase.RawObject{ + ObjectStream: sourceObjStream, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }) + } + sourceObjStream.Version = tc.sourceCommittedVersion + sourceObjStream.StreamID = testrand.UUID() + sourceObj, _ := metabasetest.CreateTestObject{ + BeginObjectExactVersion: &metabase.BeginObjectExactVersion{ + ObjectStream: sourceObjStream, + Encryption: metabasetest.DefaultEncryption, + }, + CommitObject: &metabase.CommitObject{ + ObjectStream: sourceObjStream, + OverrideEncryptedMetadata: true, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: testrand.Nonce().Bytes(), + EncryptedMetadataEncryptedKey: testrand.Bytes(265), + }, + }.Run(ctx, t, db, sourceObjStream, 0) + + rawObjects = append(rawObjects, metabase.RawObject(sourceObj)) + + for _, version := range tc.destinationPendingVersions { + destinationObjStream.Version = version + destinationObjStream.StreamID = testrand.UUID() + metabasetest.CreatePendingObject(ctx, t, db, destinationObjStream, 0) + + rawObjects = append(rawObjects, metabase.RawObject{ + ObjectStream: destinationObjStream, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }) + } + + if tc.destinationCommittedVersion != 0 { + destinationObjStream.StreamID = testrand.UUID() + destinationObjStream.Version = tc.destinationCommittedVersion + _, _ = metabasetest.CreateTestObject{ + BeginObjectExactVersion: &metabase.BeginObjectExactVersion{ + ObjectStream: destinationObjStream, + Encryption: metabasetest.DefaultEncryption, + }, + CommitObject: &metabase.CommitObject{ + ObjectStream: destinationObjStream, + OverrideEncryptedMetadata: true, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: testrand.Nonce().Bytes(), + EncryptedMetadataEncryptedKey: testrand.Bytes(265), + }, + }.Run(ctx, t, db, destinationObjStream, 0) + } + + db.TestingEnableMultipleVersions(true) + copyObj, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{ + OriginalObject: sourceObj, + CopyObjectStream: &destinationObjStream, + }.Run(ctx, t, db) + + require.Equal(t, tc.expectedCopyVersion, copyObj.Version) + + rawObjects = append(rawObjects, metabase.RawObject(copyObj)) + + metabasetest.Verify{ + Objects: rawObjects, + Segments: expectedOriginalSegments, + Copies: []metabase.RawCopy{}, + }.Check(ctx, t, db) + } + }) }) } diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go index 38308e428..30e01c334 100644 --- a/satellite/metabase/db.go +++ b/satellite/metabase/db.go @@ -558,3 +558,9 @@ func limitedAsOfSystemTime(impl dbutil.Implementation, now, baseline time.Time, } return impl.AsOfSystemTime(baseline) } + +// TestingEnableMultipleVersions enables or disables the use of multiple versions (for tests). +// Will be removed when multiple versions is enabled in production. +func (db *DB) TestingEnableMultipleVersions(enabled bool) { + db.config.MultipleVersions = enabled +}