satellite/metabase: adjust DeleteObjectLastCommitted for versioning

Change-Id: I7545af21fb098867ec43ba331ea6eaf9073db4b3
This commit is contained in:
Egon Elbre 2023-10-16 15:14:10 +03:00 committed by Storj Robot
parent 0f538093af
commit aed664a78d
3 changed files with 432 additions and 91 deletions

View File

@ -7,13 +7,16 @@ import (
"bytes" "bytes"
"context" "context"
"database/sql" "database/sql"
"errors"
"sort" "sort"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil" "storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/txutil"
"storj.io/private/tagsql" "storj.io/private/tagsql"
) )
@ -81,66 +84,8 @@ func (delete *DeleteObjectsAllVersions) Verify() error {
return nil return nil
} }
var deleteObjectExactVersion = `
WITH deleted_objects AS (
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4
RETURNING
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
FROM deleted_objects`
var deleteObjectLastCommitted = `
WITH deleted_objects AS (
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version IN (SELECT version FROM objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = ` + statusCommittedUnversioned + ` AND
(expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
)
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
FROM deleted_objects`
// DeleteObjectExactVersion deletes an exact object version. // DeleteObjectExactVersion deletes an exact object version.
func (db *DB) DeleteObjectExactVersion( func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) {
ctx context.Context, opts DeleteObjectExactVersion,
) (result DeleteObjectResult, err error) {
result, err = db.deleteObjectExactVersion(ctx, opts, db.db) result, err = db.deleteObjectExactVersion(ctx, opts, db.db)
if err != nil { if err != nil {
return DeleteObjectResult{}, err return DeleteObjectResult{}, err
@ -164,7 +109,24 @@ func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
} }
err = withRows( err = withRows(
stmt.QueryContext(ctx, deleteObjectExactVersion, stmt.QueryContext(ctx, `
WITH deleted_objects AS (
DELETE FROM objects
WHERE (project_id, bucket_name, object_key, version) = ($1, $2, $3, $4)
RETURNING
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
FROM deleted_objects`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version), opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
)(func(rows tagsql.Rows) error { )(func(rows tagsql.Rows) error {
result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
@ -366,11 +328,10 @@ func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt
} }
err = stmt.QueryRowContext(ctx, ` err = stmt.QueryRowContext(ctx, `
SELECT MAX(version) as version SELECT COALESCE(MAX(version), 0) as version
FROM objects FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest) `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest)
if err != nil { if err != nil {
return 0, Error.Wrap(err) return 0, Error.Wrap(err)
} }
@ -404,6 +365,8 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
objectKeys[i] = []byte(opts.Locations[i].ObjectKey) objectKeys[i] = []byte(opts.Locations[i].ObjectKey)
} }
// TODO(ver): should this insert delete markers?
// Sorting the object keys just in case. // Sorting the object keys just in case.
// TODO: Check if this is really necessary for the SQL query. // TODO: Check if this is really necessary for the SQL query.
sort.Slice(objectKeys, func(i, j int) bool { sort.Slice(objectKeys, func(i, j int) bool {
@ -413,10 +376,9 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
WITH deleted_objects AS ( WITH deleted_objects AS (
DELETE FROM objects DELETE FROM objects
WHERE WHERE
project_id = $1 AND (project_id, bucket_name) = ($1, $2) AND
bucket_name = $2 AND
object_key = ANY ($3) AND object_key = ANY ($3) AND
status = `+statusCommittedUnversioned+` status <> `+statusPending+`
RETURNING RETURNING
project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, project_id, bucket_name, object_key, version, stream_id, created_at, expires_at,
status, segment_count, encrypted_metadata_nonce, encrypted_metadata, status, segment_count, encrypted_metadata_nonce, encrypted_metadata,
@ -535,10 +497,17 @@ func (db *DB) scanPendingObjectDeletion(ctx context.Context, location ObjectLoca
// DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object. // DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object.
type DeleteObjectLastCommitted struct { type DeleteObjectLastCommitted struct {
ObjectLocation ObjectLocation
// TODO(ver): maybe replace these with an enumeration?
Versioned bool
Suspended bool
} }
// Verify delete object last committed fields. // Verify delete object last committed fields.
func (obj *DeleteObjectLastCommitted) Verify() error { func (obj *DeleteObjectLastCommitted) Verify() error {
if obj.Versioned && obj.Suspended {
return ErrInvalidRequest.New("versioned and suspended cannot be enabled at the same time")
}
return obj.ObjectLocation.Verify() return obj.ObjectLocation.Verify()
} }
@ -552,8 +521,131 @@ func (db *DB) DeleteObjectLastCommitted(
return DeleteObjectResult{}, err return DeleteObjectResult{}, err
} }
if opts.Suspended {
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
// TODO(ver) fold deleteObjectUnversionedCommitted into query below using ON CONFLICT
deleted, err := db.deleteObjectUnversionedCommitted(ctx, opts.ObjectLocation, tx)
// TODO(ver): should we return in the result as well?
if err != nil {
return Error.Wrap(err)
}
if deleted.MaxVersion == 0 {
return ErrObjectNotFound.New("unable to delete object")
}
row := tx.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
status, expires_at, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size,
zombie_deletion_deadline, encryption
)
SELECT
$1, $2, $3, $4, $5,
`+statusDeleteMarkerUnversioned+`, NULL, 0, 0, 0, 0,
NULL, 0
RETURNING
version,
created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, deleted.MaxVersion+1, uuid.UUID{})
var marker Object
marker.ProjectID = opts.ProjectID
marker.BucketName = opts.BucketName
marker.ObjectKey = opts.ObjectKey
marker.Status = DeleteMarkerUnversioned
err = row.Scan(&marker.Version, &marker.CreatedAt)
if err != nil {
return Error.Wrap(err)
}
result.Objects = append(result.Objects, marker)
return nil
})
return result, err
}
if opts.Versioned {
// Instead of deleting we insert a deletion marker.
streamID := uuid.UUID{}
row := db.db.QueryRowContext(ctx, `
WITH check_existing_object AS (
SELECT status
FROM objects
WHERE
(project_id, bucket_name, object_key) = ($1, $2, $3) AND
status <> `+statusPending+`
ORDER BY project_id, bucket_name, object_key, stream_id, version DESC, created_at DESC
LIMIT 1
),
added_object AS (
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
status, expires_at, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size,
zombie_deletion_deadline, encryption
)
SELECT
$1, $2, $3,
coalesce((
SELECT version + 1
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
), 1),
$4,
`+statusDeleteMarkerVersioned+`, NULL, 0, 0, 0, 0,
NULL, 0
WHERE EXISTS (SELECT 1 FROM check_existing_object)
RETURNING *
)
SELECT version, created_at FROM added_object
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, streamID)
// TODO(ver): should this return the deleted object or the delete marker?
var deleted Object
deleted.ProjectID = opts.ProjectID
deleted.BucketName = opts.BucketName
deleted.ObjectKey = opts.ObjectKey
deleted.StreamID = streamID
deleted.Status = DeleteMarkerVersioned
err = row.Scan(&deleted.Version, &deleted.CreatedAt)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return DeleteObjectResult{}, ErrObjectNotFound.Wrap(Error.New("object does not exist"))
}
return DeleteObjectResult{}, Error.Wrap(err)
}
return DeleteObjectResult{Objects: []Object{deleted}}, nil
}
// TODO(ver): do we need to pretend here that `expires_at` matters?
// TODO(ver): should this report an error when the object doesn't exist?
err = withRows( err = withRows(
db.db.QueryContext(ctx, deleteObjectLastCommitted, db.db.QueryContext(ctx, `
WITH deleted_objects AS (
DELETE FROM objects
WHERE
(project_id, bucket_name, object_key) = ($1, $2, $3) AND
status = `+statusCommittedUnversioned+` AND
(expires_at IS NULL OR expires_at > now())
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
FROM deleted_objects`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey), opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey),
)(func(rows tagsql.Rows) error { )(func(rows tagsql.Rows) error {
result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)

View File

@ -720,6 +720,177 @@ func TestDeleteObjectExactVersion(t *testing.T) {
}) })
} }
func TestDeleteObjectVersioning(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()
location := obj.Location()
t.Run("Delete non existing object version", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
Versioned: true,
},
ErrClass: &metabase.ErrObjectNotFound,
Result: metabase.DeleteObjectResult{},
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("Delete partial object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
pending := metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
},
}.Check(ctx, t, db)
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
Versioned: true,
},
ErrClass: &metabase.ErrObjectNotFound,
Result: metabase.DeleteObjectResult{},
}.Check(ctx, t, db)
// Not quite sure whether this is the appropriate behavior,
// but let's leave the pending object in place and not insert a deletion marker.
metabasetest.Verify{
Objects: []metabase.RawObject{{
ObjectStream: obj,
CreatedAt: pending.CreatedAt,
Status: metabase.Pending,
Encryption: pending.Encryption,
ZombieDeletionDeadline: pending.ZombieDeletionDeadline,
}},
}.Check(ctx, t, db)
})
t.Run("Create a delete marker", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
committed, _ := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: obj,
},
}.Run(ctx, t, db, obj, 0)
marker := committed.ObjectStream
marker.StreamID = uuid.UUID{}
marker.Version = committed.Version + 1
now := time.Now()
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
Versioned: true,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{
{
ObjectStream: marker,
CreatedAt: now,
Status: metabase.DeleteMarkerVersioned,
},
},
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: marker,
CreatedAt: now,
Status: metabase.DeleteMarkerVersioned,
},
{
ObjectStream: obj,
CreatedAt: committed.CreatedAt,
Status: metabase.CommittedUnversioned,
Encryption: committed.Encryption,
},
},
}.Check(ctx, t, db)
})
t.Run("multiple delete markers", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
committed, _ := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: obj,
},
}.Run(ctx, t, db, obj, 0)
marker := committed.ObjectStream
marker.StreamID = uuid.UUID{}
marker.Version = committed.Version + 1
now := time.Now()
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
Versioned: true,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{
{
ObjectStream: marker,
CreatedAt: now,
Status: metabase.DeleteMarkerVersioned,
},
},
},
}.Check(ctx, t, db)
marker2 := marker
marker2.Version = marker.Version + 1
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
Versioned: true,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{
{
ObjectStream: marker2,
CreatedAt: now,
Status: metabase.DeleteMarkerVersioned,
},
},
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: marker,
CreatedAt: now,
Status: metabase.DeleteMarkerVersioned,
},
{
ObjectStream: marker2,
CreatedAt: now,
Status: metabase.DeleteMarkerVersioned,
},
{
ObjectStream: obj,
CreatedAt: committed.CreatedAt,
Status: metabase.CommittedUnversioned,
Encryption: committed.Encryption,
},
},
}.Check(ctx, t, db)
})
})
}
func TestDeleteObjectsAllVersions(t *testing.T) { func TestDeleteObjectsAllVersions(t *testing.T) {
metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream() obj := metabasetest.RandObjectStream()
@ -974,8 +1145,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
}) })
t.Run("Delete multiple versions of the same object at once", func(t *testing.T) { t.Run("Delete multiple versions of the same object at once", func(t *testing.T) {
t.Skip("skip for now as there is no easy way to have different versions of the same committed object")
defer metabasetest.DeleteAll{}.Check(ctx, t, db) defer metabasetest.DeleteAll{}.Check(ctx, t, db)
expected := metabase.DeleteObjectResult{} expected := metabase.DeleteObjectResult{}
@ -983,7 +1152,7 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
for i := 1; i <= 10; i++ { for i := 1; i <= 10; i++ {
obj.StreamID = testrand.UUID() obj.StreamID = testrand.UUID()
obj.Version = metabase.Version(i) obj.Version = metabase.Version(i)
expected.Objects = append(expected.Objects, metabasetest.CreateObject(ctx, t, db, obj, 1)) expected.Objects = append(expected.Objects, metabasetest.CreateObjectVersioned(ctx, t, db, obj, 1))
} }
metabasetest.DeleteObjectsAllVersions{ metabasetest.DeleteObjectsAllVersions{
@ -995,6 +1164,85 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
metabasetest.Verify{}.Check(ctx, t, db) metabasetest.Verify{}.Check(ctx, t, db)
}) })
t.Run("delete last committed unversioned with suspended", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
obj := metabasetest.RandObjectStream()
_ = metabasetest.CreateObject(ctx, t, db, obj, 0)
marker := metabase.Object{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: obj.Version + 1,
},
Status: metabase.DeleteMarkerUnversioned,
CreatedAt: now,
}
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: metabase.ObjectLocation{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
},
Versioned: false,
Suspended: true,
},
Result: metabase.DeleteObjectResult{Objects: []metabase.Object{marker}},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(marker),
},
}.Check(ctx, t, db)
})
t.Run("delete last committed versioned with suspended", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
obj := metabasetest.RandObjectStream()
initial := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)
marker := metabase.Object{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: obj.Version + 1,
},
Status: metabase.DeleteMarkerUnversioned,
CreatedAt: now,
}
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: metabase.ObjectLocation{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
},
Versioned: false,
Suspended: true,
},
Result: metabase.DeleteObjectResult{Objects: []metabase.Object{marker}},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(initial),
metabase.RawObject(marker),
},
}.Check(ctx, t, db)
})
}) })
} }

View File

@ -64,7 +64,7 @@ type BeginObjectExactVersion struct {
} }
// Check runs the test. // Check runs the test.
func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t require.TestingT, db *metabase.DB) { func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t require.TestingT, db *metabase.DB) metabase.Object {
got, err := db.TestingBeginObjectExactVersion(ctx, step.Opts) got, err := db.TestingBeginObjectExactVersion(ctx, step.Opts)
checkError(t, err, step.ErrClass, step.ErrText) checkError(t, err, step.ErrClass, step.ErrText)
if step.ErrClass == nil { if step.ErrClass == nil {
@ -82,6 +82,7 @@ func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t require.Te
} }
require.Equal(t, step.Opts.Encryption, got.Encryption) require.Equal(t, step.Opts.Encryption, got.Encryption)
} }
return got
} }
// CommitObject is for testing metabase.CommitObject. // CommitObject is for testing metabase.CommitObject.