satellite/metabase: don't delete pieces when deleting ancestor object
Fixes https://github.com/storj/storj/issues/4613 Change-Id: I3d6217a618a2a685256471f0394a143a323ac044
This commit is contained in:
parent
f6b4d522be
commit
0bde845a17
@ -86,8 +86,9 @@ func TestGarbageCollection(t *testing.T) {
|
||||
require.NotZero(t, keptPieceID)
|
||||
|
||||
// Delete one object from metainfo service on satellite
|
||||
_, err = satellite.Metabase.DB.DeleteObjectsAllVersions(ctx, metabase.DeleteObjectsAllVersions{
|
||||
Locations: []metabase.ObjectLocation{objectLocationToDelete},
|
||||
_, err = satellite.Metabase.DB.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
|
||||
ObjectLocation: objectLocationToDelete,
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/tagsql"
|
||||
)
|
||||
@ -112,7 +113,8 @@ SELECT
|
||||
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces,
|
||||
NULL
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id`
|
||||
|
||||
@ -168,7 +170,7 @@ promoted_ancestor AS (
|
||||
LIMIT 1
|
||||
)
|
||||
ORDER BY stream_id
|
||||
LIMIT 1
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING
|
||||
stream_id
|
||||
@ -203,7 +205,8 @@ SELECT
|
||||
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces,
|
||||
(SELECT stream_id FROM promoted_ancestor)
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments
|
||||
ON deleted_objects.stream_id = deleted_segments.stream_id`
|
||||
@ -287,7 +290,8 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
|
||||
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces,
|
||||
NULL
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
|
||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID))(func(rows tagsql.Rows) error {
|
||||
@ -313,6 +317,10 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
|
||||
func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteObjectAnyStatusAllVersions) (result DeleteObjectResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if db.config.ServerSideCopy {
|
||||
return DeleteObjectResult{}, errs.New("method cannot be used when server-side copy is enabled")
|
||||
}
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
@ -343,7 +351,8 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO
|
||||
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces,
|
||||
NULL
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
|
||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
|
||||
@ -369,6 +378,10 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO
|
||||
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if db.config.ServerSideCopy {
|
||||
return DeleteObjectResult{}, errs.New("method cannot be used when server-side copy is enabled")
|
||||
}
|
||||
|
||||
if len(opts.Locations) == 0 {
|
||||
// nothing to delete, no error
|
||||
return DeleteObjectResult{}, nil
|
||||
@ -452,7 +465,7 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r
|
||||
var aliasPieces AliasPieces
|
||||
|
||||
for rows.Next() {
|
||||
|
||||
var promotedAncestor *uuid.UUID
|
||||
object.ProjectID = location.ProjectID
|
||||
object.BucketName = location.BucketName
|
||||
object.ObjectKey = location.ObjectKey
|
||||
@ -462,14 +475,18 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r
|
||||
&object.Status, &object.SegmentCount,
|
||||
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
|
||||
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
|
||||
encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces)
|
||||
encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces,
|
||||
&promotedAncestor,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, Error.New("unable to delete object: %w", err)
|
||||
}
|
||||
if len(objects) == 0 || objects[len(objects)-1].StreamID != object.StreamID {
|
||||
objects = append(objects, object)
|
||||
}
|
||||
if rootPieceID != nil {
|
||||
// not nil promotedAncestor means that while delete new ancestor was promoted and
|
||||
// we should not delete pieces because we had copies and now one copy become ancestor
|
||||
if rootPieceID != nil && promotedAncestor == nil {
|
||||
segment.RootPieceID = *rootPieceID
|
||||
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
|
||||
if err != nil {
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
@ -18,6 +19,13 @@ import (
|
||||
"storj.io/storj/satellite/metabase/metabasetest"
|
||||
)
|
||||
|
||||
var noServerSideCopyConfig = metabase.Config{
|
||||
ApplicationName: "satellite-test",
|
||||
MinPartSize: 5 * memory.MiB,
|
||||
MaxNumberOfParts: 1000,
|
||||
ServerSideCopy: false,
|
||||
}
|
||||
|
||||
func TestDeletePendingObject(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
@ -459,7 +467,7 @@ func TestDeleteObjectExactVersion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeleteObjectAnyStatusAllVersions(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
location := obj.Location()
|
||||
@ -646,7 +654,7 @@ func TestDeleteObjectAnyStatusAllVersions(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeleteObjectsAllVersions(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
location := obj.Location()
|
||||
@ -982,11 +990,16 @@ func TestDeleteCopy(t *testing.T) {
|
||||
},
|
||||
}.Normalize().Check(ctx, t, db)
|
||||
|
||||
_, err := db.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
|
||||
Version: copyObj.Version,
|
||||
ObjectLocation: copyObj.Location(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
metabasetest.DeleteObjectExactVersion{
|
||||
Opts: metabase.DeleteObjectExactVersion{
|
||||
ObjectLocation: copyObj.Location(),
|
||||
Version: copyObj.Version,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{copyObj},
|
||||
// no segments returned as we deleted copy
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// Verify that we are back at the original single object
|
||||
metabasetest.Verify{
|
||||
@ -1018,11 +1031,16 @@ func TestDeleteCopy(t *testing.T) {
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
_, err := db.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
|
||||
Version: copyObject1.Version,
|
||||
ObjectLocation: copyObject1.Location(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
metabasetest.DeleteObjectExactVersion{
|
||||
Opts: metabase.DeleteObjectExactVersion{
|
||||
ObjectLocation: copyObject1.Location(),
|
||||
Version: copyObject1.Version,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{copyObject1},
|
||||
// no segments returned as we deleted copy
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// Verify that only one of the copies is deleted
|
||||
metabasetest.Verify{
|
||||
@ -1058,11 +1076,17 @@ func TestDeleteCopy(t *testing.T) {
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
_, err := db.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
|
||||
Version: originalObj.Version,
|
||||
ObjectLocation: originalObj.Location(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
metabasetest.DeleteObjectExactVersion{
|
||||
Opts: metabase.DeleteObjectExactVersion{
|
||||
ObjectLocation: originalObj.Location(),
|
||||
Version: originalObj.Version,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{originalObj},
|
||||
// no segments returned as we deleted ancestor
|
||||
// and we moved pieces to one of copies
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// verify that the copy is left
|
||||
metabasetest.Verify{
|
||||
|
Loading…
Reference in New Issue
Block a user