diff --git a/satellite/metabase/copy_object.go b/satellite/metabase/copy_object.go
index c34861000..780f7789c 100644
--- a/satellite/metabase/copy_object.go
+++ b/satellite/metabase/copy_object.go
@@ -5,10 +5,10 @@ package metabase
 
 import (
 	"context"
+	"database/sql"
+	"errors"
 	"time"
 
-	"github.com/zeebo/errs"
-
 	"storj.io/common/storj"
 	"storj.io/common/uuid"
 	"storj.io/private/dbutil/pgutil"
@@ -52,6 +52,12 @@ type FinishCopyObject struct {
 
 	NewSegmentKeys []EncryptedKeyAndNonce
 
+	// NewDisallowDelete indicates whether the user is allowed to delete an existing unversioned object.
+	NewDisallowDelete bool
+
+	// NewVersioned indicates that the object allows multiple versions.
+	NewVersioned bool
+
 	// VerifyLimits holds a callback by which the caller can interrupt the copy
 	// if it turns out completing the copy would exceed a limit.
 	// It will be called only once.
@@ -106,15 +112,18 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 	var copyMetadata []byte
 
 	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
-		sourceObject, ancestorStreamID, objectAtDestination, nextAvailableVersion, err := getObjectAtCopySourceAndDestination(ctx, tx, opts)
+		sourceObject, err := getObjectExactVersion(ctx, tx, opts)
 		if err != nil {
+			if ErrObjectNotFound.Has(err) {
+				return ErrObjectNotFound.New("source object not found")
+			}
 			return err
 		}
-
-		if objectAtDestination != nil && objectAtDestination.StreamID == sourceObject.StreamID {
-			newObject = sourceObject
-			return nil
+		if sourceObject.StreamID != opts.StreamID {
+			// TODO(versioning): should we report it as "not found" instead?
+			return ErrObjectNotFound.New("object was changed during copy")
 		}
+
 		if opts.VerifyLimits != nil {
 			err := opts.VerifyLimits(sourceObject.TotalEncryptedSize, int64(sourceObject.SegmentCount))
 			if err != nil {
@@ -216,57 +225,66 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 			copyMetadata = sourceObject.EncryptedMetadata
 		}
 
-		if objectAtDestination != nil {
-			version := objectAtDestination.Version
-			deletedObjects, err := db.deleteObjectExactVersion(
-				ctx, DeleteObjectExactVersion{
-					Version: version,
-					ObjectLocation: ObjectLocation{
-						ProjectID:  objectAtDestination.ProjectID,
-						BucketName: objectAtDestination.BucketName,
-						ObjectKey:  objectAtDestination.ObjectKey,
-					},
-				}, tx,
-			)
+		var highestVersion Version
+		if !opts.NewVersioned {
+			// TODO(ver): this logic can probably be merged into the update query
+			//
+			// Note: we prematurely delete the object without checking permissions
+			// and then roll the action back if we were not allowed to delete it.
+			deleted, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{
+				ProjectID:  opts.ProjectID,
+				BucketName: opts.NewBucket,
+				ObjectKey:  opts.NewEncryptedObjectKey,
+			}, tx)
 			if err != nil {
-				return Error.New("unable to delete existing object at copy destination: %w", err)
+				return Error.New("unable to delete object at target location: %w", err)
+			}
+			if deleted.DeletedObjectCount > 0 && opts.NewDisallowDelete {
+				return ErrPermissionDenied.New("no permissions to delete existing object")
 			}
 
-			// The object at the destination was the ancestor!
-			if ancestorStreamID == objectAtDestination.StreamID {
-				if len(deletedObjects.Objects) == 0 {
-					return Error.New("ancestor is gone, please retry operation")
-				}
+			highestVersion = deleted.MaxVersion
+		} else {
+			highestVersion, err = db.queryHighestVersion(ctx, ObjectLocation{
+				ProjectID:  opts.ProjectID,
+				BucketName: opts.NewBucket,
+				ObjectKey:  opts.NewEncryptedObjectKey,
+			}, tx)
+			if err != nil {
+				return Error.New("unable to query highest version: %w", err)
 			}
 		}
 
+		newStatus := committedWhereVersioned(opts.NewVersioned)
+
 		// TODO we need to handle metadata correctly (copy from original object or replace)
 		row := tx.QueryRowContext(ctx, `
			INSERT INTO objects (
				project_id, bucket_name, object_key, version, stream_id,
-				expires_at, status, segment_count,
+				status, expires_at, segment_count,
				encryption,
				encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key,
				total_plain_size, total_encrypted_size, fixed_segment_size,
				zombie_deletion_deadline
			) VALUES (
				$1, $2, $3, $4, $5,
-				$6,`+statusCommittedUnversioned+`, $7,
-				$8,
-				$9, $10, $11,
-				$12, $13, $14, null
+				$6, $7, $8,
+				$9,
+				$10, $11, $12,
+				$13, $14, $15, null
			) RETURNING created_at`,
-			opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, nextAvailableVersion, opts.NewStreamID,
-			sourceObject.ExpiresAt, sourceObject.SegmentCount,
+			opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, highestVersion+1, opts.NewStreamID,
+			newStatus, sourceObject.ExpiresAt, sourceObject.SegmentCount,
 			encryptionParameters{&sourceObject.Encryption},
 			copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
 			sourceObject.TotalPlainSize, sourceObject.TotalEncryptedSize, sourceObject.FixedSegmentSize,
 		)
 
 		newObject = sourceObject
-		newObject.Version = nextAvailableVersion
+		newObject.Version = highestVersion + 1
+		newObject.Status = newStatus
 
 		err = row.Scan(&newObject.CreatedAt)
 		if err != nil {
@@ -327,123 +345,49 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 	return newObject, nil
 }
 
-// Fetch the following in a single query:
-// - object at copy source location (error if it's not there)
-// - next version available
-// - object at copy destination location (if any).
-func getObjectAtCopySourceAndDestination(
-	ctx context.Context, tx tagsql.Tx, opts FinishCopyObject,
-) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, nextAvailableVersion Version, err error) {
+// getObjectExactVersion returns object information for exact version.
+func getObjectExactVersion(ctx context.Context, tx tagsql.Tx, opts FinishCopyObject) (_ Object, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	var highestVersion Version
+	if err := opts.Verify(); err != nil {
+		return Object{}, err
+	}
 
-	sourceObject.ProjectID = opts.ProjectID
-	sourceObject.BucketName = opts.BucketName
-	sourceObject.ObjectKey = opts.ObjectKey
-	sourceObject.Version = opts.Version
-	sourceObject.Status = CommittedUnversioned
-
-	// get objects at source and destination (if any)
-	rows, err := tx.QueryContext(ctx, `
-		WITH destination_current_versions AS (
-			SELECT status, max(version) AS version
-			FROM objects
-			WHERE
-				project_id = $1 AND
-				bucket_name = $5 AND
-				object_key = $6
-			GROUP BY status
-		)
+	// TODO(ver): should we allow copying delete markers?
+	object := Object{}
+	err = tx.QueryRowContext(ctx, `
 		SELECT
-			objects.stream_id,
-			expires_at,
+			stream_id, status,
+			created_at, expires_at,
 			segment_count,
-			encrypted_metadata,
+			encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
 			total_plain_size, total_encrypted_size, fixed_segment_size,
-			encryption,
-			0,
-			coalesce((SELECT max(version) FROM destination_current_versions),0) AS highest_version
+			encryption
 		FROM objects
 		WHERE
-			project_id = $1 AND
-			bucket_name = $3 AND
-			object_key = $4 AND
-			version = $2 AND
-			status = `+statusCommittedUnversioned+`
-		UNION ALL
-		SELECT
-			stream_id,
-			expires_at,
-			segment_count,
-			NULL,
-			total_plain_size, total_encrypted_size, fixed_segment_size,
-			encryption,
-			version,
-			(SELECT max(version) FROM destination_current_versions) AS highest_version
-		FROM objects
-		WHERE
-			project_id = $1 AND
-			bucket_name = $5 AND
-			object_key = $6 AND
-			version = (SELECT version FROM destination_current_versions
-				WHERE status = `+statusCommittedUnversioned+`)`,
-		sourceObject.ProjectID, sourceObject.Version,
-		[]byte(sourceObject.BucketName), sourceObject.ObjectKey,
-		opts.NewBucket, opts.NewEncryptedObjectKey)
-	if err != nil {
-		return Object{}, uuid.UUID{}, nil, 0, err
-	}
-	defer func() {
-		err = errs.Combine(err, rows.Err(), rows.Close())
-	}()
-
-	if !rows.Next() {
-		return Object{}, uuid.UUID{}, nil, 0, ErrObjectNotFound.New("source object not found")
-	}
-
-	err = rows.Scan(
-		&sourceObject.StreamID,
-		&sourceObject.ExpiresAt,
-		&sourceObject.SegmentCount,
-		&sourceObject.EncryptedMetadata,
-		&sourceObject.TotalPlainSize, &sourceObject.TotalEncryptedSize, &sourceObject.FixedSegmentSize,
-		encryptionParameters{&sourceObject.Encryption},
-		&highestVersion,
-		&highestVersion,
-	)
-	if err != nil {
-		return Object{}, uuid.UUID{}, nil, 0, Error.New("unable to query object status: %w", err)
-	}
-	if sourceObject.StreamID != opts.StreamID {
-		return Object{}, uuid.UUID{}, nil, 0, ErrObjectNotFound.New("object was changed during copy")
-	}
-
-	if rows.Next() {
-		destinationObject = &Object{}
-		destinationObject.ProjectID = opts.ProjectID
-		destinationObject.BucketName = opts.NewBucket
-		destinationObject.ObjectKey = opts.NewEncryptedObjectKey
-		// There is an object at the destination.
-		// We will delete it before doing the copy
-		err := rows.Scan(
-			&destinationObject.StreamID,
-			&destinationObject.ExpiresAt,
-			&destinationObject.SegmentCount,
-			&destinationObject.EncryptedMetadata,
-			&destinationObject.TotalPlainSize, &destinationObject.TotalEncryptedSize, &destinationObject.FixedSegmentSize,
-			encryptionParameters{&destinationObject.Encryption},
-			&destinationObject.Version,
-			&highestVersion,
+			(project_id, bucket_name, object_key, version) = ($1, $2, $3, $4) AND
+			status IN `+statusesCommitted+` AND
+			(expires_at IS NULL OR expires_at > now())`,
+		opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
+		Scan(
+			&object.StreamID, &object.Status,
+			&object.CreatedAt, &object.ExpiresAt,
+			&object.SegmentCount,
+			&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
+			&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
+			encryptionParameters{&object.Encryption},
 		)
-		if err != nil {
-			return Object{}, uuid.UUID{}, nil, 0, Error.New("error while reading existing object at destination: %w", err)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return Object{}, ErrObjectNotFound.Wrap(Error.Wrap(err))
 		}
+		return Object{}, Error.New("unable to query object status: %w", err)
 	}
 
-	if rows.Next() {
-		return Object{}, uuid.UUID{}, nil, 0, Error.New("expected 1 or 2 rows, got 3 or more")
-	}
+	object.ProjectID = opts.ProjectID
+	object.BucketName = opts.BucketName
+	object.ObjectKey = opts.ObjectKey
+	object.Version = opts.Version
 
-	return sourceObject, ancestorStreamID, destinationObject, highestVersion + 1, nil
+	return object, nil
 }
diff --git a/satellite/metabase/copy_object_test.go b/satellite/metabase/copy_object_test.go
index 2e52f5c51..500bb7a14 100644
--- a/satellite/metabase/copy_object_test.go
+++ b/satellite/metabase/copy_object_test.go
@@ -967,16 +967,49 @@ func TestFinishCopyObject(t *testing.T) {
 			}.Run(ctx, t, db, obj, byte(numberOfSegments))
 
 			obj.StreamID = testrand.UUID()
-			_, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
+			expectedCopy, _, expectedCopySegments := metabasetest.CreateObjectCopy{
 				OriginalObject:   originalObj,
 				CopyObjectStream: &obj,
 			}.Run(ctx, t, db)
 
 			metabasetest.Verify{
 				Objects: []metabase.RawObject{
-					metabase.RawObject(originalObj),
+					metabase.RawObject(expectedCopy),
 				},
-				Segments: expectedOriginalSegments,
+				Segments: expectedCopySegments,
 			}.Check(ctx, t, db)
 		})
+
+		t.Run("finish copy object versioned to same destination", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			// both should be preserved
+
+			obj := metabasetest.RandObjectStream()
+			numberOfSegments := 10
+			originalObj, _ := metabasetest.CreateTestObject{
+				CommitObject: &metabase.CommitObject{
+					ObjectStream:                  obj,
+					EncryptedMetadata:             testrand.Bytes(64),
+					EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
+					EncryptedMetadataEncryptedKey: testrand.Bytes(265),
+				},
+			}.Run(ctx, t, db, obj, byte(numberOfSegments))
+
+			obj.StreamID = testrand.UUID()
+			expectedCopy, expectedOriginalSegments, expectedCopySegments := metabasetest.CreateObjectCopy{
+				OriginalObject:   originalObj,
+				CopyObjectStream: &obj,
+
+				NewVersioned: true,
+			}.Run(ctx, t, db)
+
+			metabasetest.Verify{
+				Objects: []metabase.RawObject{
+					metabase.RawObject(originalObj),
+					metabase.RawObject(expectedCopy),
+				},
+				Segments: append(expectedCopySegments, expectedOriginalSegments...),
 			}.Check(ctx, t, db)
 		})
@@ -1116,5 +1149,184 @@ func TestFinishCopyObject(t *testing.T) {
 			}.Check(ctx, t, db)
 		}
 	})
