diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index 28cd85133..2d8a75757 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -7,13 +7,16 @@ import ( "bytes" "context" "database/sql" + "errors" "sort" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/storj" + "storj.io/common/uuid" "storj.io/private/dbutil/pgutil" + "storj.io/private/dbutil/txutil" "storj.io/private/tagsql" ) @@ -81,66 +84,8 @@ func (delete *DeleteObjectsAllVersions) Verify() error { return nil } -var deleteObjectExactVersion = ` -WITH deleted_objects AS ( - DELETE FROM objects - WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - version = $4 - RETURNING - version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, - encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, - fixed_segment_size, encryption -), deleted_segments AS ( - DELETE FROM segments - WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id -) -SELECT - version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, - encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, - fixed_segment_size, encryption -FROM deleted_objects` - -var deleteObjectLastCommitted = ` -WITH deleted_objects AS ( - DELETE FROM objects - WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - version IN (SELECT version FROM objects WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - status = ` + statusCommittedUnversioned + ` AND - (expires_at IS NULL OR expires_at > now()) - ORDER BY version DESC - ) - RETURNING - version, stream_id, - created_at, expires_at, - status, segment_count, - encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, - total_plain_size, total_encrypted_size, fixed_segment_size, - encryption -), deleted_segments AS ( - DELETE FROM segments - WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id -) -SELECT - version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, - encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, - fixed_segment_size, encryption -FROM deleted_objects` - // DeleteObjectExactVersion deletes an exact object version. -func (db *DB) DeleteObjectExactVersion( - ctx context.Context, opts DeleteObjectExactVersion, -) (result DeleteObjectResult, err error) { +func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) { result, err = db.deleteObjectExactVersion(ctx, opts, db.db) if err != nil { return DeleteObjectResult{}, err @@ -164,7 +109,24 @@ func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa } err = withRows( - stmt.QueryContext(ctx, deleteObjectExactVersion, + stmt.QueryContext(ctx, ` + WITH deleted_objects AS ( + DELETE FROM objects + WHERE (project_id, bucket_name, object_key, version) = ($1, $2, $3, $4) + RETURNING + version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, + encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption + ), deleted_segments AS ( + DELETE FROM segments + WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING segments.stream_id + ) + SELECT + version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, + encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption + FROM deleted_objects`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), )(func(rows tagsql.Rows) error { result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) @@ -366,11 +328,10 @@ func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt } err = stmt.QueryRowContext(ctx, ` - SELECT MAX(version) as version + SELECT COALESCE(MAX(version), 0) as version FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest) - if err != nil { return 0, Error.Wrap(err) } @@ -404,36 +365,37 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl objectKeys[i] = []byte(opts.Locations[i].ObjectKey) } + // TODO(ver): should this insert delete markers? + // Sorting the object keys just in case. // TODO: Check if this is really necessary for the SQL query. sort.Slice(objectKeys, func(i, j int) bool { return bytes.Compare(objectKeys[i], objectKeys[j]) < 0 }) err = withRows(db.db.QueryContext(ctx, ` - WITH deleted_objects AS ( - DELETE FROM objects - WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = ANY ($3) AND - status = `+statusCommittedUnversioned+` - RETURNING - project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, - status, segment_count, encrypted_metadata_nonce, encrypted_metadata, - encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, - fixed_segment_size, encryption - ), deleted_segments AS ( - DELETE FROM segments - WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id - ) - SELECT - project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, - status, segment_count, encrypted_metadata_nonce, encrypted_metadata, - encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, - fixed_segment_size, encryption - FROM deleted_objects - `, projectID, []byte(bucketName), pgutil.ByteaArray(objectKeys)))(func(rows tagsql.Rows) error { + WITH deleted_objects AS ( + DELETE FROM objects + WHERE + (project_id, bucket_name) = ($1, $2) AND + object_key = ANY ($3) AND + status <> `+statusPending+` + RETURNING + project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, + status, segment_count, encrypted_metadata_nonce, encrypted_metadata, + encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption + ), deleted_segments AS ( + DELETE FROM segments + WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING segments.stream_id + ) + SELECT + project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, + status, segment_count, encrypted_metadata_nonce, encrypted_metadata, + encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption + FROM deleted_objects + `, projectID, []byte(bucketName), pgutil.ByteaArray(objectKeys)))(func(rows tagsql.Rows) error { result.Objects, err = db.scanMultipleObjectsDeletion(ctx, rows) return err }) @@ -535,10 +497,17 @@ func (db *DB) scanPendingObjectDeletion(ctx context.Context, location ObjectLoca // DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object. type DeleteObjectLastCommitted struct { ObjectLocation + + // TODO(ver): maybe replace these with an enumeration? + Versioned bool + Suspended bool } // Verify delete object last committed fields. func (obj *DeleteObjectLastCommitted) Verify() error { + if obj.Versioned && obj.Suspended { + return ErrInvalidRequest.New("versioned and suspended cannot be enabled at the same time") + } return obj.ObjectLocation.Verify() } @@ -552,8 +521,131 @@ func (db *DB) DeleteObjectLastCommitted( return DeleteObjectResult{}, err } + if opts.Suspended { + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { + // TODO(ver) fold deleteObjectUnversionedCommitted into query below using ON CONFLICT + deleted, err := db.deleteObjectUnversionedCommitted(ctx, opts.ObjectLocation, tx) + // TODO(ver): should we return in the result as well? + if err != nil { + return Error.Wrap(err) + } + if deleted.MaxVersion == 0 { + return ErrObjectNotFound.New("unable to delete object") + } + + row := tx.QueryRowContext(ctx, ` + INSERT INTO objects ( + project_id, bucket_name, object_key, version, stream_id, + status, expires_at, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size, + zombie_deletion_deadline, encryption + ) + SELECT + $1, $2, $3, $4, $5, + `+statusDeleteMarkerUnversioned+`, NULL, 0, 0, 0, 0, + NULL, 0 + RETURNING + version, + created_at + `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, deleted.MaxVersion+1, uuid.UUID{}) + + var marker Object + marker.ProjectID = opts.ProjectID + marker.BucketName = opts.BucketName + marker.ObjectKey = opts.ObjectKey + marker.Status = DeleteMarkerUnversioned + + err = row.Scan(&marker.Version, &marker.CreatedAt) + if err != nil { + return Error.Wrap(err) + } + + result.Objects = append(result.Objects, marker) + return nil + }) + return result, err + } + if opts.Versioned { + // Instead of deleting we insert a deletion marker. + streamID := uuid.UUID{} + row := db.db.QueryRowContext(ctx, ` + WITH check_existing_object AS ( + SELECT status + FROM objects + WHERE + (project_id, bucket_name, object_key) = ($1, $2, $3) AND + status <> `+statusPending+` + ORDER BY project_id, bucket_name, object_key, stream_id, version DESC, created_at DESC + LIMIT 1 + ), + added_object AS ( + INSERT INTO objects ( + project_id, bucket_name, object_key, version, stream_id, + status, expires_at, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size, + zombie_deletion_deadline, encryption + ) + SELECT + $1, $2, $3, + coalesce(( + SELECT version + 1 + FROM objects + WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) + ORDER BY version DESC + LIMIT 1 + ), 1), + $4, + `+statusDeleteMarkerVersioned+`, NULL, 0, 0, 0, 0, + NULL, 0 + WHERE EXISTS (SELECT 1 FROM check_existing_object) + RETURNING * + ) + SELECT version, created_at FROM added_object + `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, streamID) + + // TODO(ver): should this return the deleted object or the delete marker? + var deleted Object + deleted.ProjectID = opts.ProjectID + deleted.BucketName = opts.BucketName + deleted.ObjectKey = opts.ObjectKey + deleted.StreamID = streamID + deleted.Status = DeleteMarkerVersioned + + err = row.Scan(&deleted.Version, &deleted.CreatedAt) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return DeleteObjectResult{}, ErrObjectNotFound.Wrap(Error.New("object does not exist")) + } + return DeleteObjectResult{}, Error.Wrap(err) + } + return DeleteObjectResult{Objects: []Object{deleted}}, nil + } + + // TODO(ver): do we need to pretend here that `expires_at` matters? + // TODO(ver): should this report an error when the object doesn't exist? err = withRows( - db.db.QueryContext(ctx, deleteObjectLastCommitted, + db.db.QueryContext(ctx, ` + WITH deleted_objects AS ( + DELETE FROM objects + WHERE + (project_id, bucket_name, object_key) = ($1, $2, $3) AND + status = `+statusCommittedUnversioned+` AND + (expires_at IS NULL OR expires_at > now()) + RETURNING + version, stream_id, + created_at, expires_at, + status, segment_count, + encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, + total_plain_size, total_encrypted_size, fixed_segment_size, + encryption + ), deleted_segments AS ( + DELETE FROM segments + WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING segments.stream_id + ) + SELECT + version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, + encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, + fixed_segment_size, encryption + FROM deleted_objects`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey), )(func(rows tagsql.Rows) error { result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) diff --git a/satellite/metabase/delete_test.go b/satellite/metabase/delete_test.go index 0eea34258..ad9d2a39b 100644 --- a/satellite/metabase/delete_test.go +++ b/satellite/metabase/delete_test.go @@ -720,6 +720,177 @@ func TestDeleteObjectExactVersion(t *testing.T) { }) } +func TestDeleteObjectVersioning(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + location := obj.Location() + + t.Run("Delete non existing object version", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: location, + Versioned: true, + }, + ErrClass: &metabase.ErrObjectNotFound, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete partial object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + pending := metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: location, + Versioned: true, + }, + ErrClass: &metabase.ErrObjectNotFound, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + + // Not quite sure whether this is the appropriate behavior, + // but let's leave the pending object in place and not insert a deletion marker. + metabasetest.Verify{ + Objects: []metabase.RawObject{{ + ObjectStream: obj, + CreatedAt: pending.CreatedAt, + Status: metabase.Pending, + Encryption: pending.Encryption, + + ZombieDeletionDeadline: pending.ZombieDeletionDeadline, + }}, + }.Check(ctx, t, db) + }) + + t.Run("Create a delete marker", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + committed, _ := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj, + }, + }.Run(ctx, t, db, obj, 0) + + marker := committed.ObjectStream + marker.StreamID = uuid.UUID{} + marker.Version = committed.Version + 1 + + now := time.Now() + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: location, + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Objects: []metabase.Object{ + { + ObjectStream: marker, + CreatedAt: now, + Status: metabase.DeleteMarkerVersioned, + }, + }, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: marker, + CreatedAt: now, + Status: metabase.DeleteMarkerVersioned, + }, + { + ObjectStream: obj, + CreatedAt: committed.CreatedAt, + Status: metabase.CommittedUnversioned, + Encryption: committed.Encryption, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("multiple delete markers", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + committed, _ := metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj, + }, + }.Run(ctx, t, db, obj, 0) + + marker := committed.ObjectStream + marker.StreamID = uuid.UUID{} + marker.Version = committed.Version + 1 + + now := time.Now() + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: location, + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Objects: []metabase.Object{ + { + ObjectStream: marker, + CreatedAt: now, + Status: metabase.DeleteMarkerVersioned, + }, + }, + }, + }.Check(ctx, t, db) + + marker2 := marker + marker2.Version = marker.Version + 1 + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: location, + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Objects: []metabase.Object{ + { + ObjectStream: marker2, + CreatedAt: now, + Status: metabase.DeleteMarkerVersioned, + }, + }, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: marker, + CreatedAt: now, + Status: metabase.DeleteMarkerVersioned, + }, + { + ObjectStream: marker2, + CreatedAt: now, + Status: metabase.DeleteMarkerVersioned, + }, + { + ObjectStream: obj, + CreatedAt: committed.CreatedAt, + Status: metabase.CommittedUnversioned, + Encryption: committed.Encryption, + }, + }, + }.Check(ctx, t, db) + }) + }) +} + func TestDeleteObjectsAllVersions(t *testing.T) { metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { obj := metabasetest.RandObjectStream() @@ -974,8 +1145,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) { }) t.Run("Delete multiple versions of the same object at once", func(t *testing.T) { - t.Skip("skip for now as there is no easy way to have different versions of the same committed object") - defer metabasetest.DeleteAll{}.Check(ctx, t, db) expected := metabase.DeleteObjectResult{} @@ -983,7 +1152,7 @@ func TestDeleteObjectsAllVersions(t *testing.T) { for i := 1; i <= 10; i++ { obj.StreamID = testrand.UUID() obj.Version = metabase.Version(i) - expected.Objects = append(expected.Objects, metabasetest.CreateObject(ctx, t, db, obj, 1)) + expected.Objects = append(expected.Objects, metabasetest.CreateObjectVersioned(ctx, t, db, obj, 1)) } metabasetest.DeleteObjectsAllVersions{ @@ -995,6 +1164,85 @@ func TestDeleteObjectsAllVersions(t *testing.T) { metabasetest.Verify{}.Check(ctx, t, db) }) + + t.Run("delete last committed unversioned with suspended", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + + obj := metabasetest.RandObjectStream() + _ = metabasetest.CreateObject(ctx, t, db, obj, 0) + + marker := metabase.Object{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: obj.Version + 1, + }, + Status: metabase.DeleteMarkerUnversioned, + CreatedAt: now, + } + + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: metabase.ObjectLocation{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + }, + Versioned: false, + Suspended: true, + }, + Result: metabase.DeleteObjectResult{Objects: []metabase.Object{marker}}, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(marker), + }, + }.Check(ctx, t, db) + }) + + t.Run("delete last committed versioned with suspended", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + + obj := metabasetest.RandObjectStream() + initial := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0) + + marker := metabase.Object{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: obj.Version + 1, + }, + Status: metabase.DeleteMarkerUnversioned, + CreatedAt: now, + } + + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: metabase.ObjectLocation{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + }, + Versioned: false, + Suspended: true, + }, + Result: metabase.DeleteObjectResult{Objects: []metabase.Object{marker}}, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(initial), + metabase.RawObject(marker), + }, + }.Check(ctx, t, db) + }) }) } diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index ea88132be..9262566a4 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -64,7 +64,7 @@ type BeginObjectExactVersion struct { } // Check runs the test. -func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t require.TestingT, db *metabase.DB) { +func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t require.TestingT, db *metabase.DB) metabase.Object { got, err := db.TestingBeginObjectExactVersion(ctx, step.Opts) checkError(t, err, step.ErrClass, step.ErrText) if step.ErrClass == nil { @@ -82,6 +82,7 @@ func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t require.Te } require.Equal(t, step.Opts.Encryption, got.Encryption) } + return got } // CommitObject is for testing metabase.CommitObject.