satellite/metabase: BeginCopyObject and FinishCopyObject
Part of the server-side copy implementation. Creates the methods BeginCopyObject and FinishCopyObject in satellite/metabase/copy_object.go, making it possible to copy objects and their segments in metabase.

https://github.com/storj/team-metainfo/issues/79

Change-Id: Icc64afce16e5e0e15b83c43cff36797bb9bd39bc
parent 294d253923
commit 0358a08d6b
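For orientation before the diffs, here is a minimal sketch of how a caller might drive the two halves end to end. It uses only the API added below; the copyObject wrapper and the reEncryptKey helper are illustrative stand-ins and not part of this commit (real key re-encryption happens client-side in the uplink, never on the satellite):

package example // illustrative sketch only

import (
	"context"

	"storj.io/common/uuid"
	"storj.io/storj/satellite/metabase"
)

// reEncryptKey stands in for the client-side re-encryption of an encrypted
// key and its nonce; the satellite never sees plaintext keys, so this step
// happens in the uplink. Here it just passes the values through.
func reEncryptKey(nonce, key []byte) (newNonce, newKey []byte) {
	return nonce, key
}

// copyObject drives the two-phase flow: BeginCopyObject fetches the source
// object's encrypted per-segment keys, the caller re-encrypts them for the
// destination, and FinishCopyObject persists the new object and segments.
func copyObject(ctx context.Context, db *metabase.DB, source metabase.ObjectStream, destBucket string, destEncryptedKey []byte) error {
	begin, err := db.BeginCopyObject(ctx, metabase.BeginCopyObject{
		Version:        source.Version,
		ObjectLocation: source.Location(),
	})
	if err != nil {
		return err
	}

	// Re-encrypt each segment key for the destination object.
	newKeys := make([]metabase.EncryptedKeyAndNonce, 0, len(begin.EncryptedKeysNonces))
	for _, k := range begin.EncryptedKeysNonces {
		nonce, key := reEncryptKey(k.EncryptedKeyNonce, k.EncryptedKey)
		newKeys = append(newKeys, metabase.EncryptedKeyAndNonce{
			Position:          k.Position,
			EncryptedKeyNonce: nonce,
			EncryptedKey:      key,
		})
	}

	// The metadata key is re-encrypted the same way.
	metaNonce, metaKey := reEncryptKey(begin.EncryptedMetadataKeyNonce, begin.EncryptedMetadataKey)

	newStreamID, err := uuid.New()
	if err != nil {
		return err
	}

	return db.FinishCopyObject(ctx, metabase.FinishCopyObject{
		ObjectStream:                 source,
		NewBucket:                    destBucket,
		NewStreamID:                  newStreamID,
		NewSegmentKeys:               newKeys,
		NewEncryptedObjectKey:        destEncryptedKey,
		NewEncryptedMetadataKeyNonce: metaNonce,
		NewEncryptedMetadataKey:      metaKey,
	})
}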
@@ -35,6 +35,9 @@ const ListLimit = intLimitRange(1000)
 // MoveLimit is the maximum number of segments that can be moved.
 const MoveLimit = int64(10000)
 
+// CopySegmentLimit is the maximum number of segments that can be copied.
+const CopySegmentLimit = int64(10000)
+
 // batchsizeLimit specifies up to how many items fetch from the storage layer at
 // a time.
 //
satellite/metabase/copy_object.go (new file, 312 lines)

@@ -0,0 +1,312 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase

import (
	"context"
	"database/sql"
	"errors"

	pgxerrcode "github.com/jackc/pgerrcode"
	"github.com/zeebo/errs"

	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/private/dbutil/pgutil/pgerrcode"
	"storj.io/private/dbutil/txutil"
	"storj.io/private/tagsql"
)

// BeginCopyObjectResult holds the data needed to finish the copy-object operation.
type BeginCopyObjectResult struct {
	StreamID                  uuid.UUID
	EncryptedMetadata         []byte
	EncryptedMetadataKeyNonce []byte
	EncryptedMetadataKey      []byte
	EncryptedKeysNonces       []EncryptedKeyAndNonce
	EncryptionParameters      storj.EncryptionParameters
}

// BeginCopyObject holds all data needed to begin the copy-object method.
type BeginCopyObject struct {
	Version Version
	ObjectLocation
}

// BeginCopyObject collects all data needed to begin the object copy procedure.
func (db *DB) BeginCopyObject(ctx context.Context, opts BeginCopyObject) (result BeginCopyObjectResult, err error) {
	defer mon.Task()(&ctx)(&err)

	if err := opts.ObjectLocation.Verify(); err != nil {
		return BeginCopyObjectResult{}, err
	}

	if opts.Version <= 0 {
		return BeginCopyObjectResult{}, ErrInvalidRequest.New("Version invalid: %v", opts.Version)
	}

	var segmentCount int64

	err = db.db.QueryRowContext(ctx, `
		SELECT
			stream_id, encryption, segment_count,
			encrypted_metadata_encrypted_key, encrypted_metadata_nonce, encrypted_metadata
		FROM objects
		WHERE
			project_id  = $1 AND
			bucket_name = $2 AND
			object_key  = $3 AND
			version     = $4 AND
			status      = `+committedStatus,
		opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
		Scan(
			&result.StreamID,
			encryptionParameters{&result.EncryptionParameters},
			&segmentCount,
			&result.EncryptedMetadataKey, &result.EncryptedMetadataKeyNonce, &result.EncryptedMetadata,
		)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return BeginCopyObjectResult{}, storj.ErrObjectNotFound.Wrap(err)
		}
		return BeginCopyObjectResult{}, Error.New("unable to query object status: %w", err)
	}

	if segmentCount > CopySegmentLimit {
		return BeginCopyObjectResult{}, Error.New("object to copy has too many segments (%d). Limit is %d.", segmentCount, CopySegmentLimit)
	}

	err = withRows(db.db.QueryContext(ctx, `
		SELECT
			position, encrypted_key_nonce, encrypted_key
		FROM segments
		WHERE stream_id = $1
		ORDER BY stream_id, position ASC
	`, result.StreamID))(func(rows tagsql.Rows) error {
		for rows.Next() {
			var keys EncryptedKeyAndNonce

			err = rows.Scan(&keys.Position, &keys.EncryptedKeyNonce, &keys.EncryptedKey)
			if err != nil {
				return Error.New("failed to scan segments: %w", err)
			}

			result.EncryptedKeysNonces = append(result.EncryptedKeysNonces, keys)
		}

		return nil
	})
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return BeginCopyObjectResult{}, Error.New("unable to fetch object segments: %w", err)
	}

	return result, nil
}

// FinishCopyObject holds all data needed to finish an object copy.
type FinishCopyObject struct {
	ObjectStream
	NewBucket      string
	NewStreamID    uuid.UUID
	NewSegmentKeys []EncryptedKeyAndNonce
	// TODO: add NewEncryptedMetadata []byte for being able to change object's metadata
	NewEncryptedObjectKey        []byte
	NewEncryptedMetadataKeyNonce []byte
	NewEncryptedMetadataKey     []byte
}

// Verify verifies metabase.FinishCopyObject data.
func (finishCopy FinishCopyObject) Verify() error {
	if err := finishCopy.ObjectStream.Verify(); err != nil {
		return err
	}

	switch {
	case len(finishCopy.NewBucket) == 0:
		return ErrInvalidRequest.New("NewBucket is missing")
	case finishCopy.ObjectStream.StreamID == finishCopy.NewStreamID:
		return ErrInvalidRequest.New("StreamIDs are identical")
	case finishCopy.ObjectKey == ObjectKey(finishCopy.NewEncryptedObjectKey):
		return ErrInvalidRequest.New("source and destination encrypted object key are identical")
	case len(finishCopy.NewEncryptedObjectKey) == 0:
		return ErrInvalidRequest.New("NewEncryptedObjectKey is missing")
	case len(finishCopy.NewEncryptedMetadataKeyNonce) == 0:
		return ErrInvalidRequest.New("EncryptedMetadataKeyNonce is missing")
	case len(finishCopy.NewEncryptedMetadataKey) == 0:
		return ErrInvalidRequest.New("EncryptedMetadataKey is missing")
	}

	return nil
}

// FinishCopyObject accepts new encryption keys for the copied object and inserts the corresponding new object ObjectKey and segment EncryptedKeys.
// TODO: should be in one transaction.
// TODO: handle the case when the source and destination encrypted object keys are the same.
func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (err error) {
	defer mon.Task()(&ctx)(&err)

	if err := opts.Verify(); err != nil {
		return err
	}

	originalObject, err := db.GetObjectExactVersion(ctx, GetObjectExactVersion{
		opts.Version,
		opts.Location(),
	})
	if err != nil {
		return errs.Wrap(err)
	}

	if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) {
		return ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys))
	}

	var newSegmentKeys struct {
		Positions          []int64
		EncryptedKeys      [][]byte
		EncryptedKeyNonces [][]byte
	}

	for _, u := range opts.NewSegmentKeys {
		newSegmentKeys.EncryptedKeys = append(newSegmentKeys.EncryptedKeys, u.EncryptedKey)
		newSegmentKeys.EncryptedKeyNonces = append(newSegmentKeys.EncryptedKeyNonces, u.EncryptedKeyNonce)
		newSegmentKeys.Positions = append(newSegmentKeys.Positions, int64(u.Position.Encode()))
	}

	segments := make([]Segment, originalObject.SegmentCount)
	positions := make([]int64, originalObject.SegmentCount)

	// TODO: there are probably columns that we can skip
	// maybe it's possible to have the select and the insert in one query
	err = withRows(db.db.QueryContext(ctx, `
		SELECT
			position,
			expires_at, repaired_at,
			root_piece_id, encrypted_key_nonce, encrypted_key,
			encrypted_size, plain_offset, plain_size,
			encrypted_etag,
			redundancy,
			inline_data,
			placement
		FROM segments
		WHERE stream_id = $1
		ORDER BY position ASC
		LIMIT $2
	`, originalObject.StreamID, originalObject.SegmentCount))(func(rows tagsql.Rows) error {
		index := 0
		for rows.Next() {
			err = rows.Scan(
				&segments[index].Position,
				&segments[index].ExpiresAt, &segments[index].RepairedAt,
				&segments[index].RootPieceID, &segments[index].EncryptedKeyNonce, &segments[index].EncryptedKey,
				&segments[index].EncryptedSize, &segments[index].PlainOffset, &segments[index].PlainSize,
				&segments[index].EncryptedETag,
				redundancyScheme{&segments[index].Redundancy},
				&segments[index].InlineData,
				&segments[index].Placement,
			)
			if err != nil {
				return err
			}
			positions[index] = int64(segments[index].Position.Encode())
			index++
		}

		if err = rows.Err(); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return Error.New("unable to copy object: %w", err)
	}

	for index := range segments {
		if newSegmentKeys.Positions[index] != int64(segments[index].Position.Encode()) {
			return Error.New("missing new segment keys for segment %d", int64(segments[index].Position.Encode()))
		}
	}

	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
		// TODO: handle metadata correctly (copy from the original object or replace it)
		_, err = db.db.ExecContext(ctx, `
			INSERT INTO objects (
				project_id, bucket_name, object_key, version, stream_id,
				expires_at, status, segment_count,
				encryption,
				encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key,
				total_plain_size, total_encrypted_size, fixed_segment_size,
				zombie_deletion_deadline
			) VALUES (
				$1, $2, $3, $4, $5,
				$6,`+committedStatus+`, $7,
				$8,
				$9, $10, $11,
				$12, $13, $14, null
			)
		`, opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID,
			originalObject.ExpiresAt, originalObject.SegmentCount,
			encryptionParameters{&originalObject.Encryption},
			originalObject.EncryptedMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
			originalObject.TotalPlainSize, originalObject.TotalEncryptedSize, originalObject.FixedSegmentSize,
		)
		if err != nil {
			if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
				return ErrObjectAlreadyExists.New("")
			}
			return Error.New("unable to copy object: %w", err)
		}

		// TODO: optimize - we should do a bulk insert
		for index, originalSegment := range segments {
			_, err = db.db.ExecContext(ctx, `
				INSERT INTO segments (
					stream_id, position,
					encrypted_key_nonce, encrypted_key,
					root_piece_id, -- non-null constraint
					redundancy,
					encrypted_size, plain_offset, plain_size,
					inline_data
				) VALUES (
					$1, $2,
					$3, $4,
					$5,
					$6,
					$7, $8, $9,
					$10
				)
			`, opts.NewStreamID, originalSegment.Position.Encode(),
				newSegmentKeys.EncryptedKeyNonces[index], newSegmentKeys.EncryptedKeys[index],
				originalSegment.RootPieceID,
				redundancyScheme{&originalSegment.Redundancy},
				originalSegment.EncryptedSize, originalSegment.PlainOffset, originalSegment.PlainSize,
				originalSegment.InlineData,
			)
			if err != nil {
				return Error.New("unable to copy segment: %w", err)
			}
		}

		// TODO: we need to flatten references
		_, err = db.db.ExecContext(ctx, `
			INSERT INTO segment_copies (
				stream_id, ancestor_stream_id
			) VALUES (
				$1, $2
			)
		`, opts.NewStreamID, originalObject.StreamID)
		if err != nil {
			return Error.New("unable to copy object: %w", err)
		}
		return nil
	})
	if err != nil {
		return err
	}
	mon.Meter("finish_copy_object").Mark(1)

	return nil
}
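One caveat worth flagging in FinishCopyObject above: inside the txutil.WithTx closure every statement is executed on db.db rather than on the tx handle, so the object insert, the segment inserts, and the segment_copies insert do not yet run atomically; the "TODO: should be in one transaction" comment acknowledges this. A minimal sketch of the transactional form, with the SQL elided (same statements as above, only the executor changes):

	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
		// Execute on tx, not db.db, so all inserts commit or roll back together.
		// objectInsertSQL and objectArgs are placeholders for the statement
		// and arguments shown in the listing above.
		if _, err := tx.ExecContext(ctx, objectInsertSQL, objectArgs...); err != nil {
			return err
		}
		// ...the segment inserts and the segment_copies insert likewise go
		// through tx.ExecContext.
		return nil
	})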
satellite/metabase/copy_object_test.go (new file, 384 lines)

@@ -0,0 +1,384 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase_test

import (
	"testing"

	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/metabasetest"
)

func TestBeginCopyObject(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		obj := metabasetest.RandObjectStream()

		for _, test := range metabasetest.InvalidObjectLocations(obj.Location()) {
			test := test
			t.Run(test.Name, func(t *testing.T) {
				defer metabasetest.DeleteAll{}.Check(ctx, t, db)
				metabasetest.BeginCopyObject{
					Opts: metabase.BeginCopyObject{
						ObjectLocation: test.ObjectLocation,
					},
					ErrClass: test.ErrClass,
					ErrText:  test.ErrText,
				}.Check(ctx, t, db)

				metabasetest.Verify{}.Check(ctx, t, db)
			})
		}

		t.Run("invalid version", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.BeginCopyObject{
				Opts: metabase.BeginCopyObject{
					ObjectLocation: obj.Location(),
					Version:        0,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "Version invalid: 0",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("begin copy object", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			expectedMetadataNonce := testrand.Nonce()
			expectedMetadataKey := testrand.Bytes(265)
			expectedObject := metabasetest.CreateTestObject{
				CommitObject: &metabase.CommitObject{
					ObjectStream:                  obj,
					EncryptedMetadata:             testrand.Bytes(64),
					EncryptedMetadataNonce:        expectedMetadataNonce[:],
					EncryptedMetadataEncryptedKey: expectedMetadataKey,
				},
			}.Run(ctx, t, db, obj, 10)

			var encKeyAndNonces []metabase.EncryptedKeyAndNonce
			expectedRawSegments := make([]metabase.RawSegment, 10)
			for i := range expectedRawSegments {
				expectedRawSegments[i] = metabasetest.DefaultRawSegment(expectedObject.ObjectStream, metabase.SegmentPosition{
					Index: uint32(i),
				})
				expectedRawSegments[i].PlainOffset = int64(i) * int64(expectedRawSegments[i].PlainSize)
				expectedRawSegments[i].EncryptedSize = 1060

				encKeyAndNonces = append(encKeyAndNonces, metabase.EncryptedKeyAndNonce{
					EncryptedKeyNonce: expectedRawSegments[i].EncryptedKeyNonce,
					EncryptedKey:      expectedRawSegments[i].EncryptedKey,
					Position:          expectedRawSegments[i].Position,
				})
			}

			metabasetest.BeginCopyObject{
				Opts: metabase.BeginCopyObject{
					Version:        expectedObject.Version,
					ObjectLocation: obj.Location(),
				},
				Result: metabase.BeginCopyObjectResult{
					StreamID:                  expectedObject.StreamID,
					EncryptedMetadata:         expectedObject.EncryptedMetadata,
					EncryptedMetadataKey:      expectedMetadataKey,
					EncryptedMetadataKeyNonce: expectedMetadataNonce[:],
					EncryptedKeysNonces:       encKeyAndNonces,
					EncryptionParameters:      expectedObject.Encryption,
				},
			}.Check(ctx, t, db)

			metabasetest.Verify{
				Objects: []metabase.RawObject{
					metabase.RawObject(expectedObject),
				},
				Segments: expectedRawSegments,
			}.Check(ctx, t, db)
		})
	})
}

func TestFinishCopyObject(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		obj := metabasetest.RandObjectStream()
		newBucketName := "New bucket name"

		for _, test := range metabasetest.InvalidObjectStreams(obj) {
			test := test
			t.Run(test.Name, func(t *testing.T) {
				defer metabasetest.DeleteAll{}.Check(ctx, t, db)
				metabasetest.FinishCopyObject{
					Opts: metabase.FinishCopyObject{
						NewBucket:    newBucketName,
						ObjectStream: test.ObjectStream,
					},
					ErrClass: test.ErrClass,
					ErrText:  test.ErrText,
				}.Check(ctx, t, db)

				metabasetest.Verify{}.Check(ctx, t, db)
			})
		}

		t.Run("invalid NewBucket", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					ObjectStream:                 obj,
					NewEncryptedObjectKey:        []byte{1, 2, 3},
					NewEncryptedMetadataKey:      []byte{1, 2, 3},
					NewEncryptedMetadataKeyNonce: []byte{1, 2, 3},
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "NewBucket is missing",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("copy to the same StreamID", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					ObjectStream: obj,
					NewBucket:    newBucketName,
					NewStreamID:  obj.StreamID,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "StreamIDs are identical",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("invalid NewEncryptedObjectKey", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					NewBucket:    newBucketName,
					ObjectStream: obj,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "NewEncryptedObjectKey is missing",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("copy to the same EncryptedObjectKey", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					NewBucket:             newBucketName,
					NewEncryptedObjectKey: []byte(obj.ObjectKey),
					ObjectStream:          obj,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "source and destination encrypted object key are identical",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("invalid EncryptedMetadataKeyNonce", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					NewBucket:             newBucketName,
					ObjectStream:          obj,
					NewEncryptedObjectKey: []byte{0},
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "EncryptedMetadataKeyNonce is missing",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("invalid EncryptedMetadataKey", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					NewBucket:                    newBucketName,
					ObjectStream:                 obj,
					NewEncryptedObjectKey:        []byte{0},
					NewEncryptedMetadataKeyNonce: []byte{0},
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "EncryptedMetadataKey is missing",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("object does not exist", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			newObj := metabasetest.RandObjectStream()

			newEncryptedMetadataKeyNonce := testrand.Nonce()
			newEncryptedMetadataKey := testrand.Bytes(32)
			newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, 10)
			newObjectKey := testrand.Bytes(32)

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					NewBucket:                    newBucketName,
					ObjectStream:                 newObj,
					NewSegmentKeys:               newEncryptedKeysNonces,
					NewEncryptedObjectKey:        newObjectKey,
					NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
					NewEncryptedMetadataKey:      newEncryptedMetadataKey,
				},
				ErrClass: &storj.ErrObjectNotFound,
				ErrText:  "metabase: sql: no rows in result set",
			}.Check(ctx, t, db)

			metabasetest.Verify{}.Check(ctx, t, db)
		})

		t.Run("less amount of segments", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			numberOfSegments := 10
			newObjectKey := testrand.Bytes(32)

			newObj := metabasetest.CreateTestObject{
				CommitObject: &metabase.CommitObject{
					ObjectStream:                  obj,
					EncryptedMetadata:             testrand.Bytes(64),
					EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
					EncryptedMetadataEncryptedKey: testrand.Bytes(265),
				},
			}.Run(ctx, t, db, obj, byte(numberOfSegments))

			newEncryptedMetadataKeyNonce := testrand.Nonce()
			newEncryptedMetadataKey := testrand.Bytes(32)
			newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, newObj.SegmentCount-1)
			expectedSegments := make([]metabase.RawSegment, newObj.SegmentCount)

			for i := 0; i < int(newObj.SegmentCount-1); i++ {
				newEncryptedKeysNonces[i] = metabase.EncryptedKeyAndNonce{
					Position:          metabase.SegmentPosition{Index: uint32(i)},
					EncryptedKeyNonce: testrand.Nonce().Bytes(),
					EncryptedKey:      testrand.Bytes(32),
				}

				expectedSegments[i] = metabasetest.DefaultRawSegment(newObj.ObjectStream, metabase.SegmentPosition{Index: uint32(i)})
				expectedSegments[i].EncryptedKeyNonce = newEncryptedKeysNonces[i].EncryptedKeyNonce
				expectedSegments[i].EncryptedKey = newEncryptedKeysNonces[i].EncryptedKey
				expectedSegments[i].PlainOffset = int64(int32(i) * expectedSegments[i].PlainSize)
				expectedSegments[i].EncryptedSize = int32(0)
			}

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					NewBucket:                    newBucketName,
					ObjectStream:                 obj,
					NewSegmentKeys:               newEncryptedKeysNonces,
					NewEncryptedObjectKey:        newObjectKey,
					NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
					NewEncryptedMetadataKey:      newEncryptedMetadataKey,
				},
				ErrClass: &metabase.ErrInvalidRequest,
				ErrText:  "wrong amount of segments keys received (received 10, need 9)",
			}.Check(ctx, t, db)
		})

		t.Run("wrong segment indexes", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			numberOfSegments := 10
			newObjectKey := testrand.Bytes(32)

			newObj := metabasetest.CreateTestObject{
				CommitObject: &metabase.CommitObject{
					ObjectStream:                  obj,
					EncryptedMetadata:             testrand.Bytes(64),
					EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
					EncryptedMetadataEncryptedKey: testrand.Bytes(265),
				},
			}.Run(ctx, t, db, obj, byte(numberOfSegments))

			newEncryptedMetadataKeyNonce := testrand.Nonce()
			newEncryptedMetadataKey := testrand.Bytes(32)
			newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, newObj.SegmentCount)
			expectedEncryptedSize := 1060
			expectedSegments := make([]metabase.RawSegment, newObj.SegmentCount)

			for i := 0; i < int(newObj.SegmentCount); i++ {
				newEncryptedKeysNonces[i] = metabase.EncryptedKeyAndNonce{
					Position:          metabase.SegmentPosition{Index: uint32(i + 5)},
					EncryptedKeyNonce: testrand.Nonce().Bytes(),
					EncryptedKey:      testrand.Bytes(32),
				}

				expectedSegments[i] = metabasetest.DefaultRawSegment(newObj.ObjectStream, metabase.SegmentPosition{Index: uint32(i)})
				expectedSegments[i].EncryptedKeyNonce = newEncryptedKeysNonces[i].EncryptedKeyNonce
				expectedSegments[i].EncryptedKey = newEncryptedKeysNonces[i].EncryptedKey
				expectedSegments[i].PlainOffset = int64(int32(i) * expectedSegments[i].PlainSize)
				expectedSegments[i].EncryptedSize = int32(expectedEncryptedSize)
			}

			metabasetest.FinishCopyObject{
				Opts: metabase.FinishCopyObject{
					NewStreamID:                  testrand.UUID(),
					NewBucket:                    newBucketName,
					ObjectStream:                 obj,
					NewSegmentKeys:               newEncryptedKeysNonces,
					NewEncryptedObjectKey:        newObjectKey,
					NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
					NewEncryptedMetadataKey:      newEncryptedMetadataKey,
				},
				ErrClass: &metabase.Error,
				ErrText:  "missing new segment keys for segment 0",
			}.Check(ctx, t, db)
		})

		t.Run("finish copy object with existing metadata", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			numberOfSegments := 10
			copyStream := metabasetest.RandObjectStream()

			// make sure segments are ordered as expected when checking database
			if copyStream.StreamID.Less(obj.StreamID) {
				obj, copyStream = copyStream, obj
			}

			originalObj := metabasetest.CreateTestObject{
				CommitObject: &metabase.CommitObject{
					ObjectStream:                  obj,
					EncryptedMetadata:             testrand.Bytes(64),
					EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
					EncryptedMetadataEncryptedKey: testrand.Bytes(265),
				},
			}.Run(ctx, t, db, obj, byte(numberOfSegments))

			copyObj, expectedSegments := metabasetest.CreateObjectCopy{
				OriginalObject:   originalObj,
				CopyObjectStream: &copyStream,
			}.Run(ctx, t, db)

			metabasetest.Verify{
				Objects: []metabase.RawObject{
					metabase.RawObject(originalObj),
					metabase.RawObject(copyObj),
				},
				Segments: expectedSegments,
			}.Check(ctx, t, db)
		})
	})
	// TODO: test with new metadata
}
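Both new files lean on SegmentPosition.Encode() to order segments and to match the caller's keys to positions. For reference, the encoding packs a position's two halves into one integer so numeric ordering matches (part, index) ordering; the sketch below reflects a reading of the metabase package and should be treated as an assumption, not as code from this commit:

// encodePosition mirrors what SegmentPosition.Encode is understood to do:
// the part number occupies the high 32 bits, the index the low 32 bits.
func encodePosition(part, index uint32) uint64 {
	return uint64(part)<<32 | uint64(index)
}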
@@ -255,3 +255,80 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t testing.TB, db *metabase.DB)
 		Opts: coOpts,
 	}.Check(ctx, t, db)
 }
+
+// CreateObjectCopy is for testing object copy.
+type CreateObjectCopy struct {
+	OriginalObject   metabase.Object
+	FinishObject     *metabase.FinishCopyObject
+	CopyObjectStream *metabase.ObjectStream
+}
+
+// Run creates the copy.
+func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metabase.DB) (metabase.Object, []metabase.RawSegment) {
+	var copyStream metabase.ObjectStream
+	if cc.CopyObjectStream != nil {
+		copyStream = *cc.CopyObjectStream
+	} else {
+		copyStream = RandObjectStream()
+	}
+
+	newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, cc.OriginalObject.SegmentCount)
+	expectedSegments := make([]metabase.RawSegment, cc.OriginalObject.SegmentCount*2)
+	expectedEncryptedSize := 1060
+
+	for i := 0; i < int(cc.OriginalObject.SegmentCount); i++ {
+		newEncryptedKeysNonces[i] = metabase.EncryptedKeyAndNonce{
+			Position:          metabase.SegmentPosition{Index: uint32(i)},
+			EncryptedKeyNonce: testrand.Nonce().Bytes(),
+			EncryptedKey:      testrand.Bytes(32),
+		}
+
+		copyIndex := i + int(cc.OriginalObject.SegmentCount)
+		expectedSegments[i] = DefaultRawSegment(cc.OriginalObject.ObjectStream, metabase.SegmentPosition{Index: uint32(i)})
+
+		// TODO: place this calculation in metabasetest.
+		expectedSegments[i].PlainOffset = int64(int32(i) * expectedSegments[i].PlainSize)
+		// TODO: we should use the same value for encrypted size in both test methods.
+		expectedSegments[i].EncryptedSize = int32(expectedEncryptedSize)
+
+		expectedSegments[copyIndex] = metabase.RawSegment{}
+		expectedSegments[copyIndex].StreamID = copyStream.StreamID
+		expectedSegments[copyIndex].EncryptedKeyNonce = newEncryptedKeysNonces[i].EncryptedKeyNonce
+		expectedSegments[copyIndex].EncryptedKey = newEncryptedKeysNonces[i].EncryptedKey
+		expectedSegments[copyIndex].EncryptedSize = expectedSegments[i].EncryptedSize
+		expectedSegments[copyIndex].Position = expectedSegments[i].Position
+		expectedSegments[copyIndex].RootPieceID = expectedSegments[i].RootPieceID
+		expectedSegments[copyIndex].Redundancy = expectedSegments[i].Redundancy
+		expectedSegments[copyIndex].PlainSize = expectedSegments[i].PlainSize
+		expectedSegments[copyIndex].PlainOffset = expectedSegments[i].PlainOffset
+		expectedSegments[copyIndex].CreatedAt = time.Now().UTC()
+	}
+
+	opts := cc.FinishObject
+	if opts == nil {
+		opts = &metabase.FinishCopyObject{
+			NewStreamID:                  copyStream.StreamID,
+			NewBucket:                    copyStream.BucketName,
+			ObjectStream:                 cc.OriginalObject.ObjectStream,
+			NewSegmentKeys:               newEncryptedKeysNonces,
+			NewEncryptedObjectKey:        []byte(copyStream.ObjectKey),
+			NewEncryptedMetadataKeyNonce: testrand.Nonce().Bytes(),
+			NewEncryptedMetadataKey:      testrand.Bytes(32),
+		}
+	}
+	FinishCopyObject{
+		Opts:    *opts,
+		ErrText: "",
+	}.Check(ctx, t, db)
+
+	copyObj := cc.OriginalObject
+	copyObj.StreamID = copyStream.StreamID
+	copyObj.ObjectKey = copyStream.ObjectKey
+	copyObj.EncryptedMetadataEncryptedKey = opts.NewEncryptedMetadataKey
+	copyObj.EncryptedMetadataNonce = opts.NewEncryptedMetadataKeyNonce
+	copyObj.BucketName = copyStream.BucketName
+	copyObj.ZombieDeletionDeadline = nil
+
+	return copyObj, expectedSegments
+}
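As a usage note, this helper is exercised by TestFinishCopyObject above; condensed from that test, a caller first creates an original object and then runs:

	// copyStream pins the copy's ObjectStream; when CopyObjectStream is nil,
	// Run picks a random stream itself.
	copyObj, expectedSegments := metabasetest.CreateObjectCopy{
		OriginalObject:   originalObj,
		CopyObjectStream: &copyStream,
	}.Run(ctx, t, db)

	metabasetest.Verify{
		Objects: []metabase.RawObject{
			metabase.RawObject(originalObj),
			metabase.RawObject(copyObj),
		},
		Segments: expectedSegments,
	}.Check(ctx, t, db)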
@@ -655,6 +655,36 @@ func (step FinishMoveObject) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB)
 	checkError(t, err, step.ErrClass, step.ErrText)
 }
+
+// BeginCopyObject is for testing metabase.BeginCopyObject.
+type BeginCopyObject struct {
+	Opts     metabase.BeginCopyObject
+	Result   metabase.BeginCopyObjectResult
+	ErrClass *errs.Class
+	ErrText  string
+}
+
+// Check runs the test.
+func (step BeginCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
+	result, err := db.BeginCopyObject(ctx, step.Opts)
+	checkError(t, err, step.ErrClass, step.ErrText)
+
+	diff := cmp.Diff(step.Result, result)
+	require.Zero(t, diff)
+}
+
+// FinishCopyObject is for testing metabase.FinishCopyObject.
+type FinishCopyObject struct {
+	Opts     metabase.FinishCopyObject
+	ErrClass *errs.Class
+	ErrText  string
+}
+
+// Check runs the test.
+func (step FinishCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
+	err := db.FinishCopyObject(ctx, step.Opts)
+	checkError(t, err, step.ErrClass, step.ErrText)
+}
+
 // GetProjectSegmentCount is for testing metabase.GetProjectSegmentCount.
 type GetProjectSegmentCount struct {
 	Opts metabase.GetProjectSegmentCount
@@ -97,6 +97,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
 	_, err = db.db.ExecContext(ctx, `
 		DELETE FROM objects;
 		DELETE FROM segments;
+		DELETE FROM segment_copies;
 		DELETE FROM node_aliases;
 		SELECT setval('node_alias_seq', 1, false);
 	`)
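The added DELETE reflects the segment_copies table that FinishCopyObject writes; its DDL is not part of this commit (it lives in the schema migrations), and the statements here reveal only two columns. A hedged sketch of the shape, following the document's embedded-SQL-in-Go style, with the column types assumed:

	// Assumption: stream ids are stored as BYTEA, as elsewhere in metabase;
	// consult the migration files for the actual DDL.
	const segmentCopiesDDL = `
		CREATE TABLE segment_copies (
			stream_id          BYTEA NOT NULL,
			ancestor_stream_id BYTEA NOT NULL
		)`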
@@ -178,7 +179,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error)
 			plain_offset, plain_size,
 			encrypted_etag,
 			redundancy,
 			inline_data, remote_alias_pieces,
 			placement
 		FROM segments
 		ORDER BY stream_id ASC, position ASC