+
+		t.Run("existing object is overwritten", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			initialStream := metabasetest.RandObjectStream()
+			initialObject := metabasetest.CreateObject(ctx, t, db, initialStream, 0)
+
+			conflictObjStream := metabasetest.RandObjectStream()
+			conflictObjStream.ProjectID = initialStream.ProjectID
+			metabasetest.CreateObject(ctx, t, db, conflictObjStream, 0)
+
+			newNonce := testrand.Nonce()
+			newMetadataKey := testrand.Bytes(265)
+			newUUID := testrand.UUID()
+
+			now := time.Now()
+
+			copiedObject := metabase.Object{
+				ObjectStream: metabase.ObjectStream{
+					ProjectID:  conflictObjStream.ProjectID,
+					BucketName: conflictObjStream.BucketName,
+					ObjectKey:  conflictObjStream.ObjectKey,
+					StreamID:   newUUID,
+					Version:    conflictObjStream.Version + 1,
+				},
+				CreatedAt:                     now,
+				Status:                        metabase.CommittedUnversioned,
+				Encryption:                    initialObject.Encryption,
+				EncryptedMetadataNonce:        newNonce[:],
+				EncryptedMetadataEncryptedKey: newMetadataKey,
+			}
+
+			metabasetest.FinishCopyObject{
+				Opts: metabase.FinishCopyObject{
+					NewBucket:                    conflictObjStream.BucketName,
+					NewStreamID:                  newUUID,
+					ObjectStream:                 initialStream,
+					NewEncryptedObjectKey:        conflictObjStream.ObjectKey,
+					NewEncryptedMetadataKeyNonce: newNonce,
+					NewEncryptedMetadataKey:      newMetadataKey,
+				},
+				Result: copiedObject,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{
+				Objects: []metabase.RawObject{
+					metabase.RawObject(initialObject),
+					metabase.RawObject(copiedObject),
+				},
+			}.Check(ctx, t, db)
+		})
+
+		t.Run("existing object is not overwritten, permission denied", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			initialStream := metabasetest.RandObjectStream()
+			initialObject := metabasetest.CreateObject(ctx, t, db, initialStream, 0)
+
+			conflictObjStream := metabasetest.RandObjectStream()
+			conflictObjStream.ProjectID = initialStream.ProjectID
+			conflictObject := metabasetest.CreateObject(ctx, t, db, conflictObjStream, 0)
+
+			newNonce := testrand.Nonce()
+			newMetadataKey := testrand.Bytes(265)
+			newUUID := testrand.UUID()
+
+			metabasetest.FinishCopyObject{
+				Opts: metabase.FinishCopyObject{
+					NewBucket:    conflictObjStream.BucketName,
+					ObjectStream: initialStream,
+					NewStreamID:  newUUID,
+
+					NewEncryptedObjectKey:        conflictObjStream.ObjectKey,
+					NewEncryptedMetadataKeyNonce: newNonce,
+					NewEncryptedMetadataKey:      newMetadataKey,
+
+					NewDisallowDelete: true,
+				},
+				ErrClass: &metabase.ErrPermissionDenied,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{
+				Objects: []metabase.RawObject{
+					metabase.RawObject(conflictObject),
+					metabase.RawObject(initialObject),
+				},
+			}.Check(ctx, t, db)
+		})
+
+		t.Run("versioned targets unversioned and versioned", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			obj := metabasetest.RandObjectStream()
+			obj.Version = 12000
+			unversionedObject := metabasetest.CreateObject(ctx, t, db, obj, 0)
+			obj.Version = 13000
+			versionedObject := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)
+
+			sourceStream := metabasetest.RandObjectStream()
+			sourceStream.ProjectID = obj.ProjectID
+			sourceObject := metabasetest.CreateObject(ctx, t, db, sourceStream, 0)
+
+			newStreamID := testrand.UUID()
+
+			copiedObject := sourceObject
+			copiedObject.ObjectStream.ProjectID = obj.ProjectID
+			copiedObject.ObjectStream.BucketName = obj.BucketName
+			copiedObject.ObjectStream.ObjectKey = obj.ObjectKey
+			copiedObject.ObjectStream.Version = 13001
+			copiedObject.ObjectStream.StreamID = newStreamID
+			copiedObject.Status = metabase.CommittedVersioned
+
+			// versioned copy should leave everything else as is
+			metabasetest.FinishCopyObject{
+				Opts: metabase.FinishCopyObject{
+					ObjectStream:          sourceStream,
+					NewBucket:             obj.BucketName,
+					NewStreamID:           newStreamID,
+					NewEncryptedObjectKey: obj.ObjectKey,
+
+					NewVersioned: true,
+				},
+				Result: copiedObject,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{
+				Objects: []metabase.RawObject{
+					metabase.RawObject(unversionedObject),
+					metabase.RawObject(versionedObject),
+					metabase.RawObject(sourceObject),
+					metabase.RawObject(copiedObject),
+				},
+			}.Check(ctx, t, db)
+		})
+
+		t.Run("unversioned targets unversioned and versioned", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			obj := metabasetest.RandObjectStream()
+			obj.Version = 12000
+			metabasetest.CreateObject(ctx, t, db, obj, 0)
+			obj.Version = 13000
+			versionedObject := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)
+
+			sourceStream := metabasetest.RandObjectStream()
+			sourceStream.ProjectID = obj.ProjectID
+			sourceObject := metabasetest.CreateObject(ctx, t, db, sourceStream, 0)
+
+			newStreamID := testrand.UUID()
+
+			copiedObject := sourceObject
+			copiedObject.ObjectStream.ProjectID = obj.ProjectID
+			copiedObject.ObjectStream.BucketName = obj.BucketName
+			copiedObject.ObjectStream.ObjectKey = obj.ObjectKey
+			copiedObject.ObjectStream.Version = 13001
+			copiedObject.ObjectStream.StreamID = newStreamID
+			copiedObject.Status = metabase.CommittedUnversioned
+
+			// unversioned copy should only delete the unversioned object
+			metabasetest.FinishCopyObject{
+				Opts: metabase.FinishCopyObject{
+					ObjectStream:          sourceStream,
+					NewBucket:             obj.BucketName,
+					NewStreamID:           newStreamID,
+					NewEncryptedObjectKey: obj.ObjectKey,
+					NewVersioned:          false,
+				},
+				Result: copiedObject,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{
+				Objects: []metabase.RawObject{
+					metabase.RawObject(versionedObject),
+					metabase.RawObject(sourceObject),
+					metabase.RawObject(copiedObject),
+				},
+			}.Check(ctx, t, db)
+		})
+
 	})
 }
