diff --git a/satellite/metabase/delete_bucket.go b/satellite/metabase/delete_bucket.go index aaab098e8..1f9b8c002 100644 --- a/satellite/metabase/delete_bucket.go +++ b/satellite/metabase/delete_bucket.go @@ -7,9 +7,14 @@ import ( "context" "database/sql" "errors" + "fmt" + "github.com/zeebo/errs" + + "storj.io/common/storj" "storj.io/common/uuid" "storj.io/private/dbutil" + "storj.io/private/dbutil/txutil" "storj.io/private/tagsql" ) @@ -27,6 +32,103 @@ type DeleteBucketObjects struct { DeletePieces func(ctx context.Context, segments []DeletedSegmentInfo) error } +var deleteObjectsCockroachSubSQL = ` +DELETE FROM objects +WHERE project_id = $1 AND bucket_name = $2 +LIMIT $3 +RETURNING objects.stream_id +` + +// postgres does not support LIMIT in DELETE. +var deleteObjectsPostgresSubSQL = ` +DELETE FROM objects +WHERE (objects.project_id, objects.bucket_name) IN ( + SELECT project_id, bucket_name FROM objects + WHERE project_id = $1 AND bucket_name = $2 + LIMIT $3 +) +RETURNING objects.stream_id` + +// TODO: remove comments with regex. +// TODO: align/merge with metabase/delete.go. +var deleteBucketObjectsWithCopyFeatureSQL = ` +WITH deleted_objects AS ( + %s +), +deleted_segments AS ( + DELETE FROM segments + WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING + segments.stream_id, + segments.position, + segments.inline_data, + segments.plain_size, + segments.encrypted_size, + segments.repaired_at, + segments.root_piece_id, + segments.remote_alias_pieces +), +deleted_copies AS ( + DELETE FROM segment_copies + WHERE segment_copies.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING segment_copies.stream_id +), +-- lowest stream_id becomes new ancestor +promoted_ancestors AS ( + SELECT + min(segment_copies.stream_id::text)::bytea AS new_ancestor_stream_id, + segment_copies.ancestor_stream_id AS deleted_stream_id + FROM segment_copies + -- select children about to lose their ancestor + WHERE segment_copies.ancestor_stream_id IN ( + SELECT stream_id + FROM deleted_objects + ORDER BY stream_id + ) + -- don't select children which will be removed themselves + AND segment_copies.stream_id NOT IN ( + SELECT stream_id + FROM deleted_objects + ) + -- select only one child to promote per ancestor + GROUP BY segment_copies.ancestor_stream_id +) +SELECT + deleted_objects.stream_id, + deleted_segments.position, + deleted_segments.root_piece_id, + -- piece to remove from storagenodes or link to new ancestor + deleted_segments.remote_alias_pieces, + -- if set, caller needs to promote this stream_id to new ancestor or else object contents will be lost + promoted_ancestors.new_ancestor_stream_id +FROM deleted_objects +LEFT JOIN deleted_segments + ON deleted_objects.stream_id = deleted_segments.stream_id +LEFT JOIN promoted_ancestors + ON deleted_objects.stream_id = promoted_ancestors.deleted_stream_id +ORDER BY stream_id +` + +var deleteBucketObjectsWithCopyFeaturePostgresSQL = fmt.Sprintf( + deleteBucketObjectsWithCopyFeatureSQL, + deleteObjectsPostgresSubSQL, +) +var deleteBucketObjectsWithCopyFeatureCockroachSQL = fmt.Sprintf( + deleteBucketObjectsWithCopyFeatureSQL, + deleteObjectsCockroachSubSQL, +) + +func getDeleteBucketObjectsSQLWithCopyFeature(impl dbutil.Implementation) (string, error) { + switch impl { + case dbutil.Cockroach: + return deleteBucketObjectsWithCopyFeatureCockroachSQL, nil + case dbutil.Postgres: + return deleteBucketObjectsWithCopyFeaturePostgresSQL, nil + default: + return "", Error.New("unhandled database: %v", impl) + } +} + // DeleteBucketObjects deletes all objects in the specified bucket. // Deletion performs in batches, so in case of error while processing, // this method will return the number of objects deleted to the moment @@ -40,7 +142,124 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) deleteBatchSizeLimit.Ensure(&opts.BatchSize) + if db.config.ServerSideCopy { + return db.deleteBucketObjectsWithCopyFeatureEnabled(ctx, opts) + } + + return db.deleteBucketObjectsWithCopyFeatureDisabled(ctx, opts) +} + +func (db *DB) deleteBucketObjectsWithCopyFeatureEnabled(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) { + defer mon.Task()(&ctx)(&err) + query, err := getDeleteBucketObjectsSQLWithCopyFeature(db.impl) + if err != nil { + return deletedObjectCount, err + } + + for { + if err := ctx.Err(); err != nil { + return deletedObjectCount, err + } + + objects := []deletedObjectInfo{} + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { + err = withRows( + tx.QueryContext(ctx, query, opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize), + )(func(rows tagsql.Rows) error { + objects, err = db.scanBucketObjectsDeletionServerSideCopy(ctx, opts.Bucket, rows) + return err + }) + if err != nil { + return err + } + + return db.promoteNewAncestors(ctx, tx, objects) + }) + + deletedObjectCount += int64(len(objects)) + + if err != nil || len(objects) == 0 { + return deletedObjectCount, err + } + + if opts.DeletePieces == nil { + // no callback, should only be in test path + continue + } + + for _, object := range objects { + if object.PromotedAncestor != nil { + // don't remove pieces, they are now linked to the new ancestor + continue + } + for _, segment := range object.Segments { + // Is there an advantage to batching this? + err := opts.DeletePieces(ctx, []DeletedSegmentInfo{ + { + RootPieceID: segment.RootPieceID, + Pieces: segment.Pieces, + }, + }) + if err != nil { + return deletedObjectCount, err + } + } + } + } +} + +func (db *DB) scanBucketObjectsDeletionServerSideCopy(ctx context.Context, location BucketLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) { + defer mon.Task()(&ctx)(&err) + defer func() { err = errs.Combine(err, rows.Close()) }() + + result = make([]deletedObjectInfo, 0, 10) + var rootPieceID *storj.PieceID + var object deletedObjectInfo + var segment deletedRemoteSegmentInfo + var aliasPieces AliasPieces + var position *SegmentPosition + + for rows.Next() { + object.ProjectID = location.ProjectID + object.BucketName = location.BucketName + + err = rows.Scan( + &object.StreamID, + &position, + &rootPieceID, + &aliasPieces, + &object.PromotedAncestor, + ) + if err != nil { + return nil, Error.New("unable to delete bucket objects: %w", err) + } + + if len(result) == 0 || result[len(result)-1].StreamID != object.StreamID { + result = append(result, object) + } + if rootPieceID != nil { + segment.Position = *position + segment.RootPieceID = *rootPieceID + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return nil, Error.Wrap(err) + } + if len(segment.Pieces) > 0 { + result[len(result)-1].Segments = append(result[len(result)-1].Segments, segment) + } + } + } + if err := rows.Err(); err != nil { + return nil, Error.New("unable to delete object: %w", err) + } + return result, nil +} + +func (db *DB) deleteBucketObjectsWithCopyFeatureDisabled(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) { + defer mon.Task()(&ctx)(&err) + var query string + switch db.impl { case dbutil.Cockroach: query = ` diff --git a/satellite/metabase/delete_bucket_test.go b/satellite/metabase/delete_bucket_test.go index 58bb703b6..66398cbd5 100644 --- a/satellite/metabase/delete_bucket_test.go +++ b/satellite/metabase/delete_bucket_test.go @@ -6,6 +6,7 @@ package metabase_test import ( "context" "errors" + "fmt" "testing" "time" @@ -13,6 +14,7 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" + "storj.io/common/testrand" "storj.io/common/uuid" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/metabasetest" @@ -80,13 +82,13 @@ func TestDeleteBucketObjects(t *testing.T) { metabasetest.CreateObject(ctx, t, db, obj1, 2) + nSegments := 0 metabasetest.DeleteBucketObjects{ Opts: metabase.DeleteBucketObjects{ Bucket: obj1.Location().Bucket(), DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error { - if len(segments) != 2 { - return errors.New("expected 2 segments") - } + nSegments += len(segments) + for _, s := range segments { if len(s.Pieces) != 1 { return errors.New("expected 1 piece per segment") @@ -98,6 +100,8 @@ func TestDeleteBucketObjects(t *testing.T) { Deleted: 1, }.Check(ctx, t, db) + require.Equal(t, 2, nSegments) + metabasetest.Verify{}.Check(ctx, t, db) }) @@ -113,9 +117,7 @@ func TestDeleteBucketObjects(t *testing.T) { return errors.New("expected no segments") }, }, - // TODO: fix the count for objects without segments - // this should be 1. - Deleted: 0, + Deleted: 1, }.Check(ctx, t, db) metabasetest.Verify{}.Check(ctx, t, db) @@ -128,14 +130,14 @@ func TestDeleteBucketObjects(t *testing.T) { metabasetest.CreateObject(ctx, t, db, obj2, 2) metabasetest.CreateObject(ctx, t, db, obj3, 2) + nSegments := 0 metabasetest.DeleteBucketObjects{ Opts: metabase.DeleteBucketObjects{ Bucket: obj1.Location().Bucket(), BatchSize: 2, DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error { - if len(segments) != 2 && len(segments) != 4 { - return errors.New("expected 2 or 4 segments") - } + nSegments += len(segments) + for _, s := range segments { if len(s.Pieces) != 1 { return errors.New("expected 1 piece per segment") @@ -147,6 +149,8 @@ func TestDeleteBucketObjects(t *testing.T) { Deleted: 3, }.Check(ctx, t, db) + require.Equal(t, 6, nSegments) + metabasetest.Verify{}.Check(ctx, t, db) }) @@ -234,20 +238,22 @@ func TestDeleteBucketObjects(t *testing.T) { metabasetest.CreateObject(ctx, t, db, obj1, 37) + nSegments := 0 metabasetest.DeleteBucketObjects{ Opts: metabase.DeleteBucketObjects{ Bucket: obj1.Location().Bucket(), BatchSize: 2, DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error { - if len(segments) != 37 { - return errors.New("expected 37 segments") - } + nSegments += len(segments) + return nil }, }, Deleted: 1, }.Check(ctx, t, db) + require.Equal(t, 37, nSegments) + metabasetest.Verify{}.Check(ctx, t, db) }) @@ -341,3 +347,180 @@ func TestDeleteBucketObjectsCancel(t *testing.T) { }.Check(ctx, t, db) }) } + +func TestDeleteBucketWithCopies(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + for _, numberOfSegments := range []int{0, 1, 3} { + t.Run(fmt.Sprintf("%d segments", numberOfSegments), func(t *testing.T) { + t.Run("delete bucket with copy", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + originalObjStream := metabasetest.RandObjectStream() + originalObjStream.BucketName = "original-bucket" + + originalObj, originalSegments := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: originalObjStream, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: testrand.Nonce().Bytes(), + EncryptedMetadataEncryptedKey: testrand.Bytes(265), + }, + }.Run(ctx, t, db, originalObjStream, byte(numberOfSegments)) + + copyObjectStream := metabasetest.RandObjectStream() + copyObjectStream.ProjectID = originalObjStream.ProjectID + copyObjectStream.BucketName = "copy-bucket" + + metabasetest.CreateObjectCopy{ + OriginalObject: originalObj, + CopyObjectStream: ©ObjectStream, + }.Run(ctx, t, db) + + _, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{ + Bucket: metabase.BucketLocation{ + ProjectID: originalObjStream.ProjectID, + BucketName: "copy-bucket", + }, + BatchSize: 2, + DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error { + return nil + }, + }) + require.NoError(t, err) + + // Verify that we are back at the original single object + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(originalObj), + }, + Segments: metabasetest.SegmentsToRaw(originalSegments), + }.Check(ctx, t, db) + }) + + t.Run("delete bucket with ancestor", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + originalObjStream := metabasetest.RandObjectStream() + originalObjStream.BucketName = "original-bucket" + + originalObj, originalSegments := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: originalObjStream, + EncryptedMetadata: testrand.Bytes(64), + EncryptedMetadataNonce: testrand.Nonce().Bytes(), + EncryptedMetadataEncryptedKey: testrand.Bytes(265), + }, + }.Run(ctx, t, db, originalObjStream, byte(numberOfSegments)) + + copyObjectStream := metabasetest.RandObjectStream() + copyObjectStream.ProjectID = originalObjStream.ProjectID + copyObjectStream.BucketName = "copy-bucket" + + copyObj, _, copySegments := metabasetest.CreateObjectCopy{ + OriginalObject: originalObj, + CopyObjectStream: ©ObjectStream, + }.Run(ctx, t, db) + + _, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{ + Bucket: metabase.BucketLocation{ + ProjectID: originalObjStream.ProjectID, + BucketName: "original-bucket", + }, + BatchSize: 2, + DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error { + return nil + }, + }) + require.NoError(t, err) + + for i := range copySegments { + copySegments[i].Pieces = originalSegments[i].Pieces + } + + // Verify that we are back at the original single object + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(copyObj), + }, + Segments: copySegments, + }.Check(ctx, t, db) + }) + + t.Run("delete bucket which has one ancestor and one copy", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + originalObjStream1 := metabasetest.RandObjectStream() + originalObjStream1.BucketName = "bucket1" + + projectID := originalObjStream1.ProjectID + + originalObjStream2 := metabasetest.RandObjectStream() + originalObjStream2.ProjectID = projectID + originalObjStream2.BucketName = "bucket2" + + originalObj1, originalSegments1 := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: originalObjStream1, + }, + }.Run(ctx, t, db, originalObjStream1, byte(numberOfSegments)) + + originalObj2, originalSegments2 := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: originalObjStream2, + }, + }.Run(ctx, t, db, originalObjStream2, byte(numberOfSegments)) + + copyObjectStream1 := metabasetest.RandObjectStream() + copyObjectStream1.ProjectID = projectID + copyObjectStream1.BucketName = "bucket2" // copy from bucket 1 to bucket 2 + + copyObjectStream2 := metabasetest.RandObjectStream() + copyObjectStream2.ProjectID = projectID + copyObjectStream2.BucketName = "bucket1" // copy from bucket 2 to bucket 1 + + metabasetest.CreateObjectCopy{ + OriginalObject: originalObj1, + CopyObjectStream: ©ObjectStream1, + }.Run(ctx, t, db) + + copyObj2, _, copySegments2 := metabasetest.CreateObjectCopy{ + OriginalObject: originalObj2, + CopyObjectStream: ©ObjectStream2, + }.Run(ctx, t, db) + + // done preparing, delete bucket 1 + _, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{ + Bucket: metabase.BucketLocation{ + ProjectID: projectID, + BucketName: "bucket2", + }, + BatchSize: 2, + DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error { + return nil + }, + }) + require.NoError(t, err) + + // Prepare for check. + // obj1 is the same as before, copyObj2 should now be the original + for i := range copySegments2 { + copySegments2[i].Pieces = originalSegments2[i].Pieces + } + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(originalObj1), + metabase.RawObject(copyObj2), + }, + Segments: append(copySegments2, metabasetest.SegmentsToRaw(originalSegments1)...), + }.Check(ctx, t, db) + }) + + // TODO: check that DeletePieces callback is called with the correct arguments + + // scenario: delete original bucket with 2 copies + + // scenario: delete copy bucket with 2 copies + + // scenario: delete bucket with 2 internal copies + }) + } + }) +} diff --git a/satellite/metabase/delete_test.go b/satellite/metabase/delete_test.go index 1ab6cf7fa..621c4d015 100644 --- a/satellite/metabase/delete_test.go +++ b/satellite/metabase/delete_test.go @@ -1142,6 +1142,7 @@ func TestDeleteCopy(t *testing.T) { AncestorStreamID: remainingStreamIDs[0], }} } + expectedAncestorStreamID := remainingStreamIDs[0] // set pieces in expected ancestor for verifcation