satellite/metabase: adjust FinishMoveObject for versioning
Change-Id: Ib63c0d953f7b9f52a456b275f61cac166a93de12
This commit is contained in:
parent
a1a8c258d5
commit
080f58acfe
@ -8,12 +8,9 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
|
||||
pgxerrcode "github.com/jackc/pgerrcode"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/dbutil/pgutil/pgerrcode"
|
||||
"storj.io/private/dbutil/txutil"
|
||||
"storj.io/private/tagsql"
|
||||
)
|
||||
@ -121,12 +118,19 @@ func (db *DB) beginMoveCopyObject(ctx context.Context, location ObjectLocation,
|
||||
// FinishMoveObject holds all data needed to finish object move.
|
||||
type FinishMoveObject struct {
|
||||
ObjectStream
|
||||
|
||||
NewBucket string
|
||||
NewSegmentKeys []EncryptedKeyAndNonce
|
||||
NewEncryptedObjectKey []byte
|
||||
NewEncryptedObjectKey ObjectKey
|
||||
// Optional. Required if object has metadata.
|
||||
NewEncryptedMetadataKeyNonce storj.Nonce
|
||||
NewEncryptedMetadataKey []byte
|
||||
|
||||
// NewDisallowDelete indicates whether the user is allowed to delete an existing unversioned object.
|
||||
NewDisallowDelete bool
|
||||
|
||||
// NewVersioned indicates that the object allows multiple versions.
|
||||
NewVersioned bool
|
||||
}
|
||||
|
||||
// Verify verifies metabase.FinishMoveObject data.
|
||||
@ -154,79 +158,72 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
||||
}
|
||||
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
targetVersion := opts.Version
|
||||
var highestVersion Version
|
||||
|
||||
useNewVersion := false
|
||||
highestVersion := Version(0)
|
||||
err = withRows(tx.QueryContext(ctx, `
|
||||
SELECT version, status
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3
|
||||
ORDER BY version ASC
|
||||
`, opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var status ObjectStatus
|
||||
var version Version
|
||||
|
||||
err = rows.Scan(&version, &status)
|
||||
if err != nil {
|
||||
return Error.New("failed to scan objects: %w", err)
|
||||
}
|
||||
|
||||
if status == CommittedUnversioned {
|
||||
return ErrObjectAlreadyExists.New("")
|
||||
} else if status == Pending && version == opts.Version {
|
||||
useNewVersion = true
|
||||
}
|
||||
highestVersion = version
|
||||
if !opts.NewVersioned {
|
||||
// TODO(ver): this logic can probably merged into update query
|
||||
//
|
||||
// Note, we are prematurely deleting the object without permissions
|
||||
// and then rolling the action back, if we were not allowed to.
|
||||
deleted, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{
|
||||
ProjectID: opts.ProjectID,
|
||||
BucketName: opts.NewBucket,
|
||||
ObjectKey: opts.NewEncryptedObjectKey,
|
||||
}, tx)
|
||||
if err != nil {
|
||||
return Error.New("unable to delete object at target location: %w", err)
|
||||
}
|
||||
if deleted.DeletedObjectCount > 0 && opts.NewDisallowDelete {
|
||||
return ErrPermissionDenied.New("no permissions to delete existing object")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
highestVersion = deleted.MaxVersion
|
||||
} else {
|
||||
highestVersion, err = db.queryHighestVersion(ctx, ObjectLocation{
|
||||
ProjectID: opts.ProjectID,
|
||||
BucketName: opts.NewBucket,
|
||||
ObjectKey: opts.NewEncryptedObjectKey,
|
||||
}, tx)
|
||||
if err != nil {
|
||||
return Error.New("unable to query highest version: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if useNewVersion {
|
||||
targetVersion = highestVersion + 1
|
||||
}
|
||||
|
||||
updateObjectsQuery := `
|
||||
UPDATE objects SET
|
||||
bucket_name = $1,
|
||||
object_key = $2,
|
||||
version = $9,
|
||||
encrypted_metadata_encrypted_key = CASE WHEN objects.encrypted_metadata IS NOT NULL
|
||||
THEN $3
|
||||
ELSE objects.encrypted_metadata_encrypted_key
|
||||
END,
|
||||
encrypted_metadata_nonce = CASE WHEN objects.encrypted_metadata IS NOT NULL
|
||||
THEN $4
|
||||
ELSE objects.encrypted_metadata_nonce
|
||||
END
|
||||
WHERE
|
||||
project_id = $5 AND
|
||||
bucket_name = $6 AND
|
||||
object_key = $7 AND
|
||||
version = $8
|
||||
RETURNING
|
||||
segment_count,
|
||||
objects.encrypted_metadata IS NOT NULL AND LENGTH(objects.encrypted_metadata) > 0 AS has_metadata,
|
||||
stream_id
|
||||
`
|
||||
|
||||
var segmentsCount int
|
||||
var hasMetadata bool
|
||||
var streamID uuid.UUID
|
||||
|
||||
row := tx.QueryRowContext(ctx, updateObjectsQuery, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, opts.NewEncryptedMetadataKey, opts.NewEncryptedMetadataKeyNonce, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, targetVersion)
|
||||
if err = row.Scan(&segmentsCount, &hasMetadata, &streamID); err != nil {
|
||||
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
|
||||
return Error.Wrap(ErrObjectAlreadyExists.New(""))
|
||||
} else if errors.Is(err, sql.ErrNoRows) {
|
||||
newStatus := committedWhereVersioned(opts.NewVersioned)
|
||||
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
UPDATE objects SET
|
||||
bucket_name = $1,
|
||||
object_key = $2,
|
||||
version = $10,
|
||||
status = $9,
|
||||
encrypted_metadata_encrypted_key =
|
||||
CASE WHEN objects.encrypted_metadata IS NOT NULL
|
||||
THEN $3
|
||||
ELSE objects.encrypted_metadata_encrypted_key
|
||||
END,
|
||||
encrypted_metadata_nonce =
|
||||
CASE WHEN objects.encrypted_metadata IS NOT NULL
|
||||
THEN $4
|
||||
ELSE objects.encrypted_metadata_nonce
|
||||
END
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version) = ($5, $6, $7, $8)
|
||||
RETURNING
|
||||
segment_count,
|
||||
objects.encrypted_metadata IS NOT NULL AND LENGTH(objects.encrypted_metadata) > 0 AS has_metadata,
|
||||
stream_id
|
||||
`, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, opts.NewEncryptedMetadataKey,
|
||||
opts.NewEncryptedMetadataKeyNonce, opts.ProjectID, []byte(opts.BucketName),
|
||||
opts.ObjectKey, opts.Version, newStatus, highestVersion+1).
|
||||
Scan(&segmentsCount, &hasMetadata, &streamID)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return ErrObjectNotFound.New("object not found")
|
||||
}
|
||||
return Error.New("unable to update object: %w", err)
|
||||
@ -259,14 +256,14 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
||||
}
|
||||
|
||||
updateResult, err := tx.ExecContext(ctx, `
|
||||
UPDATE segments SET
|
||||
encrypted_key_nonce = P.encrypted_key_nonce,
|
||||
encrypted_key = P.encrypted_key
|
||||
FROM (SELECT unnest($2::INT8[]), unnest($3::BYTEA[]), unnest($4::BYTEA[])) as P(position, encrypted_key_nonce, encrypted_key)
|
||||
WHERE
|
||||
stream_id = $1 AND
|
||||
segments.position = P.position
|
||||
`, opts.StreamID, pgutil.Int8Array(newSegmentKeys.Positions), pgutil.ByteaArray(newSegmentKeys.EncryptedKeyNonces), pgutil.ByteaArray(newSegmentKeys.EncryptedKeys))
|
||||
UPDATE segments SET
|
||||
encrypted_key_nonce = P.encrypted_key_nonce,
|
||||
encrypted_key = P.encrypted_key
|
||||
FROM (SELECT unnest($2::INT8[]), unnest($3::BYTEA[]), unnest($4::BYTEA[])) as P(position, encrypted_key_nonce, encrypted_key)
|
||||
WHERE
|
||||
stream_id = $1 AND
|
||||
segments.position = P.position
|
||||
`, opts.StreamID, pgutil.Int8Array(newSegmentKeys.Positions), pgutil.ByteaArray(newSegmentKeys.EncryptedKeyNonces), pgutil.ByteaArray(newSegmentKeys.EncryptedKeys))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ package metabase_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
@ -111,7 +112,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
metabasetest.FinishMoveObject{
|
||||
Opts: metabase.FinishMoveObject{
|
||||
ObjectStream: obj,
|
||||
NewEncryptedObjectKey: []byte{1, 2, 3},
|
||||
NewEncryptedObjectKey: metabase.ObjectKey("\x01\x02\x03"),
|
||||
NewEncryptedMetadataKey: []byte{1, 2, 3},
|
||||
NewEncryptedMetadataKeyNonce: testrand.Nonce(),
|
||||
},
|
||||
@ -141,7 +142,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
numberOfSegments := 10
|
||||
newObjectKey := testrand.Bytes(32)
|
||||
newObjectKey := metabasetest.RandObjectKey()
|
||||
newEncryptedMetadataKey := testrand.Bytes(32)
|
||||
|
||||
newObj, segments := metabasetest.CreateTestObject{
|
||||
@ -181,7 +182,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
numberOfSegments := 10
|
||||
newObjectKey := testrand.Bytes(32)
|
||||
newObjectKey := metabasetest.RandObjectKey()
|
||||
|
||||
newObj, segments := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
@ -224,7 +225,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
Opts: metabase.FinishMoveObject{
|
||||
NewBucket: newBucketName,
|
||||
ObjectStream: obj,
|
||||
NewEncryptedObjectKey: []byte{0},
|
||||
NewEncryptedObjectKey: metabase.ObjectKey("\x00"),
|
||||
},
|
||||
// validation pass without EncryptedMetadataKey and EncryptedMetadataKeyNonce
|
||||
ErrClass: &metabase.ErrObjectNotFound,
|
||||
@ -234,25 +235,79 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("object already exists", func(t *testing.T) {
|
||||
t.Run("existing object is overwritten", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
moveObjStream := metabasetest.RandObjectStream()
|
||||
metabasetest.CreateObject(ctx, t, db, moveObjStream, 0)
|
||||
initialObject := metabasetest.CreateObject(ctx, t, db, moveObjStream, 0)
|
||||
|
||||
conflictObjStream := metabasetest.RandObjectStream()
|
||||
conflictObjStream.ProjectID = moveObjStream.ProjectID
|
||||
metabasetest.CreateObject(ctx, t, db, conflictObjStream, 0)
|
||||
|
||||
newNonce := testrand.Nonce()
|
||||
newMetadataKey := testrand.Bytes(265)
|
||||
|
||||
now := time.Now()
|
||||
metabasetest.FinishMoveObject{
|
||||
Opts: metabase.FinishMoveObject{
|
||||
NewBucket: conflictObjStream.BucketName,
|
||||
ObjectStream: moveObjStream,
|
||||
NewEncryptedObjectKey: []byte(conflictObjStream.ObjectKey),
|
||||
NewEncryptedMetadataKeyNonce: testrand.Nonce(),
|
||||
NewEncryptedMetadataKey: testrand.Bytes(265),
|
||||
NewEncryptedObjectKey: conflictObjStream.ObjectKey,
|
||||
NewEncryptedMetadataKeyNonce: newNonce,
|
||||
NewEncryptedMetadataKey: newMetadataKey,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: conflictObjStream.ProjectID,
|
||||
BucketName: conflictObjStream.BucketName,
|
||||
ObjectKey: conflictObjStream.ObjectKey,
|
||||
StreamID: initialObject.StreamID,
|
||||
Version: conflictObjStream.Version + 1,
|
||||
},
|
||||
CreatedAt: now,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
Encryption: initialObject.Encryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("existing object is not overwritten, permission denied", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
moveObjStream := metabasetest.RandObjectStream()
|
||||
initialObject := metabasetest.CreateObject(ctx, t, db, moveObjStream, 0)
|
||||
|
||||
conflictObjStream := metabasetest.RandObjectStream()
|
||||
conflictObjStream.ProjectID = moveObjStream.ProjectID
|
||||
conflictObject := metabasetest.CreateObject(ctx, t, db, conflictObjStream, 0)
|
||||
|
||||
newNonce := testrand.Nonce()
|
||||
newMetadataKey := testrand.Bytes(265)
|
||||
|
||||
metabasetest.FinishMoveObject{
|
||||
Opts: metabase.FinishMoveObject{
|
||||
NewBucket: conflictObjStream.BucketName,
|
||||
ObjectStream: moveObjStream,
|
||||
NewEncryptedObjectKey: conflictObjStream.ObjectKey,
|
||||
NewEncryptedMetadataKeyNonce: newNonce,
|
||||
NewEncryptedMetadataKey: newMetadataKey,
|
||||
|
||||
NewDisallowDelete: true,
|
||||
},
|
||||
ErrClass: &metabase.ErrPermissionDenied,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(conflictObject),
|
||||
metabase.RawObject(initialObject),
|
||||
},
|
||||
ErrClass: &metabase.ErrObjectAlreadyExists,
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
@ -264,7 +319,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
newEncryptedMetadataKeyNonce := testrand.Nonce()
|
||||
newEncryptedMetadataKey := testrand.Bytes(32)
|
||||
newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, 10)
|
||||
newObjectKey := testrand.Bytes(32)
|
||||
newObjectKey := metabasetest.RandObjectKey()
|
||||
|
||||
metabasetest.FinishMoveObject{
|
||||
Opts: metabase.FinishMoveObject{
|
||||
@ -286,7 +341,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
numberOfSegments := 10
|
||||
newObjectKey := testrand.Bytes(32)
|
||||
newObjectKey := metabasetest.RandObjectKey()
|
||||
|
||||
newObj, _ := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
@ -335,7 +390,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
numberOfSegments := 10
|
||||
newObjectKey := testrand.Bytes(32)
|
||||
newObjectKey := metabasetest.RandObjectKey()
|
||||
|
||||
newObj, _ := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
@ -385,7 +440,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
t.Run("source object changed", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
newObj, _ := metabasetest.CreateTestObject{
|
||||
newObj, newSegments := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
OverrideEncryptedMetadata: true,
|
||||
@ -409,13 +464,18 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
metabasetest.RandEncryptedKeyAndNonce(0),
|
||||
metabasetest.RandEncryptedKeyAndNonce(1),
|
||||
},
|
||||
NewEncryptedObjectKey: testrand.Bytes(32),
|
||||
NewEncryptedObjectKey: metabasetest.RandObjectKey(),
|
||||
NewEncryptedMetadataKeyNonce: testrand.Nonce(),
|
||||
NewEncryptedMetadataKey: testrand.Bytes(32),
|
||||
},
|
||||
ErrClass: &metabase.ErrObjectNotFound,
|
||||
ErrText: "object was changed during move",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{metabase.RawObject(newObj)},
|
||||
Segments: metabasetest.SegmentsToRaw(newSegments),
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("finish move object", func(t *testing.T) {
|
||||
@ -448,7 +508,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
expectedRawSegments = append(expectedRawSegments, metabase.RawSegment(segment))
|
||||
}
|
||||
|
||||
newObjectKey := testrand.Bytes(32)
|
||||
newObjectKey := metabasetest.RandObjectKey()
|
||||
metabasetest.FinishMoveObject{
|
||||
Opts: metabase.FinishMoveObject{
|
||||
NewBucket: newBucketName,
|
||||
@ -459,8 +519,9 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
ErrText: "",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
object.ObjectKey = metabase.ObjectKey(newObjectKey)
|
||||
object.ObjectKey = newObjectKey
|
||||
object.BucketName = newBucketName
|
||||
object.Version = 1 // there are no overwritten object, hence it should start from 1
|
||||
|
||||
expectedRawObjects = append(expectedRawObjects, metabase.RawObject(object))
|
||||
}
|
||||
@ -475,7 +536,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
numberOfSegments := 10
|
||||
newObjectKey := testrand.Bytes(32)
|
||||
newObjectKey := metabasetest.RandObjectKey()
|
||||
|
||||
newObj, _ := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
@ -516,8 +577,9 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
ErrText: "",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
newObj.ObjectKey = metabase.ObjectKey(newObjectKey)
|
||||
newObj.ObjectKey = newObjectKey
|
||||
newObj.BucketName = newBucketName
|
||||
newObj.Version = 1
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
@ -527,7 +589,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("finish move object - different versions reject", func(t *testing.T) {
|
||||
t.Run("finish move object - different versions reject when NewDisallowDelete", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
committedTargetStreams := []metabase.ObjectStream{}
|
||||
@ -548,9 +610,11 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
Opts: metabase.FinishMoveObject{
|
||||
ObjectStream: sourceStream,
|
||||
NewBucket: targetStream.BucketName,
|
||||
NewEncryptedObjectKey: []byte(targetStream.ObjectKey),
|
||||
NewEncryptedObjectKey: targetStream.ObjectKey,
|
||||
|
||||
NewDisallowDelete: true,
|
||||
},
|
||||
ErrClass: &metabase.ErrObjectAlreadyExists,
|
||||
ErrClass: &metabase.ErrPermissionDenied,
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
})
|
||||
@ -571,7 +635,7 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
Opts: metabase.FinishMoveObject{
|
||||
ObjectStream: sourceStream,
|
||||
NewBucket: obj.BucketName,
|
||||
NewEncryptedObjectKey: []byte(obj.ObjectKey),
|
||||
NewEncryptedObjectKey: obj.ObjectKey,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
@ -2032,9 +2032,12 @@ func (endpoint *Endpoint) FinishMoveObject(ctx context.Context, req *pb.ObjectFi
|
||||
},
|
||||
NewSegmentKeys: protobufkeysToMetabase(req.NewSegmentKeys),
|
||||
NewBucket: string(req.NewBucket),
|
||||
NewEncryptedObjectKey: req.NewEncryptedObjectKey,
|
||||
NewEncryptedObjectKey: metabase.ObjectKey(req.NewEncryptedObjectKey),
|
||||
NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce,
|
||||
NewEncryptedMetadataKey: req.NewEncryptedMetadataKey,
|
||||
|
||||
// TODO(ver): currently we disallow deletion, to not change behaviour.
|
||||
NewDisallowDelete: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
|
Loading…
Reference in New Issue
Block a user