diff --git a/satellite/metabase/metabasetest/create.go b/satellite/metabase/metabasetest/create.go
index 9b930e2f1..c13ce5771 100644
--- a/satellite/metabase/metabasetest/create.go
+++ b/satellite/metabase/metabasetest/create.go
@@ -412,6 +412,9 @@ type CreateObjectCopy struct {
 	OriginalSegments []metabase.Segment
 	FinishObject     *metabase.FinishCopyObject
 	CopyObjectStream *metabase.ObjectStream
+
+	NewDisallowDelete bool
+	NewVersioned      bool
 }
 
 // Run creates the copy.
@@ -469,6 +472,9 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
 			NewEncryptedObjectKey:        copyStream.ObjectKey,
 			NewEncryptedMetadataKeyNonce: testrand.Nonce(),
 			NewEncryptedMetadataKey:      testrand.Bytes(32),
+
+			NewDisallowDelete: cc.NewDisallowDelete,
+			NewVersioned:      cc.NewVersioned,
 		}
 	}
 
diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go
index c40da9c72..1009456b3 100644
--- a/satellite/metainfo/endpoint_object.go
+++ b/satellite/metainfo/endpoint_object.go
@@ -2239,6 +2239,9 @@ func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFi
 		NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce,
 		NewEncryptedMetadataKey:      req.NewEncryptedMetadataKey,
 
+		// TODO(ver): currently we always allow deletion, to not change behaviour.
+		NewDisallowDelete: false,
+
 		VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
 			return endpoint.addStorageUsageUpToLimit(ctx, keyInfo.ProjectID, encryptedObjectSize, nSegments)
 		},