diff --git a/satellite/metabase/aliascache.go b/satellite/metabase/aliascache.go index 83dba0668..b9e565682 100644 --- a/satellite/metabase/aliascache.go +++ b/satellite/metabase/aliascache.go @@ -138,6 +138,10 @@ func (cache *NodeAliasCache) refresh(ctx context.Context, missingNodes []storj.N func (cache *NodeAliasCache) ConvertPiecesToAliases(ctx context.Context, pieces Pieces) (_ AliasPieces, err error) { defer mon.Task()(&ctx)(&err) + if len(pieces) == 0 { + return AliasPieces{}, nil + } + nodes := make([]storj.NodeID, len(pieces)) for i, p := range pieces { nodes[i] = p.StorageNode diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index 8c2982b97..43a52e4c7 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -7,12 +7,15 @@ import ( "bytes" "context" "sort" + "time" + "github.com/jackc/pgtype" "github.com/zeebo/errs" "storj.io/common/storj" "storj.io/common/uuid" "storj.io/private/dbutil/pgutil" + "storj.io/private/dbutil/txutil" "storj.io/private/tagsql" ) @@ -45,6 +48,23 @@ type DeletedSegmentInfo struct { Pieces Pieces } +type deletedObjectInfo struct { + Object + Segments []deletedRemoteSegmentInfo + + // during deletion we check whether the deleted object has a copy + // and whether a new ancestor is needed to replace it. If a copy that + // can become the new ancestor is found, its stream id is kept in this field. + PromotedAncestor *uuid.UUID +} + +type deletedRemoteSegmentInfo struct { + Position SegmentPosition + RootPieceID storj.PieceID + Pieces Pieces + RepairedAt *time.Time +} + // DeleteObjectAnyStatusAllVersions contains arguments necessary for deleting all object versions.
type DeleteObjectAnyStatusAllVersions struct { ObjectLocation @@ -104,7 +124,7 @@ WITH deleted_objects AS ( ), deleted_segments AS ( DELETE FROM segments WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id,segments.root_piece_id, segments.remote_alias_pieces + RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces ) SELECT deleted_objects.version, deleted_objects.stream_id, @@ -113,12 +133,89 @@ SELECT deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, deleted_objects.encryption, - deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces, - NULL + deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces FROM deleted_objects LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id` +var deleteObjectExactVersionWithCopyFeatureSQL = ` + WITH deleted_objects AS ( + DELETE FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 + RETURNING + version, stream_id, + created_at, expires_at, + status, segment_count, + encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, + total_plain_size, total_encrypted_size, fixed_segment_size, + encryption + ), deleted_segments AS ( + DELETE FROM segments + WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING + segments.stream_id, + segments.position, + segments.root_piece_id, + segments.remote_alias_pieces, + segments.repaired_at + ), new_ancestor AS ( + SELECT stream_id, ancestor_stream_id FROM segment_copies + WHERE ancestor_stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + ORDER BY stream_id + LIMIT 1 + ), delete_if_copy AS ( + -- TODO we should try add the condition to delete 
the new ancestor to reduce number of queries. + DELETE FROM segment_copies + WHERE + stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING false + ) + SELECT + deleted_objects.version, deleted_objects.stream_id, + deleted_objects.created_at, deleted_objects.expires_at, + deleted_objects.status, deleted_objects.segment_count, + deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, + deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, + deleted_objects.encryption, + deleted_segments.position, deleted_segments.root_piece_id, + deleted_segments.remote_alias_pieces, deleted_segments.repaired_at, + new_ancestor.stream_id + FROM deleted_objects + LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id + LEFT JOIN new_ancestor ON deleted_objects.stream_id = new_ancestor.ancestor_stream_id + ORDER BY deleted_objects.stream_id, deleted_segments.position +` + +var deleteFromSegmentCopies = ` + DELETE FROM segment_copies WHERE segment_copies.stream_id = $1 +` + +var updateSegmentsWithAncestor = ` + WITH update_segment_copies AS ( + UPDATE segment_copies + SET ancestor_stream_id = $2 + WHERE ancestor_stream_id = $1 + RETURNING false + ) + UPDATE segments + SET + remote_alias_pieces = P.remote_alias_pieces, + repaired_at = P.repaired_at + FROM (SELECT UNNEST($3::INT8[]), UNNEST($4::BYTEA[]), UNNEST($5::timestamptz[])) + as P(position, remote_alias_pieces, repaired_at) + WHERE + segments.stream_id = $2 AND + segments.position = P.position +` + // DeleteObjectExactVersion deletes an exact object version. +// +// Result will contain only those segments which need to be deleted +// from storage nodes. If the object is an ancestor of a copied object, its +// segment pieces cannot be deleted because the copy still needs them.
func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) { defer mon.Task()(&ctx)(&err) @@ -126,14 +223,17 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa return DeleteObjectResult{}, err } - deleteSQL := deleteObjectExactVersionWithoutCopyFeatureSQL - - err = withRows( - db.db.QueryContext(ctx, deleteSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), - )(func(rows tagsql.Rows) error { - result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) - return err - }) + if db.config.ServerSideCopy { + result, err = db.deleteObjectExactVersionServerSideCopy(ctx, opts) + } else { + err = withRows( + db.db.QueryContext(ctx, deleteObjectExactVersionWithoutCopyFeatureSQL, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), + )(func(rows tagsql.Rows) error { + result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) + return err + }) + } if err != nil { return DeleteObjectResult{}, err } @@ -144,6 +244,120 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa return result, nil } +func (db *DB) deleteObjectExactVersionServerSideCopy(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) { + objects := []deletedObjectInfo{} + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { + err = withRows( + tx.QueryContext(ctx, deleteObjectExactVersionWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), + )(func(rows tagsql.Rows) error { + objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows) + return err + }) + if err != nil { + return err + } + + return db.promoteNewAncestors(ctx, tx, objects) + }) + if err != nil { + return DeleteObjectResult{}, err + } + + for _, object := range objects { + 
result.Objects = append(result.Objects, object.Object) + + // if object is ancestor for copied object we cannot delete its + // segments pieces from storage nodes so we are not returning it + // as an object deletion result + if object.PromotedAncestor != nil { + continue + } + for _, segment := range object.Segments { + result.Segments = append(result.Segments, DeletedSegmentInfo{ + RootPieceID: segment.RootPieceID, + Pieces: segment.Pieces, + }) + } + } + return result, nil +} + +func (db *DB) promoteNewAncestors(ctx context.Context, tx tagsql.Tx, objects []deletedObjectInfo) error { + for _, object := range objects { + if object.PromotedAncestor == nil { + continue + } + + positions := make([]int64, len(object.Segments)) + remoteAliasesPieces := make([][]byte, len(object.Segments)) + // special DB type to handle null 'repaired_at' values + repairedAtsArray := make([]pgtype.Timestamptz, len(object.Segments)) + + for i, segment := range object.Segments { + positions[i] = int64(segment.Position.Encode()) + + aliases, err := db.aliasCache.ConvertPiecesToAliases(ctx, segment.Pieces) + if err != nil { + return err + } + + aliasesBytes, err := aliases.Bytes() + if err != nil { + return err + } + remoteAliasesPieces[i] = aliasesBytes + + if segment.RepairedAt == nil { + repairedAtsArray[i] = pgtype.Timestamptz{ + Status: pgtype.Null, + } + } else { + repairedAtsArray[i] = pgtype.Timestamptz{ + Time: *segment.RepairedAt, + Status: pgtype.Present, + } + } + } + + repairedAtArray := &pgtype.TimestamptzArray{ + Elements: repairedAtsArray, + Dimensions: []pgtype.ArrayDimension{{Length: int32(len(repairedAtsArray)), LowerBound: 1}}, + Status: pgtype.Present, + } + + result, err := tx.ExecContext(ctx, deleteFromSegmentCopies, *object.PromotedAncestor) + if err != nil { + return err + } + + affected, err := result.RowsAffected() + if err != nil { + return err + } + + if affected != 1 { + return errs.New("new ancestor was not deleted from segment copies") + } + + result, err = 
tx.ExecContext(ctx, updateSegmentsWithAncestor, + object.StreamID, *object.PromotedAncestor, pgutil.Int8Array(positions), + pgutil.ByteaArray(remoteAliasesPieces), repairedAtArray) + if err != nil { + return err + } + + affected, err = result.RowsAffected() + if err != nil { + return err + } + + if affected != int64(len(object.Segments)) { + return errs.New("not all new ancestor segments were update: got %d want %d", affected, len(object.Segments)) + } + } + return nil +} + // DeletePendingObject contains arguments necessary for deleting a pending object. type DeletePendingObject struct { ObjectStream @@ -194,8 +408,7 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject) deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, deleted_objects.encryption, - deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces, - NULL + deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces FROM deleted_objects LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID))(func(rows tagsql.Rows) error { @@ -255,8 +468,7 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, deleted_objects.encryption, - deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces, - NULL + deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces FROM deleted_objects LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id `, opts.ProjectID, 
[]byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error { @@ -356,6 +568,63 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl return result, nil } +func (db *DB) scanObjectDeletionServerSideCopy(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) { + defer mon.Task()(&ctx)(&err) + defer func() { err = errs.Combine(err, rows.Close()) }() + + result = make([]deletedObjectInfo, 0, 10) + + var rootPieceID *storj.PieceID + // for object without segments we can get position = NULL + var segmentPosition *SegmentPosition + var object deletedObjectInfo + var segment deletedRemoteSegmentInfo + var aliasPieces AliasPieces + + for rows.Next() { + object.ProjectID = location.ProjectID + object.BucketName = location.BucketName + object.ObjectKey = location.ObjectKey + + err = rows.Scan(&object.Version, &object.StreamID, + &object.CreatedAt, &object.ExpiresAt, + &object.Status, &object.SegmentCount, + &object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, + &object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize, + encryptionParameters{&object.Encryption}, + &segmentPosition, &rootPieceID, &aliasPieces, &segment.RepairedAt, + &object.PromotedAncestor, + ) + if err != nil { + return nil, Error.New("unable to delete object: %w", err) + } + if len(result) == 0 || result[len(result)-1].StreamID != object.StreamID { + result = append(result, object) + } + + if rootPieceID != nil { + if segmentPosition != nil { + segment.Position = *segmentPosition + } + + segment.RootPieceID = *rootPieceID + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return nil, Error.Wrap(err) + } + if len(segment.Pieces) > 0 { + result[len(result)-1].Segments = append(result[len(result)-1].Segments, segment) + } + } + } + + if err := rows.Err(); err != nil { + return nil, Error.New("unable to delete 
object: %w", err) + } + + return result, nil +} + func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, segments []DeletedSegmentInfo, err error) { defer mon.Task()(&ctx)(&err) defer func() { err = errs.Combine(err, rows.Close()) }() @@ -369,7 +638,6 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r var aliasPieces AliasPieces for rows.Next() { - var promotedAncestor *uuid.UUID object.ProjectID = location.ProjectID object.BucketName = location.BucketName object.ObjectKey = location.ObjectKey @@ -380,7 +648,6 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r &object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize, encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces, - &promotedAncestor, ) if err != nil { return nil, nil, Error.New("unable to delete object: %w", err) @@ -388,9 +655,8 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r if len(objects) == 0 || objects[len(objects)-1].StreamID != object.StreamID { objects = append(objects, object) } - // not nil promotedAncestor means that while delete new ancestor was promoted and - // we should not delete pieces because we had copies and now one copy become ancestor - if rootPieceID != nil && promotedAncestor == nil { + + if rootPieceID != nil { segment.RootPieceID = *rootPieceID segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) if err != nil { diff --git a/satellite/metabase/delete_test.go b/satellite/metabase/delete_test.go index 6ccdce3cb..1ab6cf7fa 100644 --- a/satellite/metabase/delete_test.go +++ b/satellite/metabase/delete_test.go @@ -955,8 +955,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) { } func TestDeleteCopy(t *testing.T) { - t.Skip("skip until deletion query will be fixed for 
CRDB") - metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { for _, numberOfSegments := range []int{0, 1, 3} { t.Run(fmt.Sprintf("%d segments", numberOfSegments), func(t *testing.T) { @@ -1017,7 +1015,6 @@ func TestDeleteCopy(t *testing.T) { t.Run("delete one of two copies", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) - numberOfSegments := 0 originalObjectStream := metabasetest.RandObjectStream() originalObj, originalSegments := metabasetest.CreateTestObject{ @@ -1068,10 +1065,9 @@ func TestDeleteCopy(t *testing.T) { t.Run("delete original", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) - numberOfSegments := 0 originalObjectStream := metabasetest.RandObjectStream() - originalObj, _ := metabasetest.CreateTestObject{ + originalObj, originalSegments := metabasetest.CreateTestObject{ CommitObject: &metabase.CommitObject{ ObjectStream: originalObjectStream, EncryptedMetadata: testrand.Bytes(64), @@ -1096,6 +1092,10 @@ func TestDeleteCopy(t *testing.T) { }, }.Check(ctx, t, db) + for i := range copySegments { + copySegments[i].Pieces = originalSegments[i].Pieces + } + // verify that the copy is left metabasetest.Verify{ Objects: []metabase.RawObject{ @@ -1107,10 +1107,9 @@ func TestDeleteCopy(t *testing.T) { t.Run("delete original and leave two copies", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) - numberOfSegments := 0 originalObjectStream := metabasetest.RandObjectStream() - originalObj, _ := metabasetest.CreateTestObject{ + originalObj, originalSegments := metabasetest.CreateTestObject{ CommitObject: &metabase.CommitObject{ ObjectStream: originalObjectStream, EncryptedMetadata: testrand.Bytes(64), @@ -1143,6 +1142,17 @@ func TestDeleteCopy(t *testing.T) { AncestorStreamID: remainingStreamIDs[0], }} } + expectedAncestorStreamID := remainingStreamIDs[0] + + // set pieces in expected ancestor for verification + for _, segments := range
[][]metabase.RawSegment{copySegments1, copySegments2} { + for i := range segments { + if segments[i].StreamID == expectedAncestorStreamID { + segments[i].Pieces = originalSegments[i].Pieces + } + } + } + // verify that two functioning copies are left and the original object is gone metabasetest.Verify{ Objects: []metabase.RawObject{ diff --git a/satellite/metabase/encoding.go b/satellite/metabase/encoding.go index 716c7b72d..80619c408 100644 --- a/satellite/metabase/encoding.go +++ b/satellite/metabase/encoding.go @@ -56,7 +56,7 @@ func (params *SegmentPosition) Scan(value interface{}) error { *params = SegmentPositionFromEncoded(uint64(value)) return nil default: - return Error.New("unable to scan %T into EncryptionParameters", value) + return Error.New("unable to scan %T into SegmentPosition", value) } } diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index 67cb3cf61..2ee469c6b 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -353,7 +353,7 @@ func (step DeleteObjectExactVersion) Check(ctx *testcontext.Context, t testing.T sortDeletedSegments(result.Segments) sortDeletedSegments(step.Result.Segments) - diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second), cmpopts.EquateEmpty()) require.Zero(t, diff) }