satellite/metabase: added method DeleteObjectLastCommitted
method similar to metabase.DeleteObjectExactVersion which will delete last committed object Closes https://github.com/storj/storj/issues/4872 Change-Id: Ia9f8c227dc59575bf8ed297886b35536097028b4
This commit is contained in:
parent
2ae1db53b6
commit
df5b0d0044
@ -137,6 +137,44 @@ SELECT
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id`
|
||||
|
||||
var deleteObjectLastCommittedWithoutCopyFeatureSQL = `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version IN (SELECT version FROM objects WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
status = ` + committedStatus + ` AND
|
||||
(expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY version DESC
|
||||
)
|
||||
RETURNING
|
||||
version, stream_id,
|
||||
created_at, expires_at,
|
||||
status, segment_count,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption
|
||||
), deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
|
||||
)
|
||||
SELECT
|
||||
deleted_objects.version, deleted_objects.stream_id,
|
||||
deleted_objects.created_at, deleted_objects.expires_at,
|
||||
deleted_objects.status, deleted_objects.segment_count,
|
||||
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id`
|
||||
|
||||
// TODO: remove comments with regex.
|
||||
var deleteBucketObjectsWithCopyFeatureSQL = `
|
||||
WITH deleted_objects AS (
|
||||
@ -208,6 +246,22 @@ WHERE
|
||||
version = $4
|
||||
`
|
||||
|
||||
var deleteObjectLastCommittedSubSQL = `
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version IN (SELECT version FROM objects WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
status = ` + committedStatus + ` AND
|
||||
(expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY version DESC
|
||||
)
|
||||
`
|
||||
|
||||
var deleteObjectExactVersionWithCopyFeatureSQL = fmt.Sprintf(
|
||||
deleteBucketObjectsWithCopyFeatureSQL,
|
||||
deleteObjectExactVersionSubSQL,
|
||||
@ -238,6 +292,36 @@ var deleteObjectExactVersionWithCopyFeatureSQL = fmt.Sprintf(
|
||||
deleted_segments.repaired_at`,
|
||||
)
|
||||
|
||||
var deleteObjectLastCommittedWithCopyFeatureSQL = fmt.Sprintf(
|
||||
deleteBucketObjectsWithCopyFeatureSQL,
|
||||
deleteObjectLastCommittedSubSQL,
|
||||
`,version,
|
||||
created_at,
|
||||
expires_at,
|
||||
status,
|
||||
segment_count,
|
||||
encrypted_metadata_nonce,
|
||||
encrypted_metadata,
|
||||
encrypted_metadata_encrypted_key,
|
||||
total_plain_size,
|
||||
total_encrypted_size,
|
||||
fixed_segment_size,
|
||||
encryption`,
|
||||
`,deleted_objects.version,
|
||||
deleted_objects.created_at,
|
||||
deleted_objects.expires_at,
|
||||
deleted_objects.status,
|
||||
deleted_objects.segment_count,
|
||||
deleted_objects.encrypted_metadata_nonce,
|
||||
deleted_objects.encrypted_metadata,
|
||||
deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size,
|
||||
deleted_objects.total_encrypted_size,
|
||||
deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.repaired_at`,
|
||||
)
|
||||
|
||||
var deleteFromSegmentCopies = `
|
||||
DELETE FROM segment_copies WHERE segment_copies.stream_id = $1
|
||||
`
|
||||
@ -789,3 +873,101 @@ func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows)
|
||||
|
||||
return objects, segments, nil
|
||||
}
|
||||
|
||||
// DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object.
|
||||
type DeleteObjectLastCommitted struct {
|
||||
ObjectLocation
|
||||
}
|
||||
|
||||
// Verify delete object last committed fields.
|
||||
func (obj *DeleteObjectLastCommitted) Verify() error {
|
||||
return obj.ObjectLocation.Verify()
|
||||
}
|
||||
|
||||
// DeleteObjectLastCommitted deletes an object last committed version.
|
||||
//
|
||||
// Result will contain only those segments which needs to be deleted
|
||||
// from storage nodes. If object is an ancestor for copied object its
|
||||
// segments pieces cannot be deleted because copy still needs it.
|
||||
func (db *DB) DeleteObjectLastCommitted(
|
||||
ctx context.Context, opts DeleteObjectLastCommitted,
|
||||
) (result DeleteObjectResult, err error) {
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
result, err = db.deleteObjectLastCommitted(ctx, opts, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return result, err
|
||||
}
|
||||
|
||||
// implementation of DB.DeleteObjectLastCommitted for re-use internally in metabase package.
|
||||
func (db *DB) deleteObjectLastCommitted(ctx context.Context, opts DeleteObjectLastCommitted, tx tagsql.Tx) (result DeleteObjectResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
|
||||
if db.config.ServerSideCopy {
|
||||
objects, err := db.deleteObjectLastCommittedServerSideCopy(ctx, opts, tx)
|
||||
if err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
|
||||
for _, object := range objects {
|
||||
result.Objects = append(result.Objects, object.Object)
|
||||
|
||||
// if object is ancestor for copied object we cannot delete its
|
||||
// segments pieces from storage nodes so we are not returning it
|
||||
// as an object deletion result
|
||||
if object.PromotedAncestor != nil {
|
||||
continue
|
||||
}
|
||||
for _, segment := range object.Segments {
|
||||
result.Segments = append(result.Segments, DeletedSegmentInfo{
|
||||
RootPieceID: segment.RootPieceID,
|
||||
Pieces: segment.Pieces,
|
||||
})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
err = withRows(
|
||||
tx.QueryContext(ctx, deleteObjectLastCommittedWithoutCopyFeatureSQL,
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey),
|
||||
)(func(rows tagsql.Rows) error {
|
||||
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
|
||||
return err
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark(len(result.Objects))
|
||||
mon.Meter("segment_delete").Mark(len(result.Segments))
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (db *DB) deleteObjectLastCommittedServerSideCopy(ctx context.Context, opts DeleteObjectLastCommitted, tx tagsql.Tx) (objects []deletedObjectInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = withRows(
|
||||
tx.QueryContext(ctx, deleteObjectLastCommittedWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey),
|
||||
)(func(rows tagsql.Rows) error {
|
||||
objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = db.promoteNewAncestors(ctx, tx, objects)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
@ -1191,3 +1191,200 @@ func TestDeleteCopy(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteObjectLastCommitted(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
location := obj.Location()
|
||||
|
||||
for _, test := range metabasetest.InvalidObjectLocations(obj.Location()) {
|
||||
test := test
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeleteObjectLastCommitted{
|
||||
Opts: metabase.DeleteObjectLastCommitted{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: test.ObjectLocation.ProjectID,
|
||||
BucketName: test.ObjectLocation.BucketName,
|
||||
ObjectKey: test.ObjectLocation.ObjectKey,
|
||||
},
|
||||
},
|
||||
ErrClass: test.ErrClass,
|
||||
ErrText: test.ErrText,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("Object missing", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeleteObjectLastCommitted{
|
||||
Opts: metabase.DeleteObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
t.Run("Delete object without segments", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
object, _ := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}.Run(ctx, t, db, obj, 0)
|
||||
|
||||
metabasetest.DeleteObjectLastCommitted{
|
||||
Opts: metabase.DeleteObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{object},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete object with segments", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
object := metabasetest.CreateObject(ctx, t, db, obj, 2)
|
||||
|
||||
expectedSegmentInfo := metabase.DeletedSegmentInfo{
|
||||
RootPieceID: storj.PieceID{1},
|
||||
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
|
||||
}
|
||||
|
||||
metabasetest.DeleteObjectLastCommitted{
|
||||
Opts: metabase.DeleteObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{object},
|
||||
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete object with inline segment", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: obj.Version,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CommitInlineSegment{
|
||||
Opts: metabase.CommitInlineSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 0, Index: 0},
|
||||
EncryptedKey: testrand.Bytes(32),
|
||||
EncryptedKeyNonce: testrand.Bytes(32),
|
||||
InlineData: testrand.Bytes(1024),
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
object := metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeleteObjectLastCommitted{
|
||||
Opts: metabase.DeleteObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{object},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete last committed from several versions", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
now := time.Now()
|
||||
newObj := metabasetest.RandObjectStream()
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: newObj.ProjectID,
|
||||
BucketName: newObj.BucketName,
|
||||
ObjectKey: newObj.ObjectKey,
|
||||
Version: newObj.Version,
|
||||
StreamID: newObj.StreamID,
|
||||
},
|
||||
ZombieDeletionDeadline: &now,
|
||||
},
|
||||
Version: newObj.Version,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
newObjDiffVersion := newObj
|
||||
newObjDiffVersion.Version = 4
|
||||
|
||||
committedObject, _ := metabasetest.CreateTestObject{}.Run(ctx, t, db, newObjDiffVersion, 0)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: newObj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
ZombieDeletionDeadline: &now,
|
||||
},
|
||||
{
|
||||
ObjectStream: newObjDiffVersion,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeleteObjectLastCommitted{
|
||||
Opts: metabase.DeleteObjectLastCommitted{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: newObj.ProjectID,
|
||||
BucketName: newObj.BucketName,
|
||||
ObjectKey: newObj.ObjectKey,
|
||||
}},
|
||||
Result: metabase.DeleteObjectResult{Objects: []metabase.Object{committedObject}},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: newObj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
ZombieDeletionDeadline: &now,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -668,3 +668,25 @@ func (step FinishCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *m
|
||||
require.Zero(t, diff)
|
||||
return result
|
||||
}
|
||||
|
||||
// DeleteObjectLastCommitted is for testing metabase.DeleteObjectLastCommitted.
|
||||
type DeleteObjectLastCommitted struct {
|
||||
Opts metabase.DeleteObjectLastCommitted
|
||||
Result metabase.DeleteObjectResult
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
// Check runs the test.
|
||||
func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
result, err := db.DeleteObjectLastCommitted(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
sortObjects(result.Objects)
|
||||
sortObjects(step.Result.Objects)
|
||||
sortDeletedSegments(result.Segments)
|
||||
sortDeletedSegments(step.Result.Segments)
|
||||
|
||||
diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty())
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user