satellite/metabase: don't create delete markers for pending

Fix a case where it was possible to create a delete marker when only
pending object was present.

The solution is not pretty, but we have a TODO note to cleanup the
precommit code, so let's fix the bug first.

Change-Id: I0ab66d745443c9dccbf29ef32389dd912b2d9caf
This commit is contained in:
Egon Elbre 2023-11-22 14:00:22 +02:00
parent adcd810e37
commit 89dad05c65
3 changed files with 179 additions and 4 deletions

View File

@ -364,14 +364,13 @@ func (db *DB) DeleteObjectLastCommitted(
return DeleteObjectResult{}, Error.Wrap(err)
}
var precommit precommitConstraintResult
var precommit precommitConstraintWithNonPendingResult
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
precommit, err = db.precommitDeleteUnversioned(ctx, opts.ObjectLocation, tx)
precommit, err = db.precommitDeleteUnversionedWithNonPending(ctx, opts.ObjectLocation, tx)
if err != nil {
return Error.Wrap(err)
}
// TODO(ver): currently it allows adding delete markers when pending objects are present.
if precommit.HighestVersion == 0 {
if precommit.HighestVersion == 0 || precommit.HighestNonPendingVersion == 0 {
// an object didn't exist in the first place
return ErrObjectNotFound.New("unable to delete object")
}

View File

@ -704,6 +704,38 @@ func TestDeleteObjectVersioning(t *testing.T) {
},
}.Check(ctx, t, db)
})
t.Run("delete last pending with suspended", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj := metabasetest.RandObjectStream()
pending := metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
},
}.Check(ctx, t, db)
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: metabase.ObjectLocation{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
},
Versioned: false,
Suspended: true,
},
ErrClass: &metabase.ErrObjectNotFound,
ErrText: "unable to delete object",
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(pending),
},
}.Check(ctx, t, db)
})
})
}

View File

@ -238,3 +238,147 @@ func (db *DB) precommitDeleteUnversioned(ctx context.Context, loc ObjectLocation
return result, nil
}
type precommitConstraintWithNonPendingResult struct {
Deleted []Object
// DeletedObjectCount returns how many objects were deleted.
DeletedObjectCount int
// DeletedSegmentCount returns how many segments were deleted.
DeletedSegmentCount int
// HighestVersion returns tha highest version that was present in the table.
// It returns 0 if there was none.
HighestVersion Version
// HighestNonPendingVersion returns tha highest non-pending version that was present in the table.
// It returns 0 if there was none.
HighestNonPendingVersion Version
}
func (r *precommitConstraintWithNonPendingResult) submitMetrics() {
mon.Meter("object_delete").Mark(r.DeletedObjectCount)
mon.Meter("segment_delete").Mark(r.DeletedSegmentCount)
}
// precommitDeleteUnversionedWithNonPending deletes the unversioned object at loc and also returns the highest version and highest committed version.
func (db *DB) precommitDeleteUnversionedWithNonPending(ctx context.Context, loc ObjectLocation, tx stmtRow) (result precommitConstraintWithNonPendingResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := loc.Verify(); err != nil {
return precommitConstraintWithNonPendingResult{}, Error.Wrap(err)
}
var deleted Object
// TODO(ver): this scanning can probably simplified somehow.
var version sql.NullInt64
var streamID uuid.NullUUID
var createdAt sql.NullTime
var segmentCount, fixedSegmentSize sql.NullInt32
var totalPlainSize, totalEncryptedSize sql.NullInt64
var status sql.NullByte
var encryptionParams nullableValue[encryptionParameters]
encryptionParams.value.EncryptionParameters = &deleted.Encryption
err = tx.QueryRowContext(ctx, `
WITH highest_object AS (
SELECT MAX(version) as version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
), highest_non_pending_object AS (
SELECT MAX(version) as version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
AND status <> `+statusPending+`
), deleted_objects AS (
DELETE FROM objects
WHERE
(project_id, bucket_name, object_key) = ($1, $2, $3)
AND status IN `+statusesUnversioned+`
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT
(SELECT version FROM deleted_objects),
(SELECT stream_id FROM deleted_objects),
(SELECT created_at FROM deleted_objects),
(SELECT expires_at FROM deleted_objects),
(SELECT status FROM deleted_objects),
(SELECT segment_count FROM deleted_objects),
(SELECT encrypted_metadata_nonce FROM deleted_objects),
(SELECT encrypted_metadata FROM deleted_objects),
(SELECT encrypted_metadata_encrypted_key FROM deleted_objects),
(SELECT total_plain_size FROM deleted_objects),
(SELECT total_encrypted_size FROM deleted_objects),
(SELECT fixed_segment_size FROM deleted_objects),
(SELECT encryption FROM deleted_objects),
(SELECT count(*) FROM deleted_objects),
(SELECT count(*) FROM deleted_segments),
coalesce((SELECT version FROM highest_object), 0),
coalesce((SELECT version FROM highest_non_pending_object), 0)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).
Scan(
&version,
&streamID,
&createdAt,
&deleted.ExpiresAt,
&status,
&segmentCount,
&deleted.EncryptedMetadataNonce,
&deleted.EncryptedMetadata,
&deleted.EncryptedMetadataEncryptedKey,
&totalPlainSize,
&totalEncryptedSize,
&fixedSegmentSize,
&encryptionParams,
&result.DeletedObjectCount,
&result.DeletedSegmentCount,
&result.HighestVersion,
&result.HighestNonPendingVersion,
)
if err != nil {
return precommitConstraintWithNonPendingResult{}, Error.Wrap(err)
}
deleted.ProjectID = loc.ProjectID
deleted.BucketName = loc.BucketName
deleted.ObjectKey = loc.ObjectKey
deleted.Version = Version(version.Int64)
deleted.Status = ObjectStatus(status.Byte)
deleted.StreamID = streamID.UUID
deleted.CreatedAt = createdAt.Time
deleted.SegmentCount = segmentCount.Int32
deleted.TotalPlainSize = totalPlainSize.Int64
deleted.TotalEncryptedSize = totalEncryptedSize.Int64
deleted.FixedSegmentSize = fixedSegmentSize.Int32
if result.DeletedObjectCount > 1 {
db.log.Error("object with multiple committed versions were found!",
zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName),
zap.ByteString("Object Key", []byte(loc.ObjectKey)), zap.Int("deleted", result.DeletedObjectCount))
mon.Meter("multiple_committed_versions").Mark(1)
return result, Error.New("internal error: multiple committed unversioned objects")
}
if result.DeletedObjectCount > 0 {
result.Deleted = append(result.Deleted, deleted)
}
return result, nil
}