satellite/metabase: fix copy to ancestor location
Previously copying an object to it's ancestor location (copy of copy) broke the object and all copies. This fixes this by calling the existing delete method rather than a custom one when there is an existing object at the copy destination. The check for existing object at destination has been moved to an earlier point in FinishCopy. metabase.DeleteObject exposes a transaction parameter so that it can be reused within metabase. Closes https://github.com/storj/storj/issues/4707 Uplink test at https://review.dev.storj.io/c/storj/uplink/+/7557 Change-Id: I418fc3337fa9f30146ccc1db456af168ae41c326
This commit is contained in:
parent
3a63939fc9
commit
74f4f6e765
@ -9,6 +9,8 @@ import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
@ -153,7 +155,7 @@ func (finishCopy FinishCopyObject) Verify() error {
|
||||
}
|
||||
|
||||
// FinishCopyObject accepts new encryption keys for copied object and insert the corresponding new object ObjectKey and segments EncryptedKey.
|
||||
// TODO handle the case when the source and destination encrypted object keys are the same.
|
||||
// It returns the object at the destination location.
|
||||
func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (object Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -161,52 +163,18 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
return Object{}, err
|
||||
}
|
||||
|
||||
originalObject := Object{}
|
||||
copyObject := Object{}
|
||||
newObject := Object{}
|
||||
var copyMetadata []byte
|
||||
|
||||
var ancestorStreamIDBytes []byte
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
objects.stream_id,
|
||||
expires_at,
|
||||
segment_count,
|
||||
encrypted_metadata,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption,
|
||||
segment_copies.ancestor_stream_id
|
||||
FROM objects
|
||||
LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version = $4 AND
|
||||
status = `+committedStatus,
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
|
||||
Scan(
|
||||
&originalObject.StreamID,
|
||||
&originalObject.ExpiresAt,
|
||||
&originalObject.SegmentCount,
|
||||
&originalObject.EncryptedMetadata,
|
||||
&originalObject.TotalPlainSize, &originalObject.TotalEncryptedSize, &originalObject.FixedSegmentSize,
|
||||
encryptionParameters{&originalObject.Encryption},
|
||||
&ancestorStreamIDBytes,
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
|
||||
}
|
||||
return Error.New("unable to query object status: %w", err)
|
||||
}
|
||||
originalObject.BucketName = opts.BucketName
|
||||
originalObject.ProjectID = opts.ProjectID
|
||||
originalObject.Version = opts.Version
|
||||
originalObject.Status = Committed
|
||||
|
||||
if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) {
|
||||
return ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys))
|
||||
sourceObject, ancestorStreamID, objectAtDestination, err := getObjectAtCopySourceAndDestination(ctx, tx, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if int(sourceObject.SegmentCount) != len(opts.NewSegmentKeys) {
|
||||
return ErrInvalidRequest.New("wrong number of segments keys received (received %d, need %d)", len(opts.NewSegmentKeys), sourceObject.SegmentCount)
|
||||
}
|
||||
|
||||
var newSegments struct {
|
||||
@ -221,20 +189,18 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
newSegments.Positions = append(newSegments.Positions, int64(u.Position.Encode()))
|
||||
}
|
||||
|
||||
positions := make([]int64, originalObject.SegmentCount)
|
||||
positions := make([]int64, sourceObject.SegmentCount)
|
||||
|
||||
rootPieceIDs := make([][]byte, originalObject.SegmentCount)
|
||||
rootPieceIDs := make([][]byte, sourceObject.SegmentCount)
|
||||
|
||||
expiresAts := make([]*time.Time, originalObject.SegmentCount)
|
||||
encryptedSizes := make([]int32, originalObject.SegmentCount)
|
||||
plainSizes := make([]int32, originalObject.SegmentCount)
|
||||
plainOffsets := make([]int64, originalObject.SegmentCount)
|
||||
inlineDatas := make([][]byte, originalObject.SegmentCount)
|
||||
expiresAts := make([]*time.Time, sourceObject.SegmentCount)
|
||||
encryptedSizes := make([]int32, sourceObject.SegmentCount)
|
||||
plainSizes := make([]int32, sourceObject.SegmentCount)
|
||||
plainOffsets := make([]int64, sourceObject.SegmentCount)
|
||||
inlineDatas := make([][]byte, sourceObject.SegmentCount)
|
||||
|
||||
redundancySchemes := make([]int64, originalObject.SegmentCount)
|
||||
// TODO: there are probably columns that we can skip
|
||||
// maybe it's possible to have the select and the insert in one query
|
||||
err = withRows(tx.QueryContext(ctx, `
|
||||
redundancySchemes := make([]int64, sourceObject.SegmentCount)
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
position,
|
||||
expires_at,
|
||||
@ -246,7 +212,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
WHERE stream_id = $1
|
||||
ORDER BY position ASC
|
||||
LIMIT $2
|
||||
`, originalObject.StreamID, originalObject.SegmentCount))(func(rows tagsql.Rows) error {
|
||||
`, sourceObject.StreamID, sourceObject.SegmentCount))(func(rows tagsql.Rows) error {
|
||||
index := 0
|
||||
for rows.Next() {
|
||||
err := rows.Scan(
|
||||
@ -267,7 +233,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
return err
|
||||
}
|
||||
|
||||
if index != int(originalObject.SegmentCount) {
|
||||
if index != int(sourceObject.SegmentCount) {
|
||||
return Error.New("could not load all of the segment information")
|
||||
}
|
||||
|
||||
@ -287,28 +253,40 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
}
|
||||
}
|
||||
|
||||
copyMetadata = originalObject.EncryptedMetadata
|
||||
if opts.OverrideMetadata {
|
||||
copyMetadata = opts.NewEncryptedMetadata
|
||||
} else {
|
||||
copyMetadata = sourceObject.EncryptedMetadata
|
||||
}
|
||||
|
||||
copyObject = originalObject
|
||||
if objectAtDestination != nil {
|
||||
deletedObjects, err := db.deleteObjectExactVersionServerSideCopy(
|
||||
ctx, DeleteObjectExactVersion{
|
||||
Version: opts.Version,
|
||||
ObjectLocation: ObjectLocation{
|
||||
ProjectID: objectAtDestination.ProjectID,
|
||||
BucketName: objectAtDestination.BucketName,
|
||||
ObjectKey: objectAtDestination.ObjectKey,
|
||||
},
|
||||
}, tx,
|
||||
)
|
||||
if err != nil {
|
||||
return Error.New("unable to delete existing object at copy destination: %w", err)
|
||||
}
|
||||
|
||||
// The object at the destination was the ancestor!
|
||||
// Now that the ancestor of the source object is removed, we need to change the target ancestor.
|
||||
if ancestorStreamID == objectAtDestination.StreamID {
|
||||
if len(deletedObjects) == 0 {
|
||||
return Error.New("ancestor is gone, please retry operation")
|
||||
}
|
||||
|
||||
ancestorStreamID = *deletedObjects[0].PromotedAncestor
|
||||
}
|
||||
}
|
||||
|
||||
// TODO we need to handle metadata correctly (copy from original object or replace)
|
||||
row := tx.QueryRowContext(ctx, `
|
||||
WITH existing_object AS (
|
||||
SELECT
|
||||
objects.stream_id,
|
||||
copies.stream_id AS new_ancestor,
|
||||
objects.segment_count
|
||||
FROM objects
|
||||
LEFT OUTER JOIN segment_copies copies ON objects.stream_id = copies.ancestor_stream_id
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version = $4
|
||||
)
|
||||
INSERT INTO objects (
|
||||
project_id, bucket_name, object_key, version, stream_id,
|
||||
expires_at, status, segment_count,
|
||||
@ -322,43 +300,18 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
$8,
|
||||
$9, $10, $11,
|
||||
$12, $13, $14, null
|
||||
) ON CONFLICT (project_id, bucket_name, object_key, version)
|
||||
DO UPDATE SET
|
||||
stream_id = $5,
|
||||
created_at = now(),
|
||||
expires_at = $6,
|
||||
status = `+committedStatus+`,
|
||||
segment_count = $7,
|
||||
encryption = $8,
|
||||
encrypted_metadata = $9,
|
||||
encrypted_metadata_nonce = $10,
|
||||
encrypted_metadata_encrypted_key = $11,
|
||||
total_plain_size = $12,
|
||||
total_encrypted_size = $13,
|
||||
fixed_segment_size = $14,
|
||||
zombie_deletion_deadline = NULL
|
||||
)
|
||||
RETURNING
|
||||
created_at,
|
||||
(SELECT stream_id FROM existing_object LIMIT 1),
|
||||
(SELECT new_ancestor FROM existing_object LIMIT 1),
|
||||
(SELECT segment_count FROM existing_object LIMIT 1)`,
|
||||
created_at`,
|
||||
opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID,
|
||||
originalObject.ExpiresAt, originalObject.SegmentCount,
|
||||
encryptionParameters{&originalObject.Encryption},
|
||||
sourceObject.ExpiresAt, sourceObject.SegmentCount,
|
||||
encryptionParameters{&sourceObject.Encryption},
|
||||
copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
|
||||
originalObject.TotalPlainSize, originalObject.TotalEncryptedSize, originalObject.FixedSegmentSize,
|
||||
sourceObject.TotalPlainSize, sourceObject.TotalEncryptedSize, sourceObject.FixedSegmentSize,
|
||||
)
|
||||
|
||||
var existingObjStreamID *uuid.UUID
|
||||
var newAncestorStreamID *uuid.UUID
|
||||
var oldSegmentCount *int
|
||||
|
||||
err = row.Scan(©Object.CreatedAt, &existingObjStreamID, &newAncestorStreamID, &oldSegmentCount)
|
||||
if err != nil {
|
||||
return Error.New("unable to copy object: %w", err)
|
||||
}
|
||||
|
||||
err = db.deleteExistingObjectSegments(ctx, tx, existingObjStreamID, newAncestorStreamID, oldSegmentCount)
|
||||
newObject = sourceObject
|
||||
err = row.Scan(&newObject.CreatedAt)
|
||||
if err != nil {
|
||||
return Error.New("unable to copy object: %w", err)
|
||||
}
|
||||
@ -392,15 +345,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
if onlyInlineSegments {
|
||||
return nil
|
||||
}
|
||||
var ancestorStreamID uuid.UUID
|
||||
if len(ancestorStreamIDBytes) != 0 {
|
||||
ancestorStreamID, err = uuid.FromBytes(ancestorStreamIDBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
ancestorStreamID = originalObject.StreamID
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
INSERT INTO segment_copies (
|
||||
stream_id, ancestor_stream_id
|
||||
@ -411,6 +356,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
if err != nil {
|
||||
return Error.New("unable to copy object: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -418,69 +364,147 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
return Object{}, err
|
||||
}
|
||||
|
||||
copyObject.StreamID = opts.NewStreamID
|
||||
copyObject.BucketName = opts.NewBucket
|
||||
copyObject.ObjectKey = opts.NewEncryptedObjectKey
|
||||
copyObject.EncryptedMetadata = copyMetadata
|
||||
copyObject.EncryptedMetadataEncryptedKey = opts.NewEncryptedMetadataKey
|
||||
newObject.StreamID = opts.NewStreamID
|
||||
newObject.BucketName = opts.NewBucket
|
||||
newObject.ObjectKey = opts.NewEncryptedObjectKey
|
||||
newObject.EncryptedMetadata = copyMetadata
|
||||
newObject.EncryptedMetadataEncryptedKey = opts.NewEncryptedMetadataKey
|
||||
if !opts.NewEncryptedMetadataKeyNonce.IsZero() {
|
||||
copyObject.EncryptedMetadataNonce = opts.NewEncryptedMetadataKeyNonce[:]
|
||||
newObject.EncryptedMetadataNonce = opts.NewEncryptedMetadataKeyNonce[:]
|
||||
}
|
||||
|
||||
mon.Meter("finish_copy_object").Mark(1)
|
||||
|
||||
return copyObject, nil
|
||||
return newObject, nil
|
||||
}
|
||||
|
||||
func (db *DB) deleteExistingObjectSegments(ctx context.Context, tx tagsql.Tx, existingObjStreamID *uuid.UUID, newAncestorStreamID *uuid.UUID, segmentCount *int) (err error) {
|
||||
if existingObjStreamID != nil && *segmentCount > 0 {
|
||||
if newAncestorStreamID == nil {
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
DELETE FROM segments WHERE stream_id = $1
|
||||
`, existingObjStreamID,
|
||||
)
|
||||
if err != nil {
|
||||
return Error.New("unable to copy segments: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
var infos deletedObjectInfo
|
||||
// Fetch the following in a single query:
|
||||
// - object at copy source location (error if it's not there)
|
||||
// - source ancestor stream id (if any)
|
||||
// - object at copy destination location (if any).
|
||||
func getObjectAtCopySourceAndDestination(
|
||||
ctx context.Context, tx tagsql.Tx, opts FinishCopyObject,
|
||||
) (sourceObject Object, ancestorStreamID uuid.UUID, destinationObject *Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
infos.SegmentCount = int32(*segmentCount)
|
||||
infos.PromotedAncestor = newAncestorStreamID
|
||||
infos.Segments = make([]deletedRemoteSegmentInfo, *segmentCount)
|
||||
var ancestorStreamIDBytes []byte
|
||||
// get objects at source and destination (if any)
|
||||
rows, err := tx.QueryContext(ctx, `
|
||||
SELECT
|
||||
objects.stream_id,
|
||||
bucket_name,
|
||||
object_key,
|
||||
expires_at,
|
||||
segment_count,
|
||||
encrypted_metadata,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption,
|
||||
segment_copies.ancestor_stream_id
|
||||
FROM objects
|
||||
LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $3 AND
|
||||
object_key = $4 AND
|
||||
version = $2 AND
|
||||
status = `+committedStatus+`
|
||||
UNION ALL
|
||||
SELECT
|
||||
objects.stream_id,
|
||||
bucket_name,
|
||||
object_key,
|
||||
expires_at,
|
||||
segment_count,
|
||||
encrypted_metadata,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption,
|
||||
NULL
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $5 AND
|
||||
object_key = $6 AND
|
||||
version = $2 AND
|
||||
status = `+committedStatus,
|
||||
opts.ProjectID, opts.Version,
|
||||
[]byte(opts.BucketName), opts.ObjectKey,
|
||||
opts.NewBucket, opts.NewEncryptedObjectKey)
|
||||
if err != nil {
|
||||
return Object{}, uuid.UUID{}, nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, rows.Err())
|
||||
err = errs.Combine(err, rows.Close())
|
||||
}()
|
||||
|
||||
var aliasPieces AliasPieces
|
||||
err = withRows(tx.QueryContext(ctx, `
|
||||
DELETE FROM segments WHERE stream_id = $1
|
||||
RETURNING position, remote_alias_pieces, repaired_at
|
||||
`, existingObjStreamID))(func(rows tagsql.Rows) error {
|
||||
index := 0
|
||||
for rows.Next() {
|
||||
err = rows.Scan(
|
||||
&infos.Segments[index].Position,
|
||||
&aliasPieces,
|
||||
&infos.Segments[index].RepairedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
infos.Segments[index].Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
|
||||
if err != nil {
|
||||
return Error.New("unable to copy object: %w", err)
|
||||
}
|
||||
index++
|
||||
}
|
||||
return rows.Err()
|
||||
})
|
||||
if !rows.Next() {
|
||||
return Object{}, uuid.UUID{}, nil, storj.ErrObjectNotFound.New("source object not found")
|
||||
}
|
||||
|
||||
err = rows.Scan(
|
||||
&sourceObject.StreamID,
|
||||
&sourceObject.BucketName,
|
||||
&sourceObject.ObjectKey,
|
||||
&sourceObject.ExpiresAt,
|
||||
&sourceObject.SegmentCount,
|
||||
&sourceObject.EncryptedMetadata,
|
||||
&sourceObject.TotalPlainSize, &sourceObject.TotalEncryptedSize, &sourceObject.FixedSegmentSize,
|
||||
encryptionParameters{&sourceObject.Encryption},
|
||||
&ancestorStreamIDBytes,
|
||||
)
|
||||
if err != nil {
|
||||
return Object{}, uuid.UUID{}, nil, Error.New("unable to query object status: %w", err)
|
||||
}
|
||||
if sourceObject.BucketName != opts.BucketName || sourceObject.ObjectKey != opts.ObjectKey {
|
||||
return Object{}, uuid.UUID{}, nil, Error.New("source object is gone")
|
||||
}
|
||||
|
||||
sourceObject.ProjectID = opts.ProjectID
|
||||
sourceObject.Version = opts.Version
|
||||
sourceObject.Status = Committed
|
||||
|
||||
if len(ancestorStreamIDBytes) != 0 {
|
||||
// Source object already was a copy, the new copy becomes yet another copy of the existing ancestor
|
||||
ancestorStreamID, err = uuid.FromBytes(ancestorStreamIDBytes)
|
||||
if err != nil {
|
||||
return Error.New("unable to copy segments: %w", err)
|
||||
return Object{}, uuid.UUID{}, nil, err
|
||||
}
|
||||
} else {
|
||||
// Source object was not a copy, it will now become an ancestor (unless it has only inline segments)
|
||||
ancestorStreamID = sourceObject.StreamID
|
||||
}
|
||||
|
||||
if rows.Next() {
|
||||
var _bogusBytes []byte
|
||||
destinationObject = &Object{}
|
||||
destinationObject.ProjectID = opts.ProjectID
|
||||
destinationObject.BucketName = opts.NewBucket
|
||||
destinationObject.ObjectKey = opts.NewEncryptedObjectKey
|
||||
// There is an object at the destination.
|
||||
// We will delete it before doing the copy
|
||||
err := rows.Scan(
|
||||
&destinationObject.StreamID,
|
||||
&destinationObject.BucketName,
|
||||
&destinationObject.ObjectKey,
|
||||
&destinationObject.ExpiresAt,
|
||||
&destinationObject.SegmentCount,
|
||||
&destinationObject.EncryptedMetadata,
|
||||
&destinationObject.TotalPlainSize, &destinationObject.TotalEncryptedSize, &destinationObject.FixedSegmentSize,
|
||||
encryptionParameters{&destinationObject.Encryption},
|
||||
&_bogusBytes,
|
||||
)
|
||||
if err != nil {
|
||||
return Object{}, uuid.UUID{}, nil, Error.New("error while reading existing object at destination: %w", err)
|
||||
}
|
||||
|
||||
err = db.promoteNewAncestors(ctx, tx, []deletedObjectInfo{infos})
|
||||
if err != nil {
|
||||
return Error.New("unable to copy segments: %w", err)
|
||||
if destinationObject.BucketName != opts.NewBucket || destinationObject.ObjectKey != opts.NewEncryptedObjectKey {
|
||||
return Object{}, uuid.UUID{}, nil, Error.New("unexpected")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
if rows.Next() {
|
||||
return Object{}, uuid.UUID{}, nil, Error.New("expected 1 or 2 rows, got 3 or more")
|
||||
}
|
||||
|
||||
return sourceObject, ancestorStreamID, destinationObject, nil
|
||||
}
|
||||
|
@ -246,7 +246,7 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
},
|
||||
// validation pass without EncryptedMetadataKey and EncryptedMetadataKeyNonce
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: sql: no rows in result set",
|
||||
ErrText: "source object not found",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
@ -311,7 +311,7 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
NewEncryptedMetadataKey: newEncryptedMetadataKey,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: sql: no rows in result set",
|
||||
ErrText: "source object not found",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
@ -361,7 +361,7 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
NewEncryptedMetadataKey: newEncryptedMetadataKey,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "wrong amount of segments keys received (received 10, need 9)",
|
||||
ErrText: "wrong number of segments keys received (received 9, need 10)",
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
@ -726,6 +726,159 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
// checks that a copy can be copied to it's ancestor location
|
||||
t.Run("Copy child to ancestor", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
originalObjStream := metabasetest.RandObjectStream()
|
||||
copyObjStream := metabasetest.RandObjectStream()
|
||||
// Copy back to original object key.
|
||||
// StreamID is independent of key.
|
||||
copyBackObjStream := originalObjStream
|
||||
copyBackObjStream.StreamID = testrand.UUID()
|
||||
|
||||
originalObj, originalSegments := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
ObjectStream: originalObjStream,
|
||||
EncryptedMetadata: testrand.Bytes(64),
|
||||
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
|
||||
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
|
||||
},
|
||||
}.Run(ctx, t, db, originalObjStream, 4)
|
||||
|
||||
copyObj, _, copySegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
CopyObjectStream: ©ObjStream,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
// Copy the copy back to the source location
|
||||
opts := metabase.FinishCopyObject{
|
||||
// source
|
||||
ObjectStream: copyObj.ObjectStream,
|
||||
// destination
|
||||
NewBucket: originalObj.BucketName,
|
||||
NewEncryptedObjectKey: originalObj.ObjectKey,
|
||||
NewStreamID: copyBackObjStream.StreamID,
|
||||
OverrideMetadata: false,
|
||||
NewSegmentKeys: []metabase.EncryptedKeyAndNonce{
|
||||
metabasetest.RandEncryptedKeyAndNonce(0),
|
||||
metabasetest.RandEncryptedKeyAndNonce(1),
|
||||
metabasetest.RandEncryptedKeyAndNonce(2),
|
||||
metabasetest.RandEncryptedKeyAndNonce(3),
|
||||
},
|
||||
}
|
||||
metabasetest.CreateObjectCopy{
|
||||
OriginalObject: copyObj,
|
||||
CopyObjectStream: ©BackObjStream,
|
||||
FinishObject: &opts,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
// expected object at the location which was previously the original object
|
||||
copyBackObj := originalObj
|
||||
copyBackObj.StreamID = opts.NewStreamID
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
copySegments[i].Pieces = originalSegments[i].Pieces
|
||||
copySegments[i].InlineData = originalSegments[i].InlineData
|
||||
copySegments[i].EncryptedETag = nil // TODO: ETag seems lost after copy
|
||||
|
||||
originalSegments[i].StreamID = opts.NewStreamID
|
||||
originalSegments[i].Pieces = nil
|
||||
originalSegments[i].InlineData = nil
|
||||
originalSegments[i].EncryptedKey = opts.NewSegmentKeys[i].EncryptedKey
|
||||
originalSegments[i].EncryptedKeyNonce = opts.NewSegmentKeys[i].EncryptedKeyNonce
|
||||
originalSegments[i].EncryptedETag = nil // TODO: ETag seems lost after copy
|
||||
}
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(copyObj),
|
||||
metabase.RawObject(copyBackObj),
|
||||
},
|
||||
Segments: append(metabasetest.SegmentsToRaw(originalSegments), copySegments...),
|
||||
Copies: []metabase.RawCopy{{
|
||||
StreamID: opts.NewStreamID,
|
||||
AncestorStreamID: copyObjStream.StreamID,
|
||||
}},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
// checks that a copy ancestor can be copied to itself
|
||||
t.Run("Copy ancestor to itself", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
originalObjStream := metabasetest.RandObjectStream()
|
||||
copyObjStream := metabasetest.RandObjectStream()
|
||||
// Copy back to same object key.
|
||||
// StreamID is independent of key.
|
||||
copyBackObjStream := originalObjStream
|
||||
copyBackObjStream.StreamID = testrand.UUID()
|
||||
|
||||
originalObj, _ := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
ObjectStream: originalObjStream,
|
||||
EncryptedMetadata: testrand.Bytes(64),
|
||||
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
|
||||
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
|
||||
},
|
||||
}.Run(ctx, t, db, originalObjStream, 4)
|
||||
|
||||
copyObj, originalSegments, copySegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
CopyObjectStream: ©ObjStream,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
opts := metabase.FinishCopyObject{
|
||||
// source
|
||||
ObjectStream: copyObj.ObjectStream,
|
||||
// destination
|
||||
NewBucket: originalObj.BucketName,
|
||||
NewEncryptedObjectKey: originalObj.ObjectKey,
|
||||
NewStreamID: copyBackObjStream.StreamID,
|
||||
OverrideMetadata: false,
|
||||
NewSegmentKeys: []metabase.EncryptedKeyAndNonce{
|
||||
metabasetest.RandEncryptedKeyAndNonce(0),
|
||||
metabasetest.RandEncryptedKeyAndNonce(1),
|
||||
metabasetest.RandEncryptedKeyAndNonce(2),
|
||||
metabasetest.RandEncryptedKeyAndNonce(3),
|
||||
},
|
||||
}
|
||||
// Copy the copy back to the source location
|
||||
metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
CopyObjectStream: ©BackObjStream,
|
||||
FinishObject: &opts,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
copyBackObj := originalObj
|
||||
copyBackObj.StreamID = copyBackObjStream.StreamID
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
copySegments[i].Pieces = originalSegments[i].Pieces
|
||||
copySegments[i].InlineData = originalSegments[i].InlineData
|
||||
copySegments[i].EncryptedETag = nil // TODO: ETag seems lost after copy
|
||||
|
||||
originalSegments[i].StreamID = opts.NewStreamID
|
||||
originalSegments[i].Pieces = nil
|
||||
originalSegments[i].InlineData = nil
|
||||
originalSegments[i].EncryptedKey = opts.NewSegmentKeys[i].EncryptedKey
|
||||
originalSegments[i].EncryptedKeyNonce = opts.NewSegmentKeys[i].EncryptedKeyNonce
|
||||
originalSegments[i].EncryptedETag = nil // TODO: ETag seems lost after copy
|
||||
}
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(copyObj),
|
||||
metabase.RawObject(copyBackObj),
|
||||
},
|
||||
Segments: append(originalSegments, copySegments...),
|
||||
Copies: []metabase.RawCopy{{
|
||||
StreamID: copyBackObjStream.StreamID,
|
||||
AncestorStreamID: copyObjStream.StreamID,
|
||||
}},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("copied segments has same expires_at as original", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
|
@ -265,7 +265,22 @@ var updateSegmentsWithAncestor = `
|
||||
// Result will contain only those segments which needs to be deleted
|
||||
// from storage nodes. If object is an ancestor for copied object its
|
||||
// segments pieces cannot be deleted because copy still needs it.
|
||||
func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) {
|
||||
func (db *DB) DeleteObjectExactVersion(
|
||||
ctx context.Context, opts DeleteObjectExactVersion,
|
||||
) (result DeleteObjectResult, err error) {
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
result, err = db.deleteObjectExactVersion(ctx, opts, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
// implementation of DB.DeleteObjectExactVersion for re-use internally in metabase package.
|
||||
func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, tx tagsql.Tx) (result DeleteObjectResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
@ -273,10 +288,30 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
|
||||
}
|
||||
|
||||
if db.config.ServerSideCopy {
|
||||
result, err = db.deleteObjectExactVersionServerSideCopy(ctx, opts)
|
||||
objects, err := db.deleteObjectExactVersionServerSideCopy(ctx, opts, tx)
|
||||
if err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
|
||||
for _, object := range objects {
|
||||
result.Objects = append(result.Objects, object.Object)
|
||||
|
||||
// if object is ancestor for copied object we cannot delete its
|
||||
// segments pieces from storage nodes so we are not returning it
|
||||
// as an object deletion result
|
||||
if object.PromotedAncestor != nil {
|
||||
continue
|
||||
}
|
||||
for _, segment := range object.Segments {
|
||||
result.Segments = append(result.Segments, DeletedSegmentInfo{
|
||||
RootPieceID: segment.RootPieceID,
|
||||
Pieces: segment.Pieces,
|
||||
})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
err = withRows(
|
||||
db.db.QueryContext(ctx, deleteObjectExactVersionWithoutCopyFeatureSQL,
|
||||
tx.QueryContext(ctx, deleteObjectExactVersionWithoutCopyFeatureSQL,
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
|
||||
)(func(rows tagsql.Rows) error {
|
||||
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
|
||||
@ -293,44 +328,25 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (db *DB) deleteObjectExactVersionServerSideCopy(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) {
|
||||
func (db *DB) deleteObjectExactVersionServerSideCopy(ctx context.Context, opts DeleteObjectExactVersion, tx tagsql.Tx) (objects []deletedObjectInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
objects := []deletedObjectInfo{}
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
err = withRows(
|
||||
tx.QueryContext(ctx, deleteObjectExactVersionWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
|
||||
)(func(rows tagsql.Rows) error {
|
||||
objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return db.promoteNewAncestors(ctx, tx, objects)
|
||||
err = withRows(
|
||||
tx.QueryContext(ctx, deleteObjectExactVersionWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
|
||||
)(func(rows tagsql.Rows) error {
|
||||
objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, object := range objects {
|
||||
result.Objects = append(result.Objects, object.Object)
|
||||
|
||||
// if object is ancestor for copied object we cannot delete its
|
||||
// segments pieces from storage nodes so we are not returning it
|
||||
// as an object deletion result
|
||||
if object.PromotedAncestor != nil {
|
||||
continue
|
||||
}
|
||||
for _, segment := range object.Segments {
|
||||
result.Segments = append(result.Segments, DeletedSegmentInfo{
|
||||
RootPieceID: segment.RootPieceID,
|
||||
Pieces: segment.Pieces,
|
||||
})
|
||||
}
|
||||
err = db.promoteNewAncestors(ctx, tx, objects)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (db *DB) promoteNewAncestors(ctx context.Context, tx tagsql.Tx, objects []deletedObjectInfo) (err error) {
|
||||
|
@ -32,6 +32,15 @@ func RandObjectKey() metabase.ObjectKey {
|
||||
return metabase.ObjectKey(testrand.Bytes(16))
|
||||
}
|
||||
|
||||
// RandEncryptedKeyAndNonce generates random segment metadata.
|
||||
func RandEncryptedKeyAndNonce(position int) metabase.EncryptedKeyAndNonce {
|
||||
return metabase.EncryptedKeyAndNonce{
|
||||
Position: metabase.SegmentPosition{Index: uint32(position)},
|
||||
EncryptedKeyNonce: testrand.Nonce().Bytes(),
|
||||
EncryptedKey: testrand.Bytes(32),
|
||||
}
|
||||
}
|
||||
|
||||
// CreatePendingObject creates a new pending object with the specified number of segments.
|
||||
func CreatePendingObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte) {
|
||||
BeginObjectExactVersion{
|
||||
@ -324,11 +333,7 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
|
||||
expectedEncryptedSize := 1060
|
||||
|
||||
for i := 0; i < int(cc.OriginalObject.SegmentCount); i++ {
|
||||
newEncryptedKeysNonces[i] = metabase.EncryptedKeyAndNonce{
|
||||
Position: metabase.SegmentPosition{Index: uint32(i)},
|
||||
EncryptedKeyNonce: testrand.Nonce().Bytes(),
|
||||
EncryptedKey: testrand.Bytes(32),
|
||||
}
|
||||
newEncryptedKeysNonces[i] = RandEncryptedKeyAndNonce(i)
|
||||
|
||||
expectedOriginalSegments[i] = DefaultRawSegment(cc.OriginalObject.ObjectStream, metabase.SegmentPosition{Index: uint32(i)})
|
||||
|
||||
@ -358,9 +363,9 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
|
||||
opts := cc.FinishObject
|
||||
if opts == nil {
|
||||
opts = &metabase.FinishCopyObject{
|
||||
ObjectStream: cc.OriginalObject.ObjectStream,
|
||||
NewStreamID: copyStream.StreamID,
|
||||
NewBucket: copyStream.BucketName,
|
||||
ObjectStream: cc.OriginalObject.ObjectStream,
|
||||
NewSegmentKeys: newEncryptedKeysNonces,
|
||||
NewEncryptedObjectKey: copyStream.ObjectKey,
|
||||
NewEncryptedMetadataKeyNonce: testrand.Nonce(),
|
||||
|
Loading…
Reference in New Issue
Block a user