satellite/metabase: deletion query as explicit transaction

The latest CRDB version didn't work with our server-side copy deletion
query, so we had to rewrite it.

Fixes https://github.com/storj/storj/issues/4655

Change-Id: I66d5c4abfc3c3f53dea8bc01ae11cd2b020e4289
Michał Niewrzał 2022-03-23 14:16:46 +01:00
parent 7aa001d62f
commit 25e92fe443
5 changed files with 309 additions and 29 deletions
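
In short, the one-shot CTE query is replaced by an explicit transaction: one
statement deletes the object and its segments and reports whether a copy can
become the new ancestor, and follow-up statements perform the promotion. A
condensed sketch of the flow, abbreviated from deleteObjectExactVersionServerSideCopy
in the diff below (error handling trimmed):

	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
		// delete the object and its segments; the RETURNING data tells us
		// whether one of its copies can be promoted to the new ancestor
		err = withRows(
			tx.QueryContext(ctx, deleteObjectExactVersionWithCopyFeatureSQL,
				opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
		)(func(rows tagsql.Rows) error {
			objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows)
			return err
		})
		if err != nil {
			return err
		}
		// in the same transaction: remove the promoted copy from
		// segment_copies and move the ancestor's pieces onto it
		return db.promoteNewAncestors(ctx, tx, objects)
	})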

View File

@ -138,6 +138,10 @@ func (cache *NodeAliasCache) refresh(ctx context.Context, missingNodes []storj.N
func (cache *NodeAliasCache) ConvertPiecesToAliases(ctx context.Context, pieces Pieces) (_ AliasPieces, err error) {
defer mon.Task()(&ctx)(&err)
if len(pieces) == 0 {
return AliasPieces{}, nil
}
nodes := make([]storj.NodeID, len(pieces))
for i, p := range pieces {
nodes[i] = p.StorageNode

View File

@ -7,12 +7,15 @@ import (
"bytes"
"context"
"sort"
"time"
"github.com/jackc/pgtype"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/txutil"
"storj.io/private/tagsql"
)
@ -45,6 +48,23 @@ type DeletedSegmentInfo struct {
Pieces Pieces
}
type deletedObjectInfo struct {
Object
Segments []deletedRemoteSegmentInfo
// During deletion we check whether the deleted object has a copy
// and whether a new ancestor is needed to replace it. If we find a copy
// that can become the new ancestor, we keep its stream id in this field.
PromotedAncestor *uuid.UUID
}
type deletedRemoteSegmentInfo struct {
Position SegmentPosition
RootPieceID storj.PieceID
Pieces Pieces
RepairedAt *time.Time
}
// DeleteObjectAnyStatusAllVersions contains arguments necessary for deleting all object versions.
type DeleteObjectAnyStatusAllVersions struct {
ObjectLocation
@ -104,7 +124,7 @@ WITH deleted_objects AS (
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id,segments.root_piece_id, segments.remote_alias_pieces
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
)
SELECT
deleted_objects.version, deleted_objects.stream_id,
@ -113,12 +133,89 @@ SELECT
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces,
NULL
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id`
var deleteObjectExactVersionWithCopyFeatureSQL = `
WITH deleted_objects AS (
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING
segments.stream_id,
segments.position,
segments.root_piece_id,
segments.remote_alias_pieces,
segments.repaired_at
), new_ancestor AS (
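-- find one copy (lowest stream_id) whose ancestor is being deleted;
-- it will be promoted to replace the deleted ancestor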
SELECT stream_id, ancestor_stream_id FROM segment_copies
WHERE ancestor_stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
ORDER BY stream_id
LIMIT 1
), delete_if_copy AS (
-- TODO we should try to add a condition that deletes the new ancestor to reduce the number of queries.
DELETE FROM segment_copies
WHERE
stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING false
)
SELECT
deleted_objects.version, deleted_objects.stream_id,
deleted_objects.created_at, deleted_objects.expires_at,
deleted_objects.status, deleted_objects.segment_count,
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.position, deleted_segments.root_piece_id,
deleted_segments.remote_alias_pieces, deleted_segments.repaired_at,
new_ancestor.stream_id
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
LEFT JOIN new_ancestor ON deleted_objects.stream_id = new_ancestor.ancestor_stream_id
ORDER BY deleted_objects.stream_id, deleted_segments.position
`
var deleteFromSegmentCopies = `
DELETE FROM segment_copies WHERE segment_copies.stream_id = $1
`
var updateSegmentsWithAncestor = `
WITH update_segment_copies AS (
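-- $1 is the deleted ancestor's stream id, $2 the promoted copy's stream id;
-- repoint every remaining copy at the promoted ancestor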
UPDATE segment_copies
SET ancestor_stream_id = $2
WHERE ancestor_stream_id = $1
RETURNING false
)
UPDATE segments
SET
remote_alias_pieces = P.remote_alias_pieces,
repaired_at = P.repaired_at
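-- $3, $4, $5 are parallel arrays; unnesting them together in the SELECT
-- list yields one row per segment position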
FROM (SELECT UNNEST($3::INT8[]), UNNEST($4::BYTEA[]), UNNEST($5::timestamptz[]))
as P(position, remote_alias_pieces, repaired_at)
WHERE
segments.stream_id = $2 AND
segments.position = P.position
`
// DeleteObjectExactVersion deletes an exact object version.
//
// The result will contain only those segments which need to be deleted
// from storage nodes. If the object is an ancestor of a copied object, its
// segment pieces cannot be deleted because the copy still needs them.
func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
@ -126,14 +223,17 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
return DeleteObjectResult{}, err
}
deleteSQL := deleteObjectExactVersionWithoutCopyFeatureSQL
err = withRows(
db.db.QueryContext(ctx, deleteSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
)(func(rows tagsql.Rows) error {
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
return err
})
if db.config.ServerSideCopy {
result, err = db.deleteObjectExactVersionServerSideCopy(ctx, opts)
} else {
err = withRows(
db.db.QueryContext(ctx, deleteObjectExactVersionWithoutCopyFeatureSQL,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
)(func(rows tagsql.Rows) error {
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
return err
})
}
if err != nil {
return DeleteObjectResult{}, err
}
@ -144,6 +244,120 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
return result, nil
}
func (db *DB) deleteObjectExactVersionServerSideCopy(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) {
objects := []deletedObjectInfo{}
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
err = withRows(
tx.QueryContext(ctx, deleteObjectExactVersionWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
)(func(rows tagsql.Rows) error {
objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows)
return err
})
if err != nil {
return err
}
return db.promoteNewAncestors(ctx, tx, objects)
})
if err != nil {
return DeleteObjectResult{}, err
}
for _, object := range objects {
result.Objects = append(result.Objects, object.Object)
// if the object is an ancestor of a copied object we cannot delete its
// segment pieces from storage nodes, so we do not return its segments
// as part of the deletion result
if object.PromotedAncestor != nil {
continue
}
for _, segment := range object.Segments {
result.Segments = append(result.Segments, DeletedSegmentInfo{
RootPieceID: segment.RootPieceID,
Pieces: segment.Pieces,
})
}
}
return result, nil
}
func (db *DB) promoteNewAncestors(ctx context.Context, tx tagsql.Tx, objects []deletedObjectInfo) error {
for _, object := range objects {
if object.PromotedAncestor == nil {
continue
}
positions := make([]int64, len(object.Segments))
remoteAliasesPieces := make([][]byte, len(object.Segments))
// special DB type to handle null 'repaired_at' values
repairedAtsArray := make([]pgtype.Timestamptz, len(object.Segments))
for i, segment := range object.Segments {
positions[i] = int64(segment.Position.Encode())
aliases, err := db.aliasCache.ConvertPiecesToAliases(ctx, segment.Pieces)
if err != nil {
return err
}
aliasesBytes, err := aliases.Bytes()
if err != nil {
return err
}
remoteAliasesPieces[i] = aliasesBytes
if segment.RepairedAt == nil {
repairedAtsArray[i] = pgtype.Timestamptz{
Status: pgtype.Null,
}
} else {
repairedAtsArray[i] = pgtype.Timestamptz{
Time: *segment.RepairedAt,
Status: pgtype.Present,
}
}
}
repairedAtArray := &pgtype.TimestamptzArray{
Elements: repairedAtsArray,
Dimensions: []pgtype.ArrayDimension{{Length: int32(len(repairedAtsArray)), LowerBound: 1}},
Status: pgtype.Present,
}
result, err := tx.ExecContext(ctx, deleteFromSegmentCopies, *object.PromotedAncestor)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}
if affected != 1 {
return errs.New("new ancestor was not deleted from segment copies")
}
result, err = tx.ExecContext(ctx, updateSegmentsWithAncestor,
object.StreamID, *object.PromotedAncestor, pgutil.Int8Array(positions),
pgutil.ByteaArray(remoteAliasesPieces), repairedAtArray)
if err != nil {
return err
}
affected, err = result.RowsAffected()
if err != nil {
return err
}
if affected != int64(len(object.Segments)) {
return errs.New("not all new ancestor segments were update: got %d want %d", affected, len(object.Segments))
}
}
return nil
}
// DeletePendingObject contains arguments necessary for deleting a pending object.
type DeletePendingObject struct {
ObjectStream
@ -194,8 +408,7 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces,
NULL
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID))(func(rows tagsql.Rows) error {
@ -255,8 +468,7 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces,
NULL
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
@ -356,6 +568,63 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
return result, nil
}
func (db *DB) scanObjectDeletionServerSideCopy(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer func() { err = errs.Combine(err, rows.Close()) }()
result = make([]deletedObjectInfo, 0, 10)
var rootPieceID *storj.PieceID
// for an object without segments, position can be NULL
var segmentPosition *SegmentPosition
var object deletedObjectInfo
var segment deletedRemoteSegmentInfo
var aliasPieces AliasPieces
for rows.Next() {
object.ProjectID = location.ProjectID
object.BucketName = location.BucketName
object.ObjectKey = location.ObjectKey
err = rows.Scan(&object.Version, &object.StreamID,
&object.CreatedAt, &object.ExpiresAt,
&object.Status, &object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
&segmentPosition, &rootPieceID, &aliasPieces, &segment.RepairedAt,
&object.PromotedAncestor,
)
if err != nil {
return nil, Error.New("unable to delete object: %w", err)
}
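// rows are ordered by stream_id, so a change in stream_id starts a new object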
if len(result) == 0 || result[len(result)-1].StreamID != object.StreamID {
result = append(result, object)
}
if rootPieceID != nil {
if segmentPosition != nil {
segment.Position = *segmentPosition
}
segment.RootPieceID = *rootPieceID
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return nil, Error.Wrap(err)
}
if len(segment.Pieces) > 0 {
result[len(result)-1].Segments = append(result[len(result)-1].Segments, segment)
}
}
}
if err := rows.Err(); err != nil {
return nil, Error.New("unable to delete object: %w", err)
}
return result, nil
}
func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, segments []DeletedSegmentInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer func() { err = errs.Combine(err, rows.Close()) }()
@ -369,7 +638,6 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r
var aliasPieces AliasPieces
for rows.Next() {
var promotedAncestor *uuid.UUID
object.ProjectID = location.ProjectID
object.BucketName = location.BucketName
object.ObjectKey = location.ObjectKey
@ -380,7 +648,6 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces,
&promotedAncestor,
)
if err != nil {
return nil, nil, Error.New("unable to delete object: %w", err)
@ -388,9 +655,8 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r
if len(objects) == 0 || objects[len(objects)-1].StreamID != object.StreamID {
objects = append(objects, object)
}
// not nil promotedAncestor means that while delete new ancestor was promoted and
// we should not delete pieces because we had copies and now one copy become ancestor
if rootPieceID != nil && promotedAncestor == nil {
if rootPieceID != nil {
segment.RootPieceID = *rootPieceID
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {

View File

@ -955,8 +955,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
}
func TestDeleteCopy(t *testing.T) {
t.Skip("skip until deletion query will be fixed for CRDB")
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
for _, numberOfSegments := range []int{0, 1, 3} {
t.Run(fmt.Sprintf("%d segments", numberOfSegments), func(t *testing.T) {
@ -1017,7 +1015,6 @@ func TestDeleteCopy(t *testing.T) {
t.Run("delete one of two copies", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
numberOfSegments := 0
originalObjectStream := metabasetest.RandObjectStream()
originalObj, originalSegments := metabasetest.CreateTestObject{
@ -1068,10 +1065,9 @@ func TestDeleteCopy(t *testing.T) {
t.Run("delete original", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
numberOfSegments := 0
originalObjectStream := metabasetest.RandObjectStream()
originalObj, _ := metabasetest.CreateTestObject{
originalObj, originalSegments := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: originalObjectStream,
EncryptedMetadata: testrand.Bytes(64),
@ -1096,6 +1092,10 @@ func TestDeleteCopy(t *testing.T) {
},
}.Check(ctx, t, db)
for i := range copySegments {
copySegments[i].Pieces = originalSegments[i].Pieces
}
// verify that the copy is left
metabasetest.Verify{
Objects: []metabase.RawObject{
@ -1107,10 +1107,9 @@ func TestDeleteCopy(t *testing.T) {
t.Run("delete original and leave two copies", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
numberOfSegments := 0
originalObjectStream := metabasetest.RandObjectStream()
originalObj, _ := metabasetest.CreateTestObject{
originalObj, originalSegments := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: originalObjectStream,
EncryptedMetadata: testrand.Bytes(64),
@ -1143,6 +1142,17 @@ func TestDeleteCopy(t *testing.T) {
AncestorStreamID: remainingStreamIDs[0],
}}
}
expectedAncestorStreamID := remainingStreamIDs[0]
// set pieces in the expected ancestor for verification
for _, segments := range [][]metabase.RawSegment{copySegments1, copySegments2} {
for i := range segments {
if segments[i].StreamID == expectedAncestorStreamID {
segments[i].Pieces = originalSegments[i].Pieces
}
}
}
// verify that two functioning copies are left and the original object is gone
metabasetest.Verify{
Objects: []metabase.RawObject{

View File

@ -56,7 +56,7 @@ func (params *SegmentPosition) Scan(value interface{}) error {
*params = SegmentPositionFromEncoded(uint64(value))
return nil
default:
return Error.New("unable to scan %T into EncryptionParameters", value)
return Error.New("unable to scan %T into SegmentPosition", value)
}
}

View File

@ -353,7 +353,7 @@ func (step DeleteObjectExactVersion) Check(ctx *testcontext.Context, t testing.T
sortDeletedSegments(result.Segments)
sortDeletedSegments(step.Result.Segments)
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second), cmpopts.EquateEmpty())
require.Zero(t, diff)
}
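
For context, cmpopts.EquateEmpty makes cmp.Diff treat a nil slice or map and
an empty one as equal, which matters here because a deletion that finds no
segments may yield a nil Segments slice while the expected result holds an
empty one. A minimal standalone illustration (the result type is hypothetical,
not taken from the test):

	package main

	import (
		"fmt"

		"github.com/google/go-cmp/cmp"
		"github.com/google/go-cmp/cmp/cmpopts"
	)

	type result struct{ Segments []int }

	func main() {
		a := result{Segments: nil}
		b := result{Segments: []int{}}
		fmt.Println(cmp.Diff(a, b) == "")                        // false: nil vs empty differ
		fmt.Println(cmp.Diff(a, b, cmpopts.EquateEmpty()) == "") // true: treated as equal
	}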