satellite/metabase: handle target pending/committed objects while move
Before we introduced objects versions internally move operation was always failing when under target location object exists. But then we had only single version 1 all the time. With versions different than 1 we need to check all existing objects under target location. To be backward compatible with our API new logic looks like this: * if there is no object under target location use source object version as target version * if there are only pending objects find first free (highest) version which could be used to move object there * if there is committed object under target location reject move operation Fixes https://github.com/storj/storj/issues/5403 Change-Id: I717f3e7c42470b406287d6ec335f6f057d3fc3b5
This commit is contained in:
parent
2feb49afc3
commit
77afdae741
@ -154,10 +154,53 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
||||
}
|
||||
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
targetVersion := opts.Version
|
||||
|
||||
if db.config.MultipleVersions {
|
||||
useNewVersion := false
|
||||
highestVersion := Version(0)
|
||||
err = withRows(tx.QueryContext(ctx, `
|
||||
SELECT version, status
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3
|
||||
ORDER BY version ASC
|
||||
`, opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var status ObjectStatus
|
||||
var version Version
|
||||
|
||||
err = rows.Scan(&version, &status)
|
||||
if err != nil {
|
||||
return Error.New("failed to scan objects: %w", err)
|
||||
}
|
||||
|
||||
if status == Committed {
|
||||
return ErrObjectAlreadyExists.New("")
|
||||
} else if status == Pending && version == opts.Version {
|
||||
useNewVersion = true
|
||||
}
|
||||
highestVersion = version
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
if useNewVersion {
|
||||
targetVersion = highestVersion + 1
|
||||
}
|
||||
}
|
||||
|
||||
updateObjectsQuery := `
|
||||
UPDATE objects SET
|
||||
bucket_name = $1,
|
||||
object_key = $2,
|
||||
version = $9,
|
||||
encrypted_metadata_encrypted_key = CASE WHEN objects.encrypted_metadata IS NOT NULL
|
||||
THEN $3
|
||||
ELSE objects.encrypted_metadata_encrypted_key
|
||||
@ -172,7 +215,7 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
||||
object_key = $7 AND
|
||||
version = $8
|
||||
RETURNING
|
||||
segment_count,
|
||||
segment_count,
|
||||
objects.encrypted_metadata IS NOT NULL AND LENGTH(objects.encrypted_metadata) > 0 AS has_metadata,
|
||||
stream_id
|
||||
`
|
||||
@ -181,7 +224,7 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
||||
var hasMetadata bool
|
||||
var streamID uuid.UUID
|
||||
|
||||
row := tx.QueryRowContext(ctx, updateObjectsQuery, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, opts.NewEncryptedMetadataKey, opts.NewEncryptedMetadataKeyNonce, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version)
|
||||
row := tx.QueryRowContext(ctx, updateObjectsQuery, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, opts.NewEncryptedMetadataKey, opts.NewEncryptedMetadataKeyNonce, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, targetVersion)
|
||||
if err = row.Scan(&segmentsCount, &hasMetadata, &streamID); err != nil {
|
||||
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
|
||||
return Error.Wrap(ErrObjectAlreadyExists.New(""))
|
||||
|
@ -526,5 +526,61 @@ func TestFinishMoveObject(t *testing.T) {
|
||||
Segments: expectedSegments,
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func TestFinishMoveObject_MultipleVersions(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
db.TestingEnableMultipleVersions(true)
|
||||
|
||||
t.Run("finish move object - different versions reject", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
committedTargetStreams := []metabase.ObjectStream{}
|
||||
obj := metabasetest.RandObjectStream()
|
||||
for _, version := range []metabase.Version{1, 2} {
|
||||
obj.Version = version
|
||||
object, _ := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 1)
|
||||
committedTargetStreams = append(committedTargetStreams, object.ObjectStream)
|
||||
}
|
||||
|
||||
sourceStream := metabasetest.RandObjectStream()
|
||||
sourceStream.ProjectID = obj.ProjectID
|
||||
_, _ = metabasetest.CreateTestObject{}.Run(ctx, t, db, sourceStream, 1)
|
||||
|
||||
// it's not possible to move if under location were we have committed version
|
||||
for _, targetStream := range committedTargetStreams {
|
||||
metabasetest.FinishMoveObject{
|
||||
Opts: metabase.FinishMoveObject{
|
||||
ObjectStream: sourceStream,
|
||||
NewBucket: targetStream.BucketName,
|
||||
NewEncryptedObjectKey: []byte(targetStream.ObjectKey),
|
||||
},
|
||||
ErrClass: &metabase.ErrObjectAlreadyExists,
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("finish move object - target pending object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
metabasetest.CreatePendingObject(ctx, t, db, obj, 1)
|
||||
|
||||
sourceStream := metabasetest.RandObjectStream()
|
||||
sourceStream.ProjectID = obj.ProjectID
|
||||
_, _ = metabasetest.CreateTestObject{}.Run(ctx, t, db, sourceStream, 0)
|
||||
|
||||
// it's possible to move if under location were we have only pending version
|
||||
metabasetest.FinishMoveObject{
|
||||
Opts: metabase.FinishMoveObject{
|
||||
ObjectStream: sourceStream,
|
||||
NewBucket: obj.BucketName,
|
||||
NewEncryptedObjectKey: []byte(obj.ObjectKey),
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -2067,6 +2067,12 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) {
|
||||
require.Equal(t, "object", iterator.Item().Key)
|
||||
require.NoError(t, iterator.Err())
|
||||
|
||||
// upload multipleversions/object once again as we just moved it
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData)
|
||||
require.NoError(t, err)
|
||||
|
||||
checkDownload("object", expectedData)
|
||||
|
||||
{ // server side copy
|
||||
_, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", "object_copy", nil)
|
||||
require.NoError(t, err)
|
||||
@ -2105,6 +2111,9 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) {
|
||||
_, err = project.DeleteObject(ctx, "multipleversions", "object")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = project.DeleteObject(ctx, "multipleversions", "object_moved")
|
||||
require.NoError(t, err)
|
||||
|
||||
iterator = project.ListObjects(ctx, "multipleversions", nil)
|
||||
require.False(t, iterator.Next())
|
||||
require.NoError(t, iterator.Err())
|
||||
@ -2271,3 +2280,34 @@ func TestEndpoint_Object_CopyObject_MultipleVersions(t *testing.T) {
|
||||
}, items)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEndpoint_Object_MoveObject_MultipleVersions(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Metainfo.MultipleVersions = true
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
expectedDataA := testrand.Bytes(7 * memory.KiB)
|
||||
|
||||
// upload objectA twice to have to have version different than 1
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectA", expectedDataA)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectA", expectedDataA)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectB", testrand.Bytes(1*memory.KiB))
|
||||
require.NoError(t, err)
|
||||
|
||||
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(project.Close)
|
||||
|
||||
// move is not possible because we have committed object under target location
|
||||
err = project.MoveObject(ctx, "multipleversions", "objectA", "multipleversions", "objectB", nil)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user