From 0358a08d6b644d88bdc06c2247b52b74d0b41471 Mon Sep 17 00:00:00 2001 From: Fadila Khadar Date: Thu, 27 Jan 2022 01:01:03 +0100 Subject: [PATCH] satellite/metabase: BeginCopyObject and FinishCopyObject Part of server-side copy implementation. Creates the methods BeginCopyObject and FinishCopyObject in satellite/metabase/copy_object.go. This commit makes it possible to copy objects and their segments in metabase. https://github.com/storj/team-metainfo/issues/79 Change-Id: Icc64afce16e5e0e15b83c43cff36797bb9bd39bc --- satellite/metabase/common.go | 3 + satellite/metabase/copy_object.go | 312 ++++++++++++++++++ satellite/metabase/copy_object_test.go | 384 ++++++++++++++++++++++ satellite/metabase/metabasetest/create.go | 77 +++++ satellite/metabase/metabasetest/test.go | 30 ++ satellite/metabase/raw.go | 3 +- 6 files changed, 808 insertions(+), 1 deletion(-) create mode 100644 satellite/metabase/copy_object.go create mode 100644 satellite/metabase/copy_object_test.go diff --git a/satellite/metabase/common.go b/satellite/metabase/common.go index 63222cf9f..f5d86e7e5 100644 --- a/satellite/metabase/common.go +++ b/satellite/metabase/common.go @@ -35,6 +35,9 @@ const ListLimit = intLimitRange(1000) // MoveLimit is the maximum number of segments that can be moved. const MoveLimit = int64(10000) +// CopySegmentLimit is the maximum number of segments that can be copied. +const CopySegmentLimit = int64(10000) + // batchsizeLimit specifies up to how many items fetch from the storage layer at // a time. // diff --git a/satellite/metabase/copy_object.go b/satellite/metabase/copy_object.go new file mode 100644 index 000000000..f1b25b800 --- /dev/null +++ b/satellite/metabase/copy_object.go @@ -0,0 +1,312 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "context" + "database/sql" + "errors" + + pgxerrcode "github.com/jackc/pgerrcode" + "github.com/zeebo/errs" + + "storj.io/common/storj" + "storj.io/common/uuid" + "storj.io/private/dbutil/pgutil/pgerrcode" + "storj.io/private/dbutil/txutil" + "storj.io/private/tagsql" +) + +// BeginCopyObjectResult holds data needed to finish copy object. +type BeginCopyObjectResult struct { + StreamID uuid.UUID + EncryptedMetadata []byte + EncryptedMetadataKeyNonce []byte + EncryptedMetadataKey []byte + EncryptedKeysNonces []EncryptedKeyAndNonce + EncryptionParameters storj.EncryptionParameters +} + +// BeginCopyObject holds all data needed begin copy object method. +type BeginCopyObject struct { + Version Version + ObjectLocation +} + +// BeginCopyObject collects all data needed to begin object copy procedure. +func (db *DB) BeginCopyObject(ctx context.Context, opts BeginCopyObject) (result BeginCopyObjectResult, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.ObjectLocation.Verify(); err != nil { + return BeginCopyObjectResult{}, err + } + + if opts.Version <= 0 { + return BeginCopyObjectResult{}, ErrInvalidRequest.New("Version invalid: %v", opts.Version) + } + + var segmentCount int64 + + err = db.db.QueryRowContext(ctx, ` + SELECT + stream_id, encryption, segment_count, + encrypted_metadata_encrypted_key, encrypted_metadata_nonce, encrypted_metadata + FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + status = `+committedStatus, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version). + Scan( + &result.StreamID, + encryptionParameters{&result.EncryptionParameters}, + &segmentCount, + &result.EncryptedMetadataKey, &result.EncryptedMetadataKeyNonce, &result.EncryptedMetadata, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return BeginCopyObjectResult{}, storj.ErrObjectNotFound.Wrap(err) + } + return BeginCopyObjectResult{}, Error.New("unable to query object status: %w", err) + } + + if segmentCount > CopySegmentLimit { + return BeginCopyObjectResult{}, Error.New("object to copy has too many segments (%d). Limit is %d.", segmentCount, CopySegmentLimit) + } + + err = withRows(db.db.QueryContext(ctx, ` + SELECT + position, encrypted_key_nonce, encrypted_key + FROM segments + WHERE stream_id = $1 + ORDER BY stream_id, position ASC + `, result.StreamID))(func(rows tagsql.Rows) error { + for rows.Next() { + var keys EncryptedKeyAndNonce + + err = rows.Scan(&keys.Position, &keys.EncryptedKeyNonce, &keys.EncryptedKey) + if err != nil { + return Error.New("failed to scan segments: %w", err) + } + + result.EncryptedKeysNonces = append(result.EncryptedKeysNonces, keys) + } + + return nil + }) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return BeginCopyObjectResult{}, Error.New("unable to fetch object segments: %w", err) + } + + return result, nil +} + +// FinishCopyObject holds all data needed to finish object copy. +type FinishCopyObject struct { + ObjectStream + NewBucket string + NewStreamID uuid.UUID + NewSegmentKeys []EncryptedKeyAndNonce + // TODO: add NewEncryptedMetadata []byte for being able to change object's metadata + NewEncryptedObjectKey []byte + NewEncryptedMetadataKeyNonce []byte + NewEncryptedMetadataKey []byte +} + +// Verify verifies metabase.FinishCopyObject data. +func (finishCopy FinishCopyObject) Verify() error { + if err := finishCopy.ObjectStream.Verify(); err != nil { + return err + } + + switch { + case len(finishCopy.NewBucket) == 0: + return ErrInvalidRequest.New("NewBucket is missing") + case finishCopy.ObjectStream.StreamID == finishCopy.NewStreamID: + return ErrInvalidRequest.New("StreamIDs are identical") + case finishCopy.ObjectKey == ObjectKey(finishCopy.NewEncryptedObjectKey): + return ErrInvalidRequest.New("source and destination encrypted object key are identical") + case len(finishCopy.NewEncryptedObjectKey) == 0: + return ErrInvalidRequest.New("NewEncryptedObjectKey is missing") + case len(finishCopy.NewEncryptedMetadataKeyNonce) == 0: + return ErrInvalidRequest.New("EncryptedMetadataKeyNonce is missing") + case len(finishCopy.NewEncryptedMetadataKey) == 0: + return ErrInvalidRequest.New("EncryptedMetadataKey is missing") + } + + return nil +} + +// FinishCopyObject accepts new encryption keys for copied object and insert the corresponding new object ObjectKey and segments EncryptedKey. +// TODO should be in one transaction. +// TODO handle the case when the source and destination encrypted object keys are the same. +func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return err + } + + originalObject, err := db.GetObjectExactVersion(ctx, GetObjectExactVersion{ + opts.Version, + opts.Location(), + }) + if err != nil { + return errs.Wrap(err) + } + + if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) { + return ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys)) + } + + var newSegmentKeys struct { + Positions []int64 + EncryptedKeys [][]byte + EncryptedKeyNonces [][]byte + } + + for _, u := range opts.NewSegmentKeys { + newSegmentKeys.EncryptedKeys = append(newSegmentKeys.EncryptedKeys, u.EncryptedKey) + newSegmentKeys.EncryptedKeyNonces = append(newSegmentKeys.EncryptedKeyNonces, u.EncryptedKeyNonce) + newSegmentKeys.Positions = append(newSegmentKeys.Positions, int64(u.Position.Encode())) + } + + segments := make([]Segment, originalObject.SegmentCount) + positions := make([]int64, originalObject.SegmentCount) + + // TODO: there are probably columns that we can skip + // maybe it's possible to have the select and the insert in one query + err = withRows(db.db.QueryContext(ctx, ` + SELECT + position, + expires_at, repaired_at, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, + encrypted_etag, + redundancy, + inline_data, + placement + FROM segments + WHERE stream_id = $1 + ORDER BY position ASC + LIMIT $2 + `, originalObject.StreamID, originalObject.SegmentCount))(func(rows tagsql.Rows) error { + index := 0 + for rows.Next() { + err = rows.Scan( + &segments[index].Position, + &segments[index].ExpiresAt, &segments[index].RepairedAt, + &segments[index].RootPieceID, &segments[index].EncryptedKeyNonce, &segments[index].EncryptedKey, + &segments[index].EncryptedSize, &segments[index].PlainOffset, &segments[index].PlainSize, + &segments[index].EncryptedETag, + redundancyScheme{&segments[index].Redundancy}, + &segments[index].InlineData, + &segments[index].Placement, + ) + if err != nil { + return err + } + positions[index] = int64(segments[index].Position.Encode()) + index++ + } + + if err = rows.Err(); err != nil { + return err + } + return nil + }) + if err != nil { + return Error.New("unable to copy object: %w", err) + } + + for index := range segments { + if newSegmentKeys.Positions[index] != int64(segments[index].Position.Encode()) { + return Error.New("missing new segment keys for segment %d", int64(segments[index].Position.Encode())) + } + } + + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { + // TODO we need to handle metadata correctly (copy from original object or replace) + _, err = db.db.ExecContext(ctx, ` + INSERT INTO objects ( + project_id, bucket_name, object_key, version, stream_id, + expires_at, status, segment_count, + encryption, + encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key, + total_plain_size, total_encrypted_size, fixed_segment_size, + zombie_deletion_deadline + ) VALUES ( + $1, $2, $3, $4, $5, + $6,`+committedStatus+`, $7, + $8, + $9, $10, $11, + $12, $13, $14, null + ) + `, opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID, + originalObject.ExpiresAt, originalObject.SegmentCount, + encryptionParameters{&originalObject.Encryption}, + originalObject.EncryptedMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey, + originalObject.TotalPlainSize, originalObject.TotalEncryptedSize, originalObject.FixedSegmentSize, + ) + if err != nil { + if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation { + return ErrObjectAlreadyExists.New("") + } + return Error.New("unable to copy object: %w", err) + } + + // TODO: optimize - we should do a bulk insert + for index, originalSegment := range segments { + + _, err = db.db.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, + encrypted_key_nonce, encrypted_key, + root_piece_id, -- non-null constraint + redundancy, + encrypted_size, plain_offset, plain_size, + inline_data + ) VALUES ( + $1, $2, + $3, $4, + $5, + $6, + $7, $8, $9, + $10 + ) + `, opts.NewStreamID, originalSegment.Position.Encode(), + newSegmentKeys.EncryptedKeyNonces[index], newSegmentKeys.EncryptedKeys[index], + originalSegment.RootPieceID, + redundancyScheme{&originalSegment.Redundancy}, + originalSegment.EncryptedSize, originalSegment.PlainOffset, originalSegment.PlainSize, + originalSegment.InlineData, + ) + if err != nil { + return Error.New("unable to copy segment: %w", err) + } + + } + + // TODO : we need flatten references + _, err = db.db.ExecContext(ctx, ` + INSERT INTO segment_copies ( + stream_id, ancestor_stream_id + ) VALUES ( + $1, $2 + ) + `, opts.NewStreamID, originalObject.StreamID) + if err != nil { + return Error.New("unable to copy object: %w", err) + } + return nil + }) + if err != nil { + return err + } + mon.Meter("finish_copy_object").Mark(1) + + return nil +} diff --git a/satellite/metabase/copy_object_test.go b/satellite/metabase/copy_object_test.go new file mode 100644 index 000000000..40ac77661 --- /dev/null +++ b/satellite/metabase/copy_object_test.go @@ -0,0 +1,384 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "testing" + + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/metabasetest" +) + +func TestBeginCopyObject(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + + for _, test := range metabasetest.InvalidObjectLocations(obj.Location()) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + metabasetest.BeginCopyObject{ + Opts: metabase.BeginCopyObject{ + ObjectLocation: test.ObjectLocation, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + } + + t.Run("invalid version", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.BeginCopyObject{ + Opts: metabase.BeginCopyObject{ + ObjectLocation: obj.Location(), + Version: 0, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Version invalid: 0", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("begin copy object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + expectedMetadataNonce := testrand.Nonce() + expectedMetadataKey := testrand.Bytes(265) + expectedObject := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: expectedMetadataNonce[:], + EncryptedMetadataEncryptedKey: expectedMetadataKey, + }, + }.Run(ctx, t, db, obj, 10) + + var encKeyAndNonces []metabase.EncryptedKeyAndNonce + expectedRawSegments := make([]metabase.RawSegment, 10) + for i := range expectedRawSegments { + expectedRawSegments[i] = metabasetest.DefaultRawSegment(expectedObject.ObjectStream, metabase.SegmentPosition{ + Index: uint32(i), + }) + expectedRawSegments[i].PlainOffset = int64(i) * int64(expectedRawSegments[i].PlainSize) + expectedRawSegments[i].EncryptedSize = 1060 + + encKeyAndNonces = append(encKeyAndNonces, metabase.EncryptedKeyAndNonce{ + EncryptedKeyNonce: expectedRawSegments[i].EncryptedKeyNonce, + EncryptedKey: expectedRawSegments[i].EncryptedKey, + Position: expectedRawSegments[i].Position, + }) + } + + metabasetest.BeginCopyObject{ + Opts: metabase.BeginCopyObject{ + Version: expectedObject.Version, + ObjectLocation: obj.Location(), + }, + Result: metabase.BeginCopyObjectResult{ + StreamID: expectedObject.StreamID, + EncryptedMetadata: expectedObject.EncryptedMetadata, + EncryptedMetadataKey: expectedMetadataKey, + EncryptedMetadataKeyNonce: expectedMetadataNonce[:], + EncryptedKeysNonces: encKeyAndNonces, + EncryptionParameters: expectedObject.Encryption, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(expectedObject), + }, + Segments: expectedRawSegments, + }.Check(ctx, t, db) + }) + }) +} + +func TestFinishCopyObject(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + newBucketName := "New bucket name" + + for _, test := range metabasetest.InvalidObjectStreams(obj) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewBucket: newBucketName, + ObjectStream: test.ObjectStream, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + } + + t.Run("invalid NewBucket", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + ObjectStream: obj, + NewEncryptedObjectKey: []byte{1, 2, 3}, + NewEncryptedMetadataKey: []byte{1, 2, 3}, + NewEncryptedMetadataKeyNonce: []byte{1, 2, 3}, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "NewBucket is missing", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("copy to the same StreamID", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + ObjectStream: obj, + NewBucket: newBucketName, + NewStreamID: obj.StreamID, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "StreamIDs are identical", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("invalid NewEncryptedObjectKey", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewBucket: newBucketName, + ObjectStream: obj, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "NewEncryptedObjectKey is missing", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("copy to the same EncryptedObjectKey", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewBucket: newBucketName, + NewEncryptedObjectKey: []byte(obj.ObjectKey), + ObjectStream: obj, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "source and destination encrypted object key are identical", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("invalid EncryptedMetadataKeyNonce", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewBucket: newBucketName, + ObjectStream: obj, + NewEncryptedObjectKey: []byte{0}, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedMetadataKeyNonce is missing", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("invalid EncryptedMetadataKey", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewBucket: newBucketName, + ObjectStream: obj, + NewEncryptedObjectKey: []byte{0}, + NewEncryptedMetadataKeyNonce: []byte{0}, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedMetadataKey is missing", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("object does not exist", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + newObj := metabasetest.RandObjectStream() + + newEncryptedMetadataKeyNonce := testrand.Nonce() + newEncryptedMetadataKey := testrand.Bytes(32) + newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, 10) + newObjectKey := testrand.Bytes(32) + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewBucket: newBucketName, + ObjectStream: newObj, + NewSegmentKeys: newEncryptedKeysNonces, + NewEncryptedObjectKey: newObjectKey, + NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(), + NewEncryptedMetadataKey: newEncryptedMetadataKey, + }, + ErrClass: &storj.ErrObjectNotFound, + ErrText: "metabase: sql: no rows in result set", + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("less amount of segments", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + numberOfSegments := 10 + newObjectKey := testrand.Bytes(32) + + newObj := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: testrand.Nonce().Bytes(), + EncryptedMetadataEncryptedKey: testrand.Bytes(265), + }, + }.Run(ctx, t, db, obj, byte(numberOfSegments)) + + newEncryptedMetadataKeyNonce := testrand.Nonce() + newEncryptedMetadataKey := testrand.Bytes(32) + newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, newObj.SegmentCount-1) + expectedSegments := make([]metabase.RawSegment, newObj.SegmentCount) + + for i := 0; i < int(newObj.SegmentCount-1); i++ { + newEncryptedKeysNonces[i] = metabase.EncryptedKeyAndNonce{ + Position: metabase.SegmentPosition{Index: uint32(i)}, + EncryptedKeyNonce: testrand.Nonce().Bytes(), + EncryptedKey: testrand.Bytes(32), + } + + expectedSegments[i] = metabasetest.DefaultRawSegment(newObj.ObjectStream, metabase.SegmentPosition{Index: uint32(i)}) + expectedSegments[i].EncryptedKeyNonce = newEncryptedKeysNonces[i].EncryptedKeyNonce + expectedSegments[i].EncryptedKey = newEncryptedKeysNonces[i].EncryptedKey + expectedSegments[i].PlainOffset = int64(int32(i) * expectedSegments[i].PlainSize) + expectedSegments[i].EncryptedSize = int32(0) + } + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewBucket: newBucketName, + ObjectStream: obj, + NewSegmentKeys: newEncryptedKeysNonces, + NewEncryptedObjectKey: newObjectKey, + NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(), + NewEncryptedMetadataKey: newEncryptedMetadataKey, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "wrong amount of segments keys received (received 10, need 9)", + }.Check(ctx, t, db) + }) + + t.Run("wrong segment indexes", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + numberOfSegments := 10 + newObjectKey := testrand.Bytes(32) + + newObj := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: testrand.Nonce().Bytes(), + EncryptedMetadataEncryptedKey: testrand.Bytes(265), + }, + }.Run(ctx, t, db, obj, byte(numberOfSegments)) + + newEncryptedMetadataKeyNonce := testrand.Nonce() + newEncryptedMetadataKey := testrand.Bytes(32) + newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, newObj.SegmentCount) + expectedEncryptedSize := 1060 + expectedSegments := make([]metabase.RawSegment, newObj.SegmentCount) + + for i := 0; i < int(newObj.SegmentCount); i++ { + newEncryptedKeysNonces[i] = metabase.EncryptedKeyAndNonce{ + Position: metabase.SegmentPosition{Index: uint32(i + 5)}, + EncryptedKeyNonce: testrand.Nonce().Bytes(), + EncryptedKey: testrand.Bytes(32), + } + + expectedSegments[i] = metabasetest.DefaultRawSegment(newObj.ObjectStream, metabase.SegmentPosition{Index: uint32(i)}) + expectedSegments[i].EncryptedKeyNonce = newEncryptedKeysNonces[i].EncryptedKeyNonce + expectedSegments[i].EncryptedKey = newEncryptedKeysNonces[i].EncryptedKey + expectedSegments[i].PlainOffset = int64(int32(i) * expectedSegments[i].PlainSize) + expectedSegments[i].EncryptedSize = int32(expectedEncryptedSize) + } + + metabasetest.FinishCopyObject{ + Opts: metabase.FinishCopyObject{ + NewStreamID: testrand.UUID(), + NewBucket: newBucketName, + ObjectStream: obj, + NewSegmentKeys: newEncryptedKeysNonces, + NewEncryptedObjectKey: newObjectKey, + NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(), + NewEncryptedMetadataKey: newEncryptedMetadataKey, + }, + ErrClass: &metabase.Error, + ErrText: "missing new segment keys for segment 0", + }.Check(ctx, t, db) + }) + + t.Run("finish copy object with existing metadata", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + numberOfSegments := 10 + copyStream := metabasetest.RandObjectStream() + + // make sure segments are ordered as expected when checking database + if copyStream.StreamID.Less(obj.StreamID) { + obj, copyStream = copyStream, obj + } + + originalObj := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: testrand.Nonce().Bytes(), + EncryptedMetadataEncryptedKey: testrand.Bytes(265), + }, + }.Run(ctx, t, db, obj, byte(numberOfSegments)) + + copyObj, expectedSegments := metabasetest.CreateObjectCopy{ + OriginalObject: originalObj, + CopyObjectStream: ©Stream, + }.Run(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(originalObj), + metabase.RawObject(copyObj), + }, + Segments: expectedSegments, + }.Check(ctx, t, db) + }) + }) + // TODO: test with new metadata +} diff --git a/satellite/metabase/metabasetest/create.go b/satellite/metabase/metabasetest/create.go index dbbe12087..1948a557f 100644 --- a/satellite/metabase/metabasetest/create.go +++ b/satellite/metabase/metabasetest/create.go @@ -255,3 +255,80 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t testing.TB, db *metab Opts: coOpts, }.Check(ctx, t, db) } + +// CreateObjectCopy is for testing object copy. +type CreateObjectCopy struct { + OriginalObject metabase.Object + FinishObject *metabase.FinishCopyObject + CopyObjectStream *metabase.ObjectStream +} + +// Run creates the copy. +func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metabase.DB) (metabase.Object, []metabase.RawSegment) { + + var copyStream metabase.ObjectStream + if cc.CopyObjectStream != nil { + copyStream = *cc.CopyObjectStream + } else { + copyStream = RandObjectStream() + } + + newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, cc.OriginalObject.SegmentCount) + expectedSegments := make([]metabase.RawSegment, cc.OriginalObject.SegmentCount*2) + expectedEncryptedSize := 1060 + + for i := 0; i < int(cc.OriginalObject.SegmentCount); i++ { + newEncryptedKeysNonces[i] = metabase.EncryptedKeyAndNonce{ + Position: metabase.SegmentPosition{Index: uint32(i)}, + EncryptedKeyNonce: testrand.Nonce().Bytes(), + EncryptedKey: testrand.Bytes(32), + } + + copyIndex := i + int(cc.OriginalObject.SegmentCount) + expectedSegments[i] = DefaultRawSegment(cc.OriginalObject.ObjectStream, metabase.SegmentPosition{Index: uint32(i)}) + + // TODO: place this calculation in metabasetest. + expectedSegments[i].PlainOffset = int64(int32(i) * expectedSegments[i].PlainSize) + // TODO: we should use the same value for encrypted size in both test methods. + expectedSegments[i].EncryptedSize = int32(expectedEncryptedSize) + + expectedSegments[copyIndex] = metabase.RawSegment{} + expectedSegments[copyIndex].StreamID = copyStream.StreamID + expectedSegments[copyIndex].EncryptedKeyNonce = newEncryptedKeysNonces[i].EncryptedKeyNonce + expectedSegments[copyIndex].EncryptedKey = newEncryptedKeysNonces[i].EncryptedKey + expectedSegments[copyIndex].EncryptedSize = expectedSegments[i].EncryptedSize + expectedSegments[copyIndex].Position = expectedSegments[i].Position + expectedSegments[copyIndex].RootPieceID = expectedSegments[i].RootPieceID + expectedSegments[copyIndex].Redundancy = expectedSegments[i].Redundancy + expectedSegments[copyIndex].PlainSize = expectedSegments[i].PlainSize + expectedSegments[copyIndex].PlainOffset = expectedSegments[i].PlainOffset + expectedSegments[copyIndex].CreatedAt = time.Now().UTC() + } + + opts := cc.FinishObject + if opts == nil { + opts = &metabase.FinishCopyObject{ + NewStreamID: copyStream.StreamID, + NewBucket: copyStream.BucketName, + ObjectStream: cc.OriginalObject.ObjectStream, + NewSegmentKeys: newEncryptedKeysNonces, + NewEncryptedObjectKey: []byte(copyStream.ObjectKey), + NewEncryptedMetadataKeyNonce: testrand.Nonce().Bytes(), + NewEncryptedMetadataKey: testrand.Bytes(32), + } + } + FinishCopyObject{ + Opts: *opts, + ErrText: "", + }.Check(ctx, t, db) + + copyObj := cc.OriginalObject + copyObj.StreamID = copyStream.StreamID + copyObj.ObjectKey = copyStream.ObjectKey + copyObj.EncryptedMetadataEncryptedKey = opts.NewEncryptedMetadataKey + copyObj.EncryptedMetadataNonce = opts.NewEncryptedMetadataKeyNonce + copyObj.BucketName = copyStream.BucketName + copyObj.ZombieDeletionDeadline = nil + + return copyObj, expectedSegments +} diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index c0b6bf904..ddd3ecf54 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -655,6 +655,36 @@ func (step FinishMoveObject) Check(ctx *testcontext.Context, t testing.TB, db *m checkError(t, err, step.ErrClass, step.ErrText) } +// BeginCopyObject is for testing metabase.BeginCopyObject. +type BeginCopyObject struct { + Opts metabase.BeginCopyObject + Result metabase.BeginCopyObjectResult + ErrClass *errs.Class + ErrText string +} + +// Check runs the test. +func (step BeginCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { + result, err := db.BeginCopyObject(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, result) + require.Zero(t, diff) +} + +// FinishCopyObject is for testing metabase.FinishCopyObject. +type FinishCopyObject struct { + Opts metabase.FinishCopyObject + ErrClass *errs.Class + ErrText string +} + +// Check runs the test. +func (step FinishCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { + err := db.FinishCopyObject(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) +} + // GetProjectSegmentCount is for testing metabase.GetProjectSegmentCount. type GetProjectSegmentCount struct { Opts metabase.GetProjectSegmentCount diff --git a/satellite/metabase/raw.go b/satellite/metabase/raw.go index 9e7bca0ca..d4c9e45b3 100644 --- a/satellite/metabase/raw.go +++ b/satellite/metabase/raw.go @@ -97,6 +97,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) { _, err = db.db.ExecContext(ctx, ` DELETE FROM objects; DELETE FROM segments; + DELETE FROM segment_copies; DELETE FROM node_aliases; SELECT setval('node_alias_seq', 1, false); `) @@ -178,7 +179,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er plain_offset, plain_size, encrypted_etag, redundancy, - inline_data, remote_alias_pieces, + inline_data, remote_alias_pieces, placement FROM segments ORDER BY stream_id ASC, position ASC