diff --git a/satellite/audit/audit_test.go b/satellite/audit/audit_test.go index e20d61e51..4129adbcb 100644 --- a/satellite/audit/audit_test.go +++ b/satellite/audit/audit_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" "storj.io/common/memory" "storj.io/common/pb" @@ -17,6 +18,7 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/audit" ) @@ -77,10 +79,16 @@ func TestAuditOrderLimit(t *testing.T) { }) } -// Minimal test to verify that copies aren't audited. +// Minimal test to verify that copies are also audited as we duplicate +// all segment metadata. func TestAuditSkipsRemoteCopies(t *testing.T) { testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Metainfo.ServerSideCopyDuplicateMetadata = true + }, + }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] audits := satellite.Audit @@ -96,10 +104,6 @@ func TestAuditSkipsRemoteCopies(t *testing.T) { err = uplink.Upload(ctx, satellite, "testbucket", "testobj2", testData) require.NoError(t, err) - originalSegments, err := satellite.Metabase.DB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, originalSegments, 2) - project, err := uplink.OpenProject(ctx, satellite) require.NoError(t, err) defer ctx.Check(project.Close) @@ -107,13 +111,17 @@ func TestAuditSkipsRemoteCopies(t *testing.T) { _, err = project.CopyObject(ctx, "testbucket", "testobj1", "testbucket", "copy", nil) require.NoError(t, err) + allSegments, err := satellite.Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, allSegments, 3) + err = runQueueingOnce(ctx, satellite) require.NoError(t, err) queue := audits.VerifyQueue - auditSegments := make([]audit.Segment, 0, 2) - for range originalSegments { + auditSegments := make([]audit.Segment, 0, 3) + for range allSegments { auditSegment, err := queue.Next(ctx) require.NoError(t, err) auditSegments = append(auditSegments, auditSegment) @@ -125,8 +133,8 @@ func TestAuditSkipsRemoteCopies(t *testing.T) { }) // Check that StreamID of copy - for i := range originalSegments { - require.Equal(t, originalSegments[i].StreamID, auditSegments[i].StreamID) + for i := range allSegments { + require.Equal(t, allSegments[i].StreamID, auditSegments[i].StreamID) } // delete originals, keep 1 copy @@ -138,14 +146,18 @@ func TestAuditSkipsRemoteCopies(t *testing.T) { err = runQueueingOnce(ctx, satellite) require.NoError(t, err) + allSegments, err = satellite.Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, allSegments, 1) + queue = audits.VerifyQueue // verify that the copy is being audited remainingSegment, err := queue.Next(ctx) require.NoError(t, err) - for _, originalSegment := range originalSegments { - require.NotEqual(t, originalSegment.StreamID, remainingSegment.StreamID) + for _, segment := range allSegments { + require.Equal(t, segment.StreamID, remainingSegment.StreamID) } }) } diff --git a/satellite/gracefulexit/gracefulexit_test.go b/satellite/gracefulexit/gracefulexit_test.go index 8fdd10022..0da84b367 100644 --- a/satellite/gracefulexit/gracefulexit_test.go +++ b/satellite/gracefulexit/gracefulexit_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "golang.org/x/exp/slices" "storj.io/common/memory" "storj.io/common/pb" @@ -233,14 +235,19 @@ func TestGracefulExit_CopiedObjects(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ - Satellite: testplanet.ReconfigureRS(2, 3, 4, 4), + Satellite: testplanet.Combine( + testplanet.ReconfigureRS(2, 3, 4, 4), + func(log *zap.Logger, index int, config *satellite.Config) { + config.Metainfo.ServerSideCopyDuplicateMetadata = true + }, + ), }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) require.NoError(t, err) defer ctx.Check(project.Close) - testGETransferQueue := func(node storj.NodeID) { + testGETransferQueue := func(node storj.NodeID, segmentsToTransfer int) { _, err = planet.Satellites[0].Overlay.DB.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ NodeID: node, ExitInitiatedAt: time.Now().UTC(), @@ -251,26 +258,21 @@ func TestGracefulExit_CopiedObjects(t *testing.T) { _, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx) require.NoError(t, err) - // we should get only one item from GE queue as we have only one remote segments + // we should get two items from GE queue as we have remote segment and its copy items, err := planet.Satellites[0].DB.GracefulExit().GetIncomplete(ctx, node, 100, 0) require.NoError(t, err) - require.Len(t, items, 1) + require.Len(t, items, segmentsToTransfer) segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(t, err) - // find non copy remote segment - segment := metabase.Segment{} - for _, s := range segments { - if len(s.Pieces) > 0 { - if !segment.StreamID.IsZero() { - t.Fatal("we should have only one remote non copy segment") - } - segment = s + for _, segment := range segments { + if !segment.Inline() { + require.True(t, slices.ContainsFunc(items, func(item *gracefulexit.TransferQueueItem) bool { + return item.StreamID == segment.StreamID + })) } } - require.True(t, !segment.StreamID.IsZero()) - require.Equal(t, segment.StreamID, items[0].StreamID) } // upload inline and remote and make copies @@ -282,15 +284,15 @@ func TestGracefulExit_CopiedObjects(t *testing.T) { require.NoError(t, err) } - testGETransferQueue(planet.StorageNodes[0].ID()) + testGETransferQueue(planet.StorageNodes[0].ID(), 2) - // delete original objects to promote copies as new ancestors + // delete original objects for _, size := range []memory.Size{memory.KiB, 10 * memory.KiB} { _, err = project.DeleteObject(ctx, "my-bucket", "my-object-"+size.String()) require.NoError(t, err) } - testGETransferQueue(planet.StorageNodes[1].ID()) + testGETransferQueue(planet.StorageNodes[1].ID(), 1) }) } diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index 14ed3cc5c..7d632e9c4 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -613,10 +613,6 @@ type CommitObject struct { EncryptedMetadataEncryptedKey []byte // optional DisallowDelete bool - // OnDelete will be triggered when/if existing object will be overwritten on commit. - // Wil be only executed after succesfull commit + delete DB operation. - // Error on this function won't revert back committed object. - OnDelete func(segments []DeletedSegmentInfo) UsePendingObjectsTable bool } @@ -650,8 +646,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } - deletedSegments := []DeletedSegmentInfo{} - err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID) if err != nil { @@ -877,7 +871,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec } for _, version := range versionsToDelete { - deleteResult, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{ + _, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{ ObjectLocation: ObjectLocation{ ProjectID: opts.ProjectID, BucketName: opts.BucketName, @@ -888,8 +882,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec if err != nil { return Error.New("failed to delete existing object: %w", err) } - - deletedSegments = append(deletedSegments, deleteResult.Segments...) } object.StreamID = opts.StreamID @@ -908,11 +900,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } - // we can execute this only when whole transaction is committed without any error - if len(deletedSegments) > 0 && opts.OnDelete != nil { - opts.OnDelete(deletedSegments) - } - mon.Meter("object_commit").Mark(1) mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount)) mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize) diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index eb7a13ce5..aafb09d71 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "storj.io/common/memory" "storj.io/common/storj" "storj.io/common/testcontext" @@ -3762,46 +3760,6 @@ func TestCommitObject(t *testing.T) { }.Check(ctx, t, db) }) - t.Run("OnDelete", func(t *testing.T) { - defer metabasetest.DeleteAll{}.Check(ctx, t, db) - - // check deleted segments - obj := metabasetest.RandObjectStream() - - _, expectedSegments := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 3) - - expectedDeletedSegments := []metabase.DeletedSegmentInfo{} - for _, segment := range expectedSegments { - expectedDeletedSegments = append(expectedDeletedSegments, metabase.DeletedSegmentInfo{ - RootPieceID: segment.RootPieceID, - Pieces: segment.Pieces, - }) - } - - obj.Version++ - - metabasetest.BeginObjectExactVersion{ - Opts: metabase.BeginObjectExactVersion{ - ObjectStream: obj, - Encryption: metabasetest.DefaultEncryption, - }, - Version: obj.Version, - }.Check(ctx, t, db) - - deletedSegments := []metabase.DeletedSegmentInfo{} - metabasetest.CommitObject{ - Opts: metabase.CommitObject{ - ObjectStream: obj, - Encryption: metabasetest.DefaultEncryption, - OnDelete: func(segments []metabase.DeletedSegmentInfo) { - deletedSegments = append(deletedSegments, segments...) - }, - }, - }.Check(ctx, t, db) - - require.Equal(t, expectedDeletedSegments, deletedSegments) - }) - t.Run("use pending objects table", func(t *testing.T) { obj.Version = metabase.NextVersion t.Run("version", func(t *testing.T) { diff --git a/satellite/metabase/copy_object.go b/satellite/metabase/copy_object.go index 2cad8df89..fa5b300bc 100644 --- a/satellite/metabase/copy_object.go +++ b/satellite/metabase/copy_object.go @@ -263,7 +263,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje if objectAtDestination != nil { version := objectAtDestination.Version - deletedObjects, err := db.deleteObjectExactVersionServerSideCopy( + deletedObjects, err := db.deleteObjectExactVersion( ctx, DeleteObjectExactVersion{ Version: version, ObjectLocation: ObjectLocation{ @@ -278,13 +278,10 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje } // The object at the destination was the ancestor! - // Now that the ancestor of the source object is removed, we need to change the target ancestor. if ancestorStreamID == objectAtDestination.StreamID { - if len(deletedObjects) == 0 { + if len(deletedObjects.Objects) == 0 { return Error.New("ancestor is gone, please retry operation") } - - ancestorStreamID = *deletedObjects[0].PromotedAncestor } } diff --git a/satellite/metabase/copy_object_test.go b/satellite/metabase/copy_object_test.go index 8d06f9b58..b03eacf4d 100644 --- a/satellite/metabase/copy_object_test.go +++ b/satellite/metabase/copy_object_test.go @@ -86,6 +86,8 @@ func TestBeginCopyObject(t *testing.T) { } func TestFinishCopyObject(t *testing.T) { + t.Skip("test will be removed in subsequent change") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { obj := metabasetest.RandObjectStream() newBucketName := "New bucket name" diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index 1134a8ce5..504f8f07f 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -6,19 +6,21 @@ package metabase import ( "bytes" "context" - "fmt" "sort" - "time" "github.com/zeebo/errs" "storj.io/common/storj" - "storj.io/common/uuid" "storj.io/private/dbutil/pgutil" - "storj.io/private/dbutil/txutil" "storj.io/private/tagsql" ) +// DeletedSegmentInfo info about deleted segment. +type DeletedSegmentInfo struct { + RootPieceID storj.PieceID + Pieces Pieces +} + // DeleteObjectExactVersion contains arguments necessary for deleting an exact version of object. type DeleteObjectExactVersion struct { Version Version @@ -38,31 +40,7 @@ func (obj *DeleteObjectExactVersion) Verify() error { // DeleteObjectResult result of deleting object. type DeleteObjectResult struct { - Objects []Object - Segments []DeletedSegmentInfo -} - -// DeletedSegmentInfo info about deleted segment. -type DeletedSegmentInfo struct { - RootPieceID storj.PieceID - Pieces Pieces -} - -type deletedObjectInfo struct { - Object - Segments []deletedRemoteSegmentInfo - - // while deletion we are trying to find if deleted object have a copy - // and if we need new ancestor to replace it. If we find a copy that - // can be new ancestor we are keeping its stream id in this field. - PromotedAncestor *uuid.UUID -} - -type deletedRemoteSegmentInfo struct { - Position SegmentPosition - RootPieceID storj.PieceID - Pieces Pieces - RepairedAt *time.Time + Objects []Object } // DeleteObjectsAllVersions contains arguments necessary for deleting all versions of multiple objects from the same bucket. @@ -101,7 +79,7 @@ func (delete *DeleteObjectsAllVersions) Verify() error { return nil } -var deleteObjectExactVersionWithoutCopyFeatureSQL = ` +var deleteObjectExactVersion = ` WITH deleted_objects AS ( DELETE FROM objects WHERE @@ -110,29 +88,21 @@ WITH deleted_objects AS ( object_key = $3 AND version = $4 RETURNING - version, stream_id, - created_at, expires_at, - status, segment_count, - encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, - total_plain_size, total_encrypted_size, fixed_segment_size, - encryption + version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, + encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption ), deleted_segments AS ( DELETE FROM segments WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces + RETURNING segments.stream_id ) SELECT - deleted_objects.version, deleted_objects.stream_id, - deleted_objects.created_at, deleted_objects.expires_at, - deleted_objects.status, deleted_objects.segment_count, - deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, - deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, - deleted_objects.encryption, - deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces -FROM deleted_objects -LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id` + version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, + encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption +FROM deleted_objects` -var deleteObjectLastCommittedWithoutCopyFeatureSQL = ` +var deleteObjectLastCommitted = ` WITH deleted_objects AS ( DELETE FROM objects WHERE @@ -157,338 +127,56 @@ WITH deleted_objects AS ( ), deleted_segments AS ( DELETE FROM segments WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces + RETURNING segments.stream_id ) SELECT - deleted_objects.version, deleted_objects.stream_id, - deleted_objects.created_at, deleted_objects.expires_at, - deleted_objects.status, deleted_objects.segment_count, - deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, - deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, - deleted_objects.encryption, - deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces -FROM deleted_objects -LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id` - -// TODO: remove comments with regex. -var deleteBucketObjectsWithCopyFeatureSQL = ` -WITH deleted_objects AS ( - %s - RETURNING - stream_id - -- extra properties only returned when deleting single object - %s -), -deleted_segments AS ( - DELETE FROM segments - WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING - segments.stream_id, - segments.position, - segments.inline_data, - segments.plain_size, - segments.encrypted_size, - segments.repaired_at, - segments.root_piece_id, - segments.remote_alias_pieces -), -deleted_copies AS ( - DELETE FROM segment_copies - WHERE segment_copies.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segment_copies.stream_id -), --- lowest stream_id becomes new ancestor -promoted_ancestors AS ( - -- select only one child to promote per ancestor - SELECT DISTINCT ON (segment_copies.ancestor_stream_id) - segment_copies.stream_id AS new_ancestor_stream_id, - segment_copies.ancestor_stream_id AS deleted_stream_id - FROM segment_copies - -- select children about to lose their ancestor - -- this is not a WHERE clause because that caused a full table scan in CockroachDB - INNER JOIN deleted_objects - ON deleted_objects.stream_id = segment_copies.ancestor_stream_id - -- don't select children which will be removed themselves - WHERE segment_copies.stream_id NOT IN ( - SELECT stream_id - FROM deleted_objects - ) -) -SELECT - deleted_objects.stream_id, - deleted_segments.position, - deleted_segments.root_piece_id, - -- piece to remove from storagenodes or link to new ancestor - deleted_segments.remote_alias_pieces, - -- if set, caller needs to promote this stream_id to new ancestor or else object contents will be lost - promoted_ancestors.new_ancestor_stream_id - -- extra properties only returned when deleting single object - %s -FROM deleted_objects -LEFT JOIN deleted_segments - ON deleted_objects.stream_id = deleted_segments.stream_id -LEFT JOIN promoted_ancestors - ON deleted_objects.stream_id = promoted_ancestors.deleted_stream_id -ORDER BY stream_id -` - -var deleteObjectExactVersionSubSQL = ` -DELETE FROM objects -WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - version = $4 -` - -var deleteObjectLastCommittedSubSQL = ` -DELETE FROM objects -WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - version IN (SELECT version FROM objects WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - status = ` + committedStatus + ` AND - (expires_at IS NULL OR expires_at > now()) - ORDER BY version DESC - ) -` - -var deleteObjectExactVersionWithCopyFeatureSQL = fmt.Sprintf( - deleteBucketObjectsWithCopyFeatureSQL, - deleteObjectExactVersionSubSQL, - `,version, - created_at, - expires_at, - status, - segment_count, - encrypted_metadata_nonce, - encrypted_metadata, - encrypted_metadata_encrypted_key, - total_plain_size, - total_encrypted_size, - fixed_segment_size, - encryption`, - `,deleted_objects.version, - deleted_objects.created_at, - deleted_objects.expires_at, - deleted_objects.status, - deleted_objects.segment_count, - deleted_objects.encrypted_metadata_nonce, - deleted_objects.encrypted_metadata, - deleted_objects.encrypted_metadata_encrypted_key, - deleted_objects.total_plain_size, - deleted_objects.total_encrypted_size, - deleted_objects.fixed_segment_size, - deleted_objects.encryption, - deleted_segments.repaired_at`, -) - -var deleteObjectLastCommittedWithCopyFeatureSQL = fmt.Sprintf( - deleteBucketObjectsWithCopyFeatureSQL, - deleteObjectLastCommittedSubSQL, - `,version, - created_at, - expires_at, - status, - segment_count, - encrypted_metadata_nonce, - encrypted_metadata, - encrypted_metadata_encrypted_key, - total_plain_size, - total_encrypted_size, - fixed_segment_size, - encryption`, - `,deleted_objects.version, - deleted_objects.created_at, - deleted_objects.expires_at, - deleted_objects.status, - deleted_objects.segment_count, - deleted_objects.encrypted_metadata_nonce, - deleted_objects.encrypted_metadata, - deleted_objects.encrypted_metadata_encrypted_key, - deleted_objects.total_plain_size, - deleted_objects.total_encrypted_size, - deleted_objects.fixed_segment_size, - deleted_objects.encryption, - deleted_segments.repaired_at`, -) - -var deleteFromSegmentCopies = ` - DELETE FROM segment_copies WHERE segment_copies.stream_id = $1 -` - -var updateSegmentsWithAncestor = ` - WITH update_segment_copies AS ( - UPDATE segment_copies - SET ancestor_stream_id = $2 - WHERE ancestor_stream_id = $1 - RETURNING false - ) - UPDATE segments - SET - remote_alias_pieces = P.remote_alias_pieces, - repaired_at = P.repaired_at - FROM (SELECT UNNEST($3::INT8[]), UNNEST($4::BYTEA[]), UNNEST($5::timestamptz[])) - as P(position, remote_alias_pieces, repaired_at) - WHERE - segments.stream_id = $2 AND - segments.position = P.position -` + version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, + encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption +FROM deleted_objects` // DeleteObjectExactVersion deletes an exact object version. -// -// Result will contain only those segments which needs to be deleted -// from storage nodes. If object is an ancestor for copied object its -// segments pieces cannot be deleted because copy still needs it. func (db *DB) DeleteObjectExactVersion( ctx context.Context, opts DeleteObjectExactVersion, ) (result DeleteObjectResult, err error) { - err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { - result, err = db.deleteObjectExactVersion(ctx, opts, tx) - if err != nil { - return err - } - return nil - }) + result, err = db.deleteObjectExactVersion(ctx, opts, db.db) + if err != nil { + return DeleteObjectResult{}, err + } + return result, nil +} - return result, err +type stmt interface { + QueryContext(ctx context.Context, query string, args ...interface{}) (tagsql.Rows, error) } // implementation of DB.DeleteObjectExactVersion for re-use internally in metabase package. -func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, tx tagsql.Tx) (result DeleteObjectResult, err error) { +func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, stmt stmt) (result DeleteObjectResult, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return DeleteObjectResult{}, err } - if db.config.ServerSideCopy { - objects, err := db.deleteObjectExactVersionServerSideCopy(ctx, opts, tx) - if err != nil { - return DeleteObjectResult{}, err - } - - for _, object := range objects { - result.Objects = append(result.Objects, object.Object) - - // if object is ancestor for copied object we cannot delete its - // segments pieces from storage nodes so we are not returning it - // as an object deletion result - if object.PromotedAncestor != nil { - continue - } - for _, segment := range object.Segments { - result.Segments = append(result.Segments, DeletedSegmentInfo{ - RootPieceID: segment.RootPieceID, - Pieces: segment.Pieces, - }) - } - } - } else { - err = withRows( - tx.QueryContext(ctx, deleteObjectExactVersionWithoutCopyFeatureSQL, - opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), - )(func(rows tagsql.Rows) error { - result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) - return err - }) - } + err = withRows( + stmt.QueryContext(ctx, deleteObjectExactVersion, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), + )(func(rows tagsql.Rows) error { + result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) + return err + }) if err != nil { return DeleteObjectResult{}, err } mon.Meter("object_delete").Mark(len(result.Objects)) - mon.Meter("segment_delete").Mark(len(result.Segments)) + for _, object := range result.Objects { + mon.Meter("segment_delete").Mark(int(object.SegmentCount)) + } return result, nil } -func (db *DB) deleteObjectExactVersionServerSideCopy(ctx context.Context, opts DeleteObjectExactVersion, tx tagsql.Tx) (objects []deletedObjectInfo, err error) { - defer mon.Task()(&ctx)(&err) - - err = withRows( - tx.QueryContext(ctx, deleteObjectExactVersionWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), - )(func(rows tagsql.Rows) error { - objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows) - return err - }) - if err != nil { - return nil, err - } - - err = db.promoteNewAncestors(ctx, tx, objects) - if err != nil { - return nil, err - } - - return objects, nil -} - -func (db *DB) promoteNewAncestors(ctx context.Context, tx tagsql.Tx, objects []deletedObjectInfo) (err error) { - defer mon.Task()(&ctx)(&err) - - for _, object := range objects { - if object.PromotedAncestor == nil { - continue - } - - positions := make([]int64, len(object.Segments)) - remoteAliasesPieces := make([][]byte, len(object.Segments)) - repairedAts := make([]*time.Time, len(object.Segments)) - - for i, segment := range object.Segments { - positions[i] = int64(segment.Position.Encode()) - - aliases, err := db.aliasCache.EnsurePiecesToAliases(ctx, segment.Pieces) - if err != nil { - return err - } - - aliasesBytes, err := aliases.Bytes() - if err != nil { - return err - } - remoteAliasesPieces[i] = aliasesBytes - repairedAts[i] = segment.RepairedAt - } - - result, err := tx.ExecContext(ctx, deleteFromSegmentCopies, *object.PromotedAncestor) - if err != nil { - return err - } - - affected, err := result.RowsAffected() - if err != nil { - return err - } - - if affected != 1 { - return errs.New("new ancestor was not deleted from segment copies") - } - - result, err = tx.ExecContext(ctx, updateSegmentsWithAncestor, - object.StreamID, *object.PromotedAncestor, pgutil.Int8Array(positions), - pgutil.ByteaArray(remoteAliasesPieces), pgutil.NullTimestampTZArray(repairedAts)) - if err != nil { - return err - } - - affected, err = result.RowsAffected() - if err != nil { - return err - } - - if affected != int64(len(object.Segments)) { - return errs.New("not all new ancestor segments were update: got %d want %d", affected, len(object.Segments)) - } - } - return nil -} - // DeletePendingObject contains arguments necessary for deleting a pending object. type DeletePendingObject struct { ObjectStream @@ -521,29 +209,21 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject) stream_id = $5 AND status = `+pendingStatus+` RETURNING - version, stream_id, - created_at, expires_at, - status, segment_count, + version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, - total_plain_size, total_encrypted_size, fixed_segment_size, - encryption + total_plain_size, total_encrypted_size, fixed_segment_size, encryption ), deleted_segments AS ( DELETE FROM segments WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id,segments.root_piece_id, segments.remote_alias_pieces + RETURNING segments.stream_id ) SELECT - deleted_objects.version, deleted_objects.stream_id, - deleted_objects.created_at, deleted_objects.expires_at, - deleted_objects.status, deleted_objects.segment_count, - deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, - deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, - deleted_objects.encryption, - deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces + version, stream_id, created_at, expires_at, status, segment_count, + encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, + total_plain_size, total_encrypted_size, fixed_segment_size, encryption FROM deleted_objects - LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID))(func(rows tagsql.Rows) error { - result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.Location(), rows) + result.Objects, err = db.scanObjectDeletion(ctx, opts.Location(), rows) return err }) @@ -556,7 +236,9 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject) } mon.Meter("object_delete").Mark(len(result.Objects)) - mon.Meter("segment_delete").Mark(len(result.Segments)) + for _, object := range result.Objects { + mon.Meter("segment_delete").Mark(int(object.SegmentCount)) + } return result, nil } @@ -601,31 +283,23 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl object_key = ANY ($3) AND status = `+committedStatus+` RETURNING - project_id, bucket_name, - object_key, version, stream_id, - created_at, expires_at, - status, segment_count, - encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, - total_plain_size, total_encrypted_size, fixed_segment_size, - encryption + project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, + status, segment_count, encrypted_metadata_nonce, encrypted_metadata, + encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption ), deleted_segments AS ( DELETE FROM segments WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id,segments.root_piece_id, segments.remote_alias_pieces + RETURNING segments.stream_id ) SELECT - deleted_objects.project_id, deleted_objects.bucket_name, - deleted_objects.object_key,deleted_objects.version, deleted_objects.stream_id, - deleted_objects.created_at, deleted_objects.expires_at, - deleted_objects.status, deleted_objects.segment_count, - deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key, - deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size, - deleted_objects.encryption, - deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces + project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, + status, segment_count, encrypted_metadata_nonce, encrypted_metadata, + encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption FROM deleted_objects - LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id `, projectID, []byte(bucketName), pgutil.ByteaArray(objectKeys)))(func(rows tagsql.Rows) error { - result.Objects, result.Segments, err = db.scanMultipleObjectsDeletion(ctx, rows) + result.Objects, err = db.scanMultipleObjectsDeletion(ctx, rows) return err }) @@ -634,87 +308,20 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl } mon.Meter("object_delete").Mark(len(result.Objects)) - mon.Meter("segment_delete").Mark(len(result.Segments)) - - return result, nil -} - -func (db *DB) scanObjectDeletionServerSideCopy(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) { - defer mon.Task()(&ctx)(&err) - defer func() { err = errs.Combine(err, rows.Close()) }() - - result = make([]deletedObjectInfo, 0, 10) - - var rootPieceID *storj.PieceID - // for object without segments we can get position = NULL - var segmentPosition *SegmentPosition - var object deletedObjectInfo - var segment deletedRemoteSegmentInfo - var aliasPieces AliasPieces - - for rows.Next() { - object.ProjectID = location.ProjectID - object.BucketName = location.BucketName - object.ObjectKey = location.ObjectKey - - err = rows.Scan( - // shared properties between deleteObject and deleteBucketObjects functionality - &object.StreamID, - &segmentPosition, - &rootPieceID, - &aliasPieces, - &object.PromotedAncestor, - // properties only for deleteObject functionality - &object.Version, - &object.CreatedAt, &object.ExpiresAt, - &object.Status, &object.SegmentCount, - &object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, - &object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize, - encryptionParameters{&object.Encryption}, - &segment.RepairedAt, - ) - if err != nil { - return nil, Error.New("unable to delete object: %w", err) - } - if len(result) == 0 || result[len(result)-1].StreamID != object.StreamID { - result = append(result, object) - } - - if rootPieceID != nil { - if segmentPosition != nil { - segment.Position = *segmentPosition - } - - segment.RootPieceID = *rootPieceID - segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) - if err != nil { - return nil, Error.Wrap(err) - } - if len(segment.Pieces) > 0 { - result[len(result)-1].Segments = append(result[len(result)-1].Segments, segment) - } - } - } - - if err := rows.Err(); err != nil { - return nil, Error.New("unable to delete object: %w", err) + for _, object := range result.Objects { + mon.Meter("segment_delete").Mark(int(object.SegmentCount)) } return result, nil } -func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, segments []DeletedSegmentInfo, err error) { +func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, err error) { defer mon.Task()(&ctx)(&err) defer func() { err = errs.Combine(err, rows.Close()) }() objects = make([]Object, 0, 10) - segments = make([]DeletedSegmentInfo, 0, 10) - var rootPieceID *storj.PieceID var object Object - var segment DeletedSegmentInfo - var aliasPieces AliasPieces - for rows.Next() { object.ProjectID = location.ProjectID object.BucketName = location.BucketName @@ -725,49 +332,29 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r &object.Status, &object.SegmentCount, &object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize, - encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces, + encryptionParameters{&object.Encryption}, ) if err != nil { - return nil, nil, Error.New("unable to delete object: %w", err) - } - if len(objects) == 0 || objects[len(objects)-1].StreamID != object.StreamID { - objects = append(objects, object) + return nil, Error.New("unable to delete object: %w", err) } - if rootPieceID != nil { - segment.RootPieceID = *rootPieceID - segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) - if err != nil { - return nil, nil, Error.Wrap(err) - } - if len(segment.Pieces) > 0 { - segments = append(segments, segment) - } - } + objects = append(objects, object) } if err := rows.Err(); err != nil { - return nil, nil, Error.New("unable to delete object: %w", err) + return nil, Error.New("unable to delete object: %w", err) } - if len(segments) == 0 { - return objects, nil, nil - } - return objects, segments, nil + return objects, nil } -func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows) (objects []Object, segments []DeletedSegmentInfo, err error) { +func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows) (objects []Object, err error) { defer mon.Task()(&ctx)(&err) defer func() { err = errs.Combine(err, rows.Close()) }() objects = make([]Object, 0, 10) - segments = make([]DeletedSegmentInfo, 0, 10) - var rootPieceID *storj.PieceID var object Object - var segment DeletedSegmentInfo - var aliasPieces AliasPieces - for rows.Next() { err = rows.Scan(&object.ProjectID, &object.BucketName, &object.ObjectKey, &object.Version, &object.StreamID, @@ -775,38 +362,23 @@ func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows) &object.Status, &object.SegmentCount, &object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize, - encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces) + encryptionParameters{&object.Encryption}) if err != nil { - return nil, nil, Error.New("unable to delete object: %w", err) + return nil, Error.New("unable to delete object: %w", err) } - if len(objects) == 0 || objects[len(objects)-1].StreamID != object.StreamID { - objects = append(objects, object) - } - if rootPieceID != nil { - segment.RootPieceID = *rootPieceID - segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) - if err != nil { - return nil, nil, Error.Wrap(err) - } - if len(segment.Pieces) > 0 { - segments = append(segments, segment) - } - } + objects = append(objects, object) } if err := rows.Err(); err != nil { - return nil, nil, Error.New("unable to delete object: %w", err) + return nil, Error.New("unable to delete object: %w", err) } if len(objects) == 0 { objects = nil } - if len(segments) == 0 { - return objects, nil, nil - } - return objects, segments, nil + return objects, nil } // DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object. @@ -820,89 +392,30 @@ func (obj *DeleteObjectLastCommitted) Verify() error { } // DeleteObjectLastCommitted deletes an object last committed version. -// -// Result will contain only those segments which needs to be deleted -// from storage nodes. If object is an ancestor for copied object its -// segments pieces cannot be deleted because copy still needs it. func (db *DB) DeleteObjectLastCommitted( ctx context.Context, opts DeleteObjectLastCommitted, ) (result DeleteObjectResult, err error) { - err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { - result, err = db.deleteObjectLastCommitted(ctx, opts, tx) - if err != nil { - return err - } - return nil - }) - return result, err -} - -// implementation of DB.DeleteObjectLastCommitted for re-use internally in metabase package. -func (db *DB) deleteObjectLastCommitted(ctx context.Context, opts DeleteObjectLastCommitted, tx tagsql.Tx) (result DeleteObjectResult, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return DeleteObjectResult{}, err } - if db.config.ServerSideCopy { - objects, err := db.deleteObjectLastCommittedServerSideCopy(ctx, opts, tx) - if err != nil { - return DeleteObjectResult{}, err - } - - for _, object := range objects { - result.Objects = append(result.Objects, object.Object) - - // if object is ancestor for copied object we cannot delete its - // segments pieces from storage nodes so we are not returning it - // as an object deletion result - if object.PromotedAncestor != nil { - continue - } - for _, segment := range object.Segments { - result.Segments = append(result.Segments, DeletedSegmentInfo{ - RootPieceID: segment.RootPieceID, - Pieces: segment.Pieces, - }) - } - } - } else { - err = withRows( - tx.QueryContext(ctx, deleteObjectLastCommittedWithoutCopyFeatureSQL, - opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey), - )(func(rows tagsql.Rows) error { - result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) - return err - }) - } + err = withRows( + db.db.QueryContext(ctx, deleteObjectLastCommitted, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey), + )(func(rows tagsql.Rows) error { + result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) + return err + }) if err != nil { return DeleteObjectResult{}, err } mon.Meter("object_delete").Mark(len(result.Objects)) - mon.Meter("segment_delete").Mark(len(result.Segments)) + for _, object := range result.Objects { + mon.Meter("segment_delete").Mark(int(object.SegmentCount)) + } return result, nil } - -func (db *DB) deleteObjectLastCommittedServerSideCopy(ctx context.Context, opts DeleteObjectLastCommitted, tx tagsql.Tx) (objects []deletedObjectInfo, err error) { - defer mon.Task()(&ctx)(&err) - - err = withRows( - tx.QueryContext(ctx, deleteObjectLastCommittedWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey), - )(func(rows tagsql.Rows) error { - objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows) - return err - }) - if err != nil { - return nil, err - } - - err = db.promoteNewAncestors(ctx, tx, objects) - if err != nil { - return nil, err - } - - return objects, nil -} diff --git a/satellite/metabase/delete_test.go b/satellite/metabase/delete_test.go index 595e043fe..643028614 100644 --- a/satellite/metabase/delete_test.go +++ b/satellite/metabase/delete_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "storj.io/common/memory" - "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/common/uuid" @@ -198,11 +197,6 @@ func TestDeletePendingObject(t *testing.T) { metabasetest.CreatePendingObject(ctx, t, db, obj, 2) - expectedSegmentInfo := metabase.DeletedSegmentInfo{ - RootPieceID: storj.PieceID{1}, - Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, - } - metabasetest.DeletePendingObject{ Opts: metabase.DeletePendingObject{ ObjectStream: obj, @@ -216,7 +210,6 @@ func TestDeletePendingObject(t *testing.T) { Encryption: metabasetest.DefaultEncryption, }, }, - Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, }, }.Check(ctx, t, db) @@ -400,19 +393,13 @@ func TestDeleteObjectExactVersion(t *testing.T) { object := metabasetest.CreateObject(ctx, t, db, obj, 2) - expectedSegmentInfo := metabase.DeletedSegmentInfo{ - RootPieceID: storj.PieceID{1}, - Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, - } - metabasetest.DeleteObjectExactVersion{ Opts: metabase.DeleteObjectExactVersion{ ObjectLocation: location, Version: 1, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{object}, - Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, + Objects: []metabase.Object{object}, }, }.Check(ctx, t, db) @@ -594,18 +581,12 @@ func TestDeleteObjectsAllVersions(t *testing.T) { object := metabasetest.CreateObject(ctx, t, db, obj, 2) - expectedSegmentInfo := metabase.DeletedSegmentInfo{ - RootPieceID: storj.PieceID{1}, - Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, - } - metabasetest.DeleteObjectsAllVersions{ Opts: metabase.DeleteObjectsAllVersions{ Locations: []metabase.ObjectLocation{location}, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{object}, - Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, + Objects: []metabase.Object{object}, }, }.Check(ctx, t, db) @@ -622,18 +603,12 @@ func TestDeleteObjectsAllVersions(t *testing.T) { object1 := metabasetest.CreateObject(ctx, t, db, obj, 1) object2 := metabasetest.CreateObject(ctx, t, db, obj2, 2) - expectedSegmentInfo := metabase.DeletedSegmentInfo{ - RootPieceID: storj.PieceID{1}, - Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, - } - metabasetest.DeleteObjectsAllVersions{ Opts: metabase.DeleteObjectsAllVersions{ Locations: []metabase.ObjectLocation{location, obj2.Location()}, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{object1, object2}, - Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo, expectedSegmentInfo}, + Objects: []metabase.Object{object1, object2}, }, }.Check(ctx, t, db) @@ -722,18 +697,12 @@ func TestDeleteObjectsAllVersions(t *testing.T) { object2 := metabasetest.CreateObject(ctx, t, db, obj2, 2) - expectedSegmentInfo := metabase.DeletedSegmentInfo{ - RootPieceID: storj.PieceID{1}, - Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, - } - metabasetest.DeleteObjectsAllVersions{ Opts: metabase.DeleteObjectsAllVersions{ Locations: []metabase.ObjectLocation{location, object2.Location()}, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{object1, object2}, - Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, + Objects: []metabase.Object{object1, object2}, }, }.Check(ctx, t, db) @@ -751,10 +720,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) { obj.StreamID = testrand.UUID() obj.Version = metabase.Version(i) expected.Objects = append(expected.Objects, metabasetest.CreateObject(ctx, t, db, obj, 1)) - expected.Segments = append(expected.Segments, metabase.DeletedSegmentInfo{ - RootPieceID: storj.PieceID{1}, - Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, - }) } metabasetest.DeleteObjectsAllVersions{ @@ -769,244 +734,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) { }) } -func TestDeleteCopy(t *testing.T) { - metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { - for _, numberOfSegments := range []int{0, 1, 3} { - t.Run(fmt.Sprintf("%d segments", numberOfSegments), func(t *testing.T) { - t.Run("delete copy", func(t *testing.T) { - defer metabasetest.DeleteAll{}.Check(ctx, t, db) - originalObjStream := metabasetest.RandObjectStream() - - originalObj, originalSegments := metabasetest.CreateTestObject{ - CommitObject: &metabase.CommitObject{ - ObjectStream: originalObjStream, - EncryptedMetadata: testrand.Bytes(64), - EncryptedMetadataNonce: testrand.Nonce().Bytes(), - EncryptedMetadataEncryptedKey: testrand.Bytes(265), - }, - }.Run(ctx, t, db, originalObjStream, byte(numberOfSegments)) - - copyObj, _, copySegments := metabasetest.CreateObjectCopy{ - OriginalObject: originalObj, - }.Run(ctx, t, db, false) - - var copies []metabase.RawCopy - if numberOfSegments > 0 { - copies = []metabase.RawCopy{ - { - StreamID: copyObj.StreamID, - AncestorStreamID: originalObj.StreamID, - }} - } - // check that copy went OK - metabasetest.Verify{ - Objects: []metabase.RawObject{ - metabase.RawObject(originalObj), - metabase.RawObject(copyObj), - }, - Segments: append(metabasetest.SegmentsToRaw(originalSegments), copySegments...), - Copies: copies, - }.Check(ctx, t, db) - - metabasetest.DeleteObjectExactVersion{ - Opts: metabase.DeleteObjectExactVersion{ - ObjectLocation: copyObj.Location(), - Version: copyObj.Version, - }, - Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{copyObj}, - // no segments returned as we deleted copy - }, - }.Check(ctx, t, db) - - // Verify that we are back at the original single object - metabasetest.Verify{ - Objects: []metabase.RawObject{ - metabase.RawObject(originalObj), - }, - Segments: metabasetest.SegmentsToRaw(originalSegments), - }.Check(ctx, t, db) - }) - - t.Run("delete one of two copies", func(t *testing.T) { - defer metabasetest.DeleteAll{}.Check(ctx, t, db) - originalObjectStream := metabasetest.RandObjectStream() - - originalObj, originalSegments := metabasetest.CreateTestObject{ - CommitObject: &metabase.CommitObject{ - ObjectStream: originalObjectStream, - EncryptedMetadata: testrand.Bytes(64), - EncryptedMetadataNonce: testrand.Nonce().Bytes(), - EncryptedMetadataEncryptedKey: testrand.Bytes(265), - }, - }.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments)) - - copyObject1, _, _ := metabasetest.CreateObjectCopy{ - OriginalObject: originalObj, - }.Run(ctx, t, db, false) - copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{ - OriginalObject: originalObj, - }.Run(ctx, t, db, false) - - metabasetest.DeleteObjectExactVersion{ - Opts: metabase.DeleteObjectExactVersion{ - ObjectLocation: copyObject1.Location(), - Version: copyObject1.Version, - }, - Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{copyObject1}, - // no segments returned as we deleted copy - }, - }.Check(ctx, t, db) - - var copies []metabase.RawCopy - if numberOfSegments > 0 { - copies = []metabase.RawCopy{ - { - StreamID: copyObject2.StreamID, - AncestorStreamID: originalObj.StreamID, - }} - } - // Verify that only one of the copies is deleted - metabasetest.Verify{ - Objects: []metabase.RawObject{ - metabase.RawObject(originalObj), - metabase.RawObject(copyObject2), - }, - Segments: append(metabasetest.SegmentsToRaw(originalSegments), copySegments2...), - Copies: copies, - }.Check(ctx, t, db) - }) - - t.Run("delete original", func(t *testing.T) { - defer metabasetest.DeleteAll{}.Check(ctx, t, db) - originalObjectStream := metabasetest.RandObjectStream() - - originalObj, originalSegments := metabasetest.CreateTestObject{ - CommitObject: &metabase.CommitObject{ - ObjectStream: originalObjectStream, - EncryptedMetadata: testrand.Bytes(64), - EncryptedMetadataNonce: testrand.Nonce().Bytes(), - EncryptedMetadataEncryptedKey: testrand.Bytes(265), - }, - }.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments)) - - copyObject, _, copySegments := metabasetest.CreateObjectCopy{ - OriginalObject: originalObj, - }.Run(ctx, t, db, false) - - metabasetest.DeleteObjectExactVersion{ - Opts: metabase.DeleteObjectExactVersion{ - ObjectLocation: originalObj.Location(), - Version: originalObj.Version, - }, - Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{originalObj}, - // no segments returned as we deleted ancestor - // and we moved pieces to one of copies - }, - }.Check(ctx, t, db) - - for i := range copySegments { - copySegments[i].Pieces = originalSegments[i].Pieces - } - - // verify that the copy is left - metabasetest.Verify{ - Objects: []metabase.RawObject{ - metabase.RawObject(copyObject), - }, - Segments: copySegments, - }.Check(ctx, t, db) - }) - - t.Run("delete original and leave two copies", func(t *testing.T) { - defer metabasetest.DeleteAll{}.Check(ctx, t, db) - originalObjectStream := metabasetest.RandObjectStream() - - originalObj, originalSegments := metabasetest.CreateTestObject{ - CommitObject: &metabase.CommitObject{ - ObjectStream: originalObjectStream, - EncryptedMetadata: testrand.Bytes(64), - EncryptedMetadataNonce: testrand.Nonce().Bytes(), - EncryptedMetadataEncryptedKey: testrand.Bytes(265), - }, - }.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments)) - - copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{ - OriginalObject: originalObj, - }.Run(ctx, t, db, false) - copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{ - OriginalObject: originalObj, - }.Run(ctx, t, db, false) - - _, err := db.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{ - Version: originalObj.Version, - ObjectLocation: originalObj.Location(), - }) - require.NoError(t, err) - - var expectedAncestorStreamID uuid.UUID - var expectedCopyStreamID uuid.UUID - - var copies []metabase.RawCopy - - if numberOfSegments > 0 { - segments, err := db.TestingAllSegments(ctx) - require.NoError(t, err) - require.NotEmpty(t, segments) - - if segments[0].PiecesInAncestorSegment() { - if segments[0].StreamID == copyObject1.StreamID { - expectedCopyStreamID = copyObject1.StreamID - expectedAncestorStreamID = copyObject2.StreamID - } else { - expectedCopyStreamID = copyObject2.StreamID - expectedAncestorStreamID = copyObject1.StreamID - } - - } else { - - if segments[0].StreamID == copyObject1.StreamID { - expectedCopyStreamID = copyObject2.StreamID - expectedAncestorStreamID = copyObject1.StreamID - } else { - expectedCopyStreamID = copyObject1.StreamID - expectedAncestorStreamID = copyObject2.StreamID - } - } - - copies = []metabase.RawCopy{ - { - StreamID: expectedCopyStreamID, - AncestorStreamID: expectedAncestorStreamID, - }} - } - - // set pieces in expected ancestor for verifcation - for _, segments := range [][]metabase.RawSegment{copySegments1, copySegments2} { - for i := range segments { - if segments[i].StreamID == expectedAncestorStreamID { - segments[i].Pieces = originalSegments[i].Pieces - } - } - } - - // verify that two functioning copies are left and the original object is gone - metabasetest.Verify{ - Objects: []metabase.RawObject{ - metabase.RawObject(copyObject1), - metabase.RawObject(copyObject2), - }, - Segments: append(copySegments1, copySegments2...), - Copies: copies, - }.Check(ctx, t, db) - }) - }) - } - }) -} - func TestDeleteCopyWithDuplicateMetadata(t *testing.T) { metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { for _, numberOfSegments := range []int{0, 1, 3} { @@ -1043,8 +770,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) { Version: copyObj.Version, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{copyObj}, - Segments: rawSegmentsToDeletedSegmentInfo(copySegments), + Objects: []metabase.Object{copyObj}, }, }.Check(ctx, t, db) @@ -1070,7 +796,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) { }, }.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments)) - copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{ + copyObject1, _, _ := metabasetest.CreateObjectCopy{ OriginalObject: originalObj, }.Run(ctx, t, db, true) copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{ @@ -1083,8 +809,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) { Version: copyObject1.Version, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{copyObject1}, - Segments: rawSegmentsToDeletedSegmentInfo(copySegments1), + Objects: []metabase.Object{copyObject1}, }, }.Check(ctx, t, db) @@ -1121,8 +846,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) { Version: originalObj.Version, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{originalObj}, - Segments: rawSegmentsToDeletedSegmentInfo(copySegments), + Objects: []metabase.Object{originalObj}, }, }.Check(ctx, t, db) @@ -1274,18 +998,12 @@ func TestDeleteObjectLastCommitted(t *testing.T) { object := metabasetest.CreateObject(ctx, t, db, obj, 2) - expectedSegmentInfo := metabase.DeletedSegmentInfo{ - RootPieceID: storj.PieceID{1}, - Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, - } - metabasetest.DeleteObjectLastCommitted{ Opts: metabase.DeleteObjectLastCommitted{ ObjectLocation: location, }, Result: metabase.DeleteObjectResult{ - Objects: []metabase.Object{object}, - Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, + Objects: []metabase.Object{object}, }, }.Check(ctx, t, db) @@ -1398,12 +1116,3 @@ func TestDeleteObjectLastCommitted(t *testing.T) { }) }) } - -func rawSegmentsToDeletedSegmentInfo(segments []metabase.RawSegment) []metabase.DeletedSegmentInfo { - result := make([]metabase.DeletedSegmentInfo, len(segments)) - for i := range segments { - result[i].RootPieceID = segments[i].RootPieceID - result[i].Pieces = segments[i].Pieces - } - return result -} diff --git a/satellite/metabase/get.go b/satellite/metabase/get.go index a3b91e4fd..273a118cb 100644 --- a/satellite/metabase/get.go +++ b/satellite/metabase/get.go @@ -42,6 +42,7 @@ func (s Segment) Inline() bool { } // PiecesInAncestorSegment returns true if remote alias pieces are to be found in an ancestor segment. +// TODO we will remove this method and related to code when all metadata will be migrated to segment copies. func (s Segment) PiecesInAncestorSegment() bool { return s.EncryptedSize != 0 && len(s.InlineData) == 0 && len(s.Pieces) == 0 } diff --git a/satellite/metabase/metabasetest/common.go b/satellite/metabase/metabasetest/common.go index 064eb410f..72c24a614 100644 --- a/satellite/metabase/metabasetest/common.go +++ b/satellite/metabase/metabasetest/common.go @@ -4,7 +4,6 @@ package metabasetest import ( - "bytes" "sort" "testing" "time" @@ -92,12 +91,6 @@ func sortRawCopies(copies []metabase.RawCopy) { }) } -func sortDeletedSegments(segments []metabase.DeletedSegmentInfo) { - sort.Slice(segments, func(i, j int) bool { - return bytes.Compare(segments[i].RootPieceID[:], segments[j].RootPieceID[:]) < 0 - }) -} - func checkError(t require.TestingT, err error, errClass *errs.Class, errText string) { if errClass != nil { require.True(t, errClass.Has(err), "expected an error %v got %v", *errClass, err) diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index 36e571d41..ecb5d11df 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -424,9 +424,6 @@ func (step DeleteObjectExactVersion) Check(ctx *testcontext.Context, t testing.T sortObjects(result.Objects) sortObjects(step.Result.Objects) - sortDeletedSegments(result.Segments) - sortDeletedSegments(step.Result.Segments) - diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty()) require.Zero(t, diff) } @@ -447,9 +444,6 @@ func (step DeletePendingObject) Check(ctx *testcontext.Context, t testing.TB, db sortObjects(result.Objects) sortObjects(step.Result.Objects) - sortDeletedSegments(result.Segments) - sortDeletedSegments(step.Result.Segments) - diff := cmp.Diff(step.Result, result, DefaultTimeDiff()) require.Zero(t, diff) } @@ -470,9 +464,6 @@ func (step DeleteObjectsAllVersions) Check(ctx *testcontext.Context, t testing.T sortObjects(result.Objects) sortObjects(step.Result.Objects) - sortDeletedSegments(result.Segments) - sortDeletedSegments(step.Result.Segments) - diff := cmp.Diff(step.Result, result, DefaultTimeDiff()) require.Zero(t, diff) } @@ -719,8 +710,6 @@ func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing. sortObjects(result.Objects) sortObjects(step.Result.Objects) - sortDeletedSegments(result.Segments) - sortDeletedSegments(step.Result.Segments) diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty()) require.Zero(t, diff)