satellite/metabase: overwrite existing destination on copy

S3 allows overwriting an object when using server-side copy.
This change makes overwriting the destination part of the atomic server-side
copy operation, so that if the copy fails, the old object is still available.

All the segments of the existing destination are deleted. If this destination object is an ancestor of another object, a new ancestor is promoted.
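For reference, this mirrors S3 semantics, where a server-side CopyObject onto an existing key replaces it in a single operation. A minimal sketch using the AWS SDK for Go (bucket and key names are illustrative):

    package main

    import (
        "log"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3"
    )

    func main() {
        // Server-side copy in S3: the destination key may already exist
        // and is replaced as part of the copy; on failure the old object
        // remains visible.
        client := s3.New(session.Must(session.NewSession()))
        _, err := client.CopyObject(&s3.CopyObjectInput{
            Bucket:     aws.String("dst-bucket"),
            Key:        aws.String("dst-key"), // pre-existing key, overwritten
            CopySource: aws.String("src-bucket/src-key"),
        })
        if err != nil {
            log.Fatal(err)
        }
    }

Before this change, the metabase insert instead failed with ErrObjectAlreadyExists when the destination key was taken.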

Fixes https://github.com/storj/storj/issues/4607

Change-Id: I85d1250850bb71867586ac230c8275a0cc1b63c3
Author: Fadila Khadar 2022-03-23 23:00:40 +01:00
Committed by: Michal Niewrzal
Parent: 3397886b14
Commit: 3d9329a5b1
2 changed files with 230 additions and 41 deletions


@@ -9,12 +9,9 @@ import (
"errors"
"time"
pgxerrcode "github.com/jackc/pgerrcode"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/pgutil/pgerrcode"
"storj.io/private/dbutil/txutil"
"storj.io/private/tagsql"
)
@@ -133,8 +130,6 @@ func (finishCopy FinishCopyObject) Verify() error {
return ErrInvalidRequest.New("NewStreamID is missing")
case finishCopy.ObjectStream.StreamID == finishCopy.NewStreamID:
return ErrInvalidRequest.New("StreamIDs are identical")
case finishCopy.ObjectKey == finishCopy.NewEncryptedObjectKey:
return ErrInvalidRequest.New("source and destination encrypted object key are identical")
case len(finishCopy.NewEncryptedObjectKey) == 0:
return ErrInvalidRequest.New("NewEncryptedObjectKey is missing")
}
@@ -293,6 +288,19 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
// TODO we need to handle metadata correctly (copy from original object or replace)
row := tx.QueryRowContext(ctx, `
WITH existing_object AS (
SELECT
objects.stream_id,
copies.stream_id AS new_ancestor,
objects.segment_count
FROM objects
LEFT OUTER JOIN segment_copies copies ON objects.stream_id = copies.ancestor_stream_id
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4
)
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, status, segment_count,
@@ -306,36 +314,61 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
$8,
$9, $10, $11,
$12, $13, $14, null
) RETURNING created_at`,
) ON CONFLICT (project_id, bucket_name, object_key, version)
DO UPDATE SET
stream_id = $5,
created_at = now(),
expires_at = $6,
status = `+committedStatus+`,
segment_count = $7,
encryption = $8,
encrypted_metadata = $9,
encrypted_metadata_nonce = $10,
encrypted_metadata_encrypted_key = $11,
total_plain_size = $12,
total_encrypted_size = $13,
fixed_segment_size = $14,
zombie_deletion_deadline = NULL
RETURNING
created_at,
(SELECT stream_id FROM existing_object LIMIT 1),
(SELECT new_ancestor FROM existing_object LIMIT 1),
(SELECT segment_count FROM existing_object LIMIT 1)`,
opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID,
originalObject.ExpiresAt, originalObject.SegmentCount,
encryptionParameters{&originalObject.Encryption},
copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
originalObject.TotalPlainSize, originalObject.TotalEncryptedSize, originalObject.FixedSegmentSize,
)
err = row.Scan(&copyObject.CreatedAt)
var existingObjStreamID *uuid.UUID
var newAncestorStreamID *uuid.UUID
var oldSegmentCount *int
err = row.Scan(&copyObject.CreatedAt, &existingObjStreamID, &newAncestorStreamID, &oldSegmentCount)
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return ErrObjectAlreadyExists.New("")
}
return Error.New("unable to copy object: %w", err)
}
err = db.deleteExistingObjectSegments(ctx, tx, existingObjStreamID, newAncestorStreamID, oldSegmentCount)
if err != nil {
return Error.New("unable to copy object: %w", err)
}
_, err = tx.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position,
encrypted_key_nonce, encrypted_key,
root_piece_id,
redundancy,
encrypted_size, plain_offset, plain_size,
inline_data
) SELECT
$1, UNNEST($2::INT8[]),
UNNEST($3::BYTEA[]), UNNEST($4::BYTEA[]),
UNNEST($5::BYTEA[]),
UNNEST($6::INT8[]),
UNNEST($7::INT4[]), UNNEST($8::INT8[]), UNNEST($9::INT4[]),
UNNEST($10::BYTEA[])
`, opts.NewStreamID, pgutil.Int8Array(newSegments.Positions),
pgutil.ByteaArray(newSegments.EncryptedKeyNonces), pgutil.ByteaArray(newSegments.EncryptedKeys),
pgutil.ByteaArray(rootPieceIDs),
@@ -389,3 +422,56 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
return copyObject, nil
}
func (db *DB) deleteExistingObjectSegments(ctx context.Context, tx tagsql.Tx, existingObjStreamID *uuid.UUID, newAncestorStreamID *uuid.UUID, segmentCount *int) (err error) {
if existingObjStreamID != nil && *segmentCount > 0 {
if newAncestorStreamID == nil {
_, err = tx.ExecContext(ctx, `
DELETE FROM segments WHERE stream_id = $1
`, existingObjStreamID,
)
if err != nil {
return Error.New("unable to copy segments: %w", err)
}
return nil
}
var infos deletedObjectInfo
infos.SegmentCount = int32(*segmentCount)
infos.PromotedAncestor = newAncestorStreamID
infos.Segments = make([]deletedRemoteSegmentInfo, *segmentCount)
var aliasPieces AliasPieces
err = withRows(tx.QueryContext(ctx, `
DELETE FROM segments WHERE stream_id = $1
RETURNING position, remote_alias_pieces, repaired_at
`, existingObjStreamID))(func(rows tagsql.Rows) error {
index := 0
for rows.Next() {
err = rows.Scan(
&infos.Segments[index].Position,
&aliasPieces,
&infos.Segments[index].RepairedAt,
)
if err != nil {
return err
}
infos.Segments[index].Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("unable to copy object: %w", err)
}
index++
}
return rows.Err()
})
if err != nil {
return Error.New("unable to copy segments: %w", err)
}
err = db.promoteNewAncestors(ctx, tx, []deletedObjectInfo{infos})
if err != nil {
return Error.New("unable to copy segments: %w", err)
}
}
return nil
}
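
A note on the upsert above: PostgreSQL evaluates a WITH query against the table as it was before the statement ran, so selecting from existing_object inside RETURNING yields the row that is being overwritten. A stripped-down sketch of the same pattern, assuming a hypothetical kv table and the pgx stdlib driver:

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/jackc/pgx/v4/stdlib"
    )

    func main() {
        db, err := sql.Open("pgx", "postgres://localhost/demo")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // The CTE snapshots the pre-existing row (if any); the upsert then
        // replaces it. RETURNING reports both the new value and the old one.
        var newV string
        var oldV sql.NullString // NULL when the key did not exist before
        err = db.QueryRow(`
            WITH existing AS (
                SELECT v FROM kv WHERE k = $1
            )
            INSERT INTO kv (k, v) VALUES ($1, $2)
            ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v
            RETURNING v, (SELECT v FROM existing LIMIT 1)`,
            "a", "new").Scan(&newV, &oldV)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(newV, oldV)
    }

This is what lets the transaction learn, within the same atomic statement, whether a destination object existed and which segments it needs to delete.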


@@ -196,23 +196,6 @@ func TestFinishCopyObject(t *testing.T) {
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("copy to the same EncryptedObjectKey", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
NewBucket: newBucketName,
NewEncryptedObjectKey: obj.ObjectKey,
ObjectStream: obj,
NewStreamID: newStreamID,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "source and destination encrypted object key are identical",
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("invalid EncryptedMetadataKeyNonce", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
@@ -619,5 +602,125 @@ func TestFinishCopyObject(t *testing.T) {
Copies: nil,
}.Check(ctx, t, db)
})
t.Run("finish copy object to already existing destination", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
// Test:
// - 3 objects: objA, objB, objC
// - copy objB onto objA, creating objBprime
// - check that the segments of the original objA have been deleted
// - check that we now have three objects: objBprime, objB, objC
// - copy objC onto objB, creating objCprime
// - check that we now have three objects: objBprime, objCprime, objC
// - check that objBprime has become an original object, now that its ancestor
// objB has been overwritten
// objA is the destination object that already exists
objStreamA := metabasetest.RandObjectStream()
objStreamB := metabasetest.RandObjectStream()
objStreamC := metabasetest.RandObjectStream()
// set same projectID for all
objStreamB.ProjectID = objStreamA.ProjectID
objStreamC.ProjectID = objStreamA.ProjectID
objA, _ := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: objStreamA,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, objStreamA, 4)
objB, _ := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: objStreamB,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, objStreamB, 3)
objC, segmentsOfC := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: objStreamC,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, objStreamC, 1)
// B' is a copy of B to A
objStreamBprime := objStreamA
objStreamBprime.StreamID = testrand.UUID()
objBprime, expectedSegmentsOfB, expectedSegmentsOfBprime := metabasetest.CreateObjectCopy{
OriginalObject: objB,
CopyObjectStream: &objStreamBprime,
}.Run(ctx, t, db)
// check that we indeed overwrote object A
require.Equal(t, objA.BucketName, objBprime.BucketName)
require.Equal(t, objA.ProjectID, objBprime.ProjectID)
require.Equal(t, objA.ObjectKey, objBprime.ObjectKey)
require.NotEqual(t, objA.StreamID, objBprime.StreamID)
var expectedRawSegments []metabase.RawSegment
expectedRawSegments = append(expectedRawSegments, expectedSegmentsOfBprime...)
expectedRawSegments = append(expectedRawSegments, expectedSegmentsOfB...)
expectedRawSegments = append(expectedRawSegments, metabasetest.SegmentsToRaw(segmentsOfC)...)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(objBprime),
metabase.RawObject(objB),
metabase.RawObject(objC),
},
Segments: expectedRawSegments,
Copies: []metabase.RawCopy{{
StreamID: objBprime.StreamID,
AncestorStreamID: objB.StreamID,
}},
}.Check(ctx, t, db)
// C' is a copy of C to B
objStreamCprime := objStreamB
objStreamCprime.StreamID = testrand.UUID()
objCprime, _, expectedSegmentsOfCprime := metabasetest.CreateObjectCopy{
OriginalObject: objC,
CopyObjectStream: &objStreamCprime,
}.Run(ctx, t, db)
require.Equal(t, objStreamB.BucketName, objCprime.BucketName)
require.Equal(t, objStreamB.ProjectID, objCprime.ProjectID)
require.Equal(t, objStreamB.ObjectKey, objCprime.ObjectKey)
require.NotEqual(t, objB.StreamID, objCprime.StreamID)
// B' should have been promoted to an original object and should now hold the pieces.
for i := range expectedSegmentsOfBprime {
expectedSegmentsOfBprime[i].EncryptedETag = nil
expectedSegmentsOfBprime[i].Pieces = expectedSegmentsOfB[i].Pieces
}
var expectedSegments []metabase.RawSegment
expectedSegments = append(expectedSegments, expectedSegmentsOfBprime...)
expectedSegments = append(expectedSegments, expectedSegmentsOfCprime...)
expectedSegments = append(expectedSegments, metabasetest.SegmentsToRaw(segmentsOfC)...)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(objBprime),
metabase.RawObject(objCprime),
metabase.RawObject(objC),
},
Segments: expectedSegments,
Copies: []metabase.RawCopy{{
StreamID: objCprime.StreamID,
AncestorStreamID: objC.StreamID,
}},
}.Check(ctx, t, db)
})
})
}
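
To make the promotion step exercised by the test above concrete, here is a toy model of the bookkeeping, with plain maps standing in for the segments and segment_copies tables (all names are illustrative):

    package main

    import "fmt"

    func main() {
        // pieces: which stream owns the remote pieces.
        // copies: copied stream -> ancestor stream backing it.
        pieces := map[string][]string{"objB": {"p1", "p2", "p3"}}
        copies := map[string]string{"objBprime": "objB"}

        // Overwriting objB deletes its segments; objBprime is promoted and
        // takes over the pieces, mirroring promoteNewAncestors above.
        overwritten := "objB"
        for copyID, ancestor := range copies {
            if ancestor == overwritten {
                pieces[copyID] = pieces[overwritten]
                delete(copies, copyID)
                break
            }
        }
        delete(pieces, overwritten)

        fmt.Println(pieces) // map[objBprime:[p1 p2 p3]]
        fmt.Println(copies) // map[]
    }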