diff --git a/satellite/metabase/copy_object.go b/satellite/metabase/copy_object.go
index 5255143bd..1d3335e97 100644
--- a/satellite/metabase/copy_object.go
+++ b/satellite/metabase/copy_object.go
@@ -106,12 +106,14 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 	var copyMetadata []byte
 
 	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
-
-		sourceObject, ancestorStreamID, objectAtDestination, err := getObjectAtCopySourceAndDestination(ctx, tx, opts)
+		sourceObject, ancestorStreamID, objectAtDestination, nextAvailableVersion, err := getObjectAtCopySourceAndDestination(ctx, tx, opts)
 		if err != nil {
 			return err
 		}
+		if !db.config.MultipleVersions {
+			nextAvailableVersion = opts.Version
+		}
 
 		if objectAtDestination != nil && objectAtDestination.StreamID == sourceObject.StreamID {
 			newObject = sourceObject
 			return nil
@@ -210,9 +212,13 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 		}
 
 		if objectAtDestination != nil {
+			version := opts.Version
+			if db.config.MultipleVersions {
+				version = objectAtDestination.Version
+			}
 			deletedObjects, err := db.deleteObjectExactVersionServerSideCopy(
 				ctx, DeleteObjectExactVersion{
-					Version: opts.Version,
+					Version: version,
 					ObjectLocation: ObjectLocation{
 						ProjectID:  objectAtDestination.ProjectID,
 						BucketName: objectAtDestination.BucketName,
@@ -253,7 +259,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 			)
 			RETURNING
 				created_at`,
-			opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID,
+			opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, nextAvailableVersion, opts.NewStreamID,
 			sourceObject.ExpiresAt, sourceObject.SegmentCount,
 			encryptionParameters{&sourceObject.Encryption},
 			copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
@@ -261,6 +267,8 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 		)
 		newObject = sourceObject
+		newObject.Version = nextAvailableVersion
+
 		err = row.Scan(&newObject.CreatedAt)
 		if err != nil {
 			return Error.New("unable to copy object: %w", err)
 		}
@@ -331,25 +339,43 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 // Fetch the following in a single query:
 // - object at copy source location (error if it's not there)
 // - source ancestor stream id (if any)
+// - next version available
 // - object at copy destination location (if any).
 func getObjectAtCopySourceAndDestination(
 	ctx context.Context, tx tagsql.Tx, opts FinishCopyObject,
-) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, err error) {
+) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, nextAvailableVersion Version, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	var ancestorStreamIDBytes []byte
+	var highestVersion Version
+
+	sourceObject.ProjectID = opts.ProjectID
+	sourceObject.BucketName = opts.BucketName
+	sourceObject.ObjectKey = opts.ObjectKey
+	sourceObject.Version = opts.Version
+	sourceObject.Status = Committed
+
 	// get objects at source and destination (if any)
 	rows, err := tx.QueryContext(ctx, `
+		WITH destination_current_versions AS (
+			SELECT status, max(version) AS version
+			FROM objects
+			WHERE
+				project_id = $1 AND
+				bucket_name = $5 AND
+				object_key = $6
+			GROUP BY status
+		)
 		SELECT
 			objects.stream_id,
-			bucket_name,
-			object_key,
 			expires_at,
 			segment_count,
 			encrypted_metadata,
 			total_plain_size, total_encrypted_size, fixed_segment_size,
 			encryption,
-			segment_copies.ancestor_stream_id
+			segment_copies.ancestor_stream_id,
+			0,
+			coalesce((SELECT max(version) FROM destination_current_versions),0) AS highest_version
 		FROM objects
 		LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id
 		WHERE
@@ -360,67 +386,59 @@ func getObjectAtCopySourceAndDestination(
 			status = `+committedStatus+`
 		UNION ALL
 		SELECT
-			objects.stream_id,
-			bucket_name,
-			object_key,
+			stream_id,
 			expires_at,
 			segment_count,
-			encrypted_metadata,
+			NULL,
 			total_plain_size, total_encrypted_size, fixed_segment_size,
 			encryption,
-			NULL
+			NULL,
+			version,
+			(SELECT max(version) FROM destination_current_versions) AS highest_version
 		FROM objects
 		WHERE
-			project_id = $1 AND
-			bucket_name = $5 AND
-			object_key = $6 AND
-			version = $2 AND
-			status = `+committedStatus,
-		opts.ProjectID, opts.Version,
-		[]byte(opts.BucketName), opts.ObjectKey,
+			project_id = $1 AND
+			bucket_name = $5 AND
+			object_key = $6 AND
+			version = (SELECT version FROM destination_current_versions
+				WHERE status = `+committedStatus+`)`,
+		sourceObject.ProjectID, sourceObject.Version,
+		[]byte(sourceObject.BucketName), sourceObject.ObjectKey,
 		opts.NewBucket, opts.NewEncryptedObjectKey)
 	if err != nil {
-		return Object{}, uuid.UUID{}, nil, err
+		return Object{}, uuid.UUID{}, nil, 0, err
 	}
 	defer func() {
-		err = errs.Combine(err, rows.Err())
-		err = errs.Combine(err, rows.Close())
+		err = errs.Combine(err, rows.Err(), rows.Close())
 	}()
 
 	if !rows.Next() {
-		return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("source object not found")
+		return Object{}, uuid.UUID{}, nil, 0, storj.ErrObjectNotFound.New("source object not found")
 	}
 
 	err = rows.Scan(
 		&sourceObject.StreamID,
-		&sourceObject.BucketName,
-		&sourceObject.ObjectKey,
 		&sourceObject.ExpiresAt,
 		&sourceObject.SegmentCount,
 		&sourceObject.EncryptedMetadata,
 		&sourceObject.TotalPlainSize, &sourceObject.TotalEncryptedSize, &sourceObject.FixedSegmentSize,
 		encryptionParameters{&sourceObject.Encryption},
 		&ancestorStreamIDBytes,
+		&highestVersion,
+		&highestVersion,
 	)
 	if err != nil {
-		return Object{}, uuid.UUID{}, nil, Error.New("unable to query object status: %w", err)
-	}
-	if sourceObject.BucketName != opts.BucketName || sourceObject.ObjectKey != opts.ObjectKey {
-		return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("source object is gone")
+		return Object{}, uuid.UUID{}, nil, 0, Error.New("unable to query object status: %w", err)
 	}
 
 	if sourceObject.StreamID != opts.StreamID {
-		return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("object was changed during copy")
+		return Object{}, uuid.UUID{}, nil, 0, storj.ErrObjectNotFound.New("object was changed during copy")
 	}
-	sourceObject.ProjectID = opts.ProjectID
-	sourceObject.Version = opts.Version
-	sourceObject.Status = Committed
-
 	if len(ancestorStreamIDBytes) != 0 {
 		// Source object already was a copy, the new copy becomes yet another copy of the existing ancestor
 		ancestorStreamID, err = uuid.FromBytes(ancestorStreamIDBytes)
 		if err != nil {
-			return Object{}, uuid.UUID{}, nil, err
+			return Object{}, uuid.UUID{}, nil, 0, err
 		}
 	} else {
 		// Source object was not a copy, it will now become an ancestor (unless it has only inline segments)
@@ -437,27 +455,23 @@ func getObjectAtCopySourceAndDestination(
 		// We will delete it before doing the copy
 		err := rows.Scan(
 			&destinationObject.StreamID,
-			&destinationObject.BucketName,
-			&destinationObject.ObjectKey,
 			&destinationObject.ExpiresAt,
 			&destinationObject.SegmentCount,
 			&destinationObject.EncryptedMetadata,
 			&destinationObject.TotalPlainSize, &destinationObject.TotalEncryptedSize, &destinationObject.FixedSegmentSize,
 			encryptionParameters{&destinationObject.Encryption},
 			&_bogusBytes,
+			&destinationObject.Version,
+			&highestVersion,
 		)
 		if err != nil {
-			return Object{}, uuid.UUID{}, nil, Error.New("error while reading existing object at destination: %w", err)
-		}
-
-		if destinationObject.BucketName != opts.NewBucket || destinationObject.ObjectKey != opts.NewEncryptedObjectKey {
-			return Object{}, uuid.UUID{}, nil, Error.New("unexpected")
+			return Object{}, uuid.UUID{}, nil, 0, Error.New("error while reading existing object at destination: %w", err)
 		}
 	}
 
 	if rows.Next() {
-		return Object{}, uuid.UUID{}, nil, Error.New("expected 1 or 2 rows, got 3 or more")
+		return Object{}, uuid.UUID{}, nil, 0, Error.New("expected 1 or 2 rows, got 3 or more")
 	}
 
-	return sourceObject, ancestorStreamID, destinationObject, nil
+	return sourceObject, ancestorStreamID, destinationObject, highestVersion + 1, nil
 }
diff --git a/satellite/metabase/copy_object_test.go b/satellite/metabase/copy_object_test.go
index 205b3e9a4..3796fd21e 100644
--- a/satellite/metabase/copy_object_test.go
+++ b/satellite/metabase/copy_object_test.go
@@ -1011,5 +1011,145 @@ func TestFinishCopyObject(t *testing.T) {
 			Copies: []metabase.RawCopy{},
 		}.Check(ctx, t, db)
 	})
+
+	t.Run("finish copy object to existing pending destination", func(t *testing.T) {
+		defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+		now := time.Now()
+		zombieDeadline := now.Add(24 * time.Hour)
+
+		sourceObjStream := metabasetest.RandObjectStream()
+		destinationObjStream := metabasetest.RandObjectStream()
+		destinationObjStream.ProjectID = sourceObjStream.ProjectID
+		// testcases:
+		// - versions of pending objects
+		// - version of committed object
+		// - expected copy version
+
+		testCases := []struct {
+			Bucket                      string
+			Key                         metabase.ObjectKey
+			NewBucket                   string
+			NewKey                      metabase.ObjectKey
+			sourcePendingVersions       []metabase.Version
+			sourceCommittedVersion      metabase.Version
+			destinationPendingVersions  []metabase.Version
+			destinationCommittedVersion metabase.Version
+			expectedCopyVersion         metabase.Version
+		}{
+			// the same bucket
+			{"testbucket", "object", "testbucket", "new-object",
+				[]metabase.Version{}, 2,
+				[]metabase.Version{}, 1,
+				2},
+			{"testbucket", "object", "testbucket", "new-object",
+				[]metabase.Version{}, 1,
+				[]metabase.Version{1}, 2,
+				3},
+			{"testbucket", "object", "testbucket", "new-object",
+				[]metabase.Version{}, 1,
+				[]metabase.Version{1, 3}, 2,
+				4},
+			{"testbucket", "object", "testbucket", "new-object",
+				[]metabase.Version{1, 5}, 2,
+				[]metabase.Version{1, 3}, 2,
+				4},
+			{"testbucket", "object", "newbucket", "object",
+				[]metabase.Version{2, 3}, 1,
+				[]metabase.Version{1, 5}, 2,
+				6},
+		}
+
+		for _, tc := range testCases {
+			metabasetest.DeleteAll{}.Check(ctx, t, db)
+			db.TestingEnableMultipleVersions(false)
+			sourceObjStream.BucketName = tc.Bucket
+			sourceObjStream.ObjectKey = tc.Key
+			destinationObjStream.BucketName = tc.NewBucket
+			destinationObjStream.ObjectKey = tc.NewKey
+
+			var rawObjects []metabase.RawObject
+			for _, version := range tc.sourcePendingVersions {
+				sourceObjStream.Version = version
+				sourceObjStream.StreamID = testrand.UUID()
+				metabasetest.CreatePendingObject(ctx, t, db, sourceObjStream, 0)
+
+				rawObjects = append(rawObjects, metabase.RawObject{
+					ObjectStream: sourceObjStream,
+					CreatedAt:    now,
+					Status:       metabase.Pending,
+
+					Encryption:             metabasetest.DefaultEncryption,
+					ZombieDeletionDeadline: &zombieDeadline,
+				})
+			}
+			sourceObjStream.Version = tc.sourceCommittedVersion
+			sourceObjStream.StreamID = testrand.UUID()
+			sourceObj, _ := metabasetest.CreateTestObject{
+				BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
+					ObjectStream: sourceObjStream,
+					Encryption:   metabasetest.DefaultEncryption,
+				},
+				CommitObject: &metabase.CommitObject{
+					ObjectStream:                  sourceObjStream,
+					OverrideEncryptedMetadata:     true,
+					EncryptedMetadata:             testrand.Bytes(64),
+					EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
+					EncryptedMetadataEncryptedKey: testrand.Bytes(265),
+				},
+			}.Run(ctx, t, db, sourceObjStream, 0)
+
+			rawObjects = append(rawObjects, metabase.RawObject(sourceObj))
+
+			for _, version := range tc.destinationPendingVersions {
+				destinationObjStream.Version = version
+				destinationObjStream.StreamID = testrand.UUID()
+				metabasetest.CreatePendingObject(ctx, t, db, destinationObjStream, 0)
+
+				rawObjects = append(rawObjects, metabase.RawObject{
+					ObjectStream: destinationObjStream,
+					CreatedAt:    now,
+					Status:       metabase.Pending,
+
+					Encryption:             metabasetest.DefaultEncryption,
+					ZombieDeletionDeadline: &zombieDeadline,
+				})
+			}
+
+			if tc.destinationCommittedVersion != 0 {
+				destinationObjStream.StreamID = testrand.UUID()
+				destinationObjStream.Version = tc.destinationCommittedVersion
+				_, _ = metabasetest.CreateTestObject{
+					BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
+						ObjectStream: destinationObjStream,
+						Encryption:   metabasetest.DefaultEncryption,
+					},
+					CommitObject: &metabase.CommitObject{
+						ObjectStream:                  destinationObjStream,
+						OverrideEncryptedMetadata:     true,
+						EncryptedMetadata:             testrand.Bytes(64),
+						EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
+						EncryptedMetadataEncryptedKey: testrand.Bytes(265),
+					},
+				}.Run(ctx, t, db, destinationObjStream, 0)
+			}
+
+			db.TestingEnableMultipleVersions(true)
+			copyObj, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
+				OriginalObject:   sourceObj,
+				CopyObjectStream: &destinationObjStream,
+			}.Run(ctx, t, db)
+
+			require.Equal(t, tc.expectedCopyVersion, copyObj.Version)
+
+			rawObjects = append(rawObjects, metabase.RawObject(copyObj))
+
+			metabasetest.Verify{
+				Objects:  rawObjects,
+				Segments: expectedOriginalSegments,
+				Copies:   []metabase.RawCopy{},
+			}.Check(ctx, t, db)
+		}
+	})
 })
}
diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go
index 38308e428..30e01c334 100644
--- a/satellite/metabase/db.go
+++ b/satellite/metabase/db.go
@@ -558,3 +558,9 @@ func limitedAsOfSystemTime(impl dbutil.Implementation, now, baseline time.Time,
 	}
 	return impl.AsOfSystemTime(baseline)
 }
+
+// TestingEnableMultipleVersions enables or disables the use of multiple versions (for tests).
+// Will be removed when multiple versions is enabled in production.
+func (db *DB) TestingEnableMultipleVersions(enabled bool) {
+	db.config.MultipleVersions = enabled
+}
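Note (not part of the patch): the following standalone Go sketch only summarizes the version-selection rule the change introduces; the Version type and nextCopyVersion helper are hypothetical names used for illustration, not code from the repository. Per the patch, with MultipleVersions enabled the copy is written at max(version) over all objects at the destination key (coalesced to 0 when none exist) plus one; with the flag disabled, FinishCopyObject keeps using opts.Version.

package main

import "fmt"

// Version stands in for metabase.Version (an integer object version) in this sketch only.
type Version int64

// nextCopyVersion mirrors the rule encoded in the patch: with multiple versions
// enabled the copy lands at max(existing destination versions) + 1, where the
// maximum is treated as 0 when the destination key holds no objects; otherwise
// the caller-supplied version is used unchanged.
func nextCopyVersion(multipleVersions bool, requested Version, destinationVersions []Version) Version {
	if !multipleVersions {
		return requested
	}
	var highest Version // corresponds to coalesce(max(version), 0) in the SQL
	for _, v := range destinationVersions {
		if v > highest {
			highest = v
		}
	}
	return highest + 1
}

func main() {
	// Mirrors the last test case above: destination holds versions 1 and 5, so the copy gets 6.
	fmt.Println(nextCopyVersion(true, 2, []Version{1, 5}))  // 6
	fmt.Println(nextCopyVersion(false, 2, []Version{1, 5})) // 2
}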