satellite/metabase: move delete object unversioned code
Change-Id: Iec92c91c174b990fa44b210db415da7a4eb650c4
This commit is contained in:
parent
46f7e5b7fa
commit
e5e55ef266
@ -11,7 +11,6 @@ import (
|
||||
"sort"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
@ -99,9 +98,6 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
|
||||
type stmt interface {
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (tagsql.Rows, error)
|
||||
}
|
||||
type stmtRow interface {
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
// implementation of DB.DeleteObjectExactVersion for re-use internally in metabase package.
|
||||
func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, stmt stmt) (result DeleteObjectResult, err error) {
|
||||
@ -250,159 +246,6 @@ func (db *DB) DeletePendingObjectNew(ctx context.Context, opts DeletePendingObje
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type deleteObjectUnversionedCommittedResult struct {
|
||||
Deleted []Object
|
||||
// DeletedObjectCount and DeletedSegmentCount return how many elements were deleted.
|
||||
DeletedObjectCount int
|
||||
DeletedSegmentCount int
|
||||
// MaxVersion returns tha highest version that was present in the table.
|
||||
// It returns 0 if there was none.
|
||||
MaxVersion Version
|
||||
}
|
||||
|
||||
// deleteObjectUnversionedCommitted deletes the unversioned object at the specified location inside a transaction.
|
||||
//
|
||||
// TODO(ver): this should have a better and clearer name.
|
||||
func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLocation, stmt stmtRow) (result deleteObjectUnversionedCommittedResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := loc.Verify(); err != nil {
|
||||
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
var deleted Object
|
||||
|
||||
// TODO(ver): this scanning can probably simplified somehow.
|
||||
|
||||
var version sql.NullInt64
|
||||
var streamID uuid.NullUUID
|
||||
var createdAt sql.NullTime
|
||||
var segmentCount, fixedSegmentSize sql.NullInt32
|
||||
var totalPlainSize, totalEncryptedSize sql.NullInt64
|
||||
var status sql.NullByte
|
||||
var encryptionParams nullableValue[encryptionParameters]
|
||||
encryptionParams.value.EncryptionParameters = &deleted.Encryption
|
||||
|
||||
err = stmt.QueryRowContext(ctx, `
|
||||
WITH highest_object AS (
|
||||
SELECT MAX(version) as version
|
||||
FROM objects
|
||||
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||
), deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||
AND status IN `+statusesUnversioned+`
|
||||
RETURNING
|
||||
version, stream_id,
|
||||
created_at, expires_at,
|
||||
status, segment_count,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption
|
||||
), deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING segments.stream_id
|
||||
)
|
||||
SELECT
|
||||
(SELECT version FROM deleted_objects),
|
||||
(SELECT stream_id FROM deleted_objects),
|
||||
(SELECT created_at FROM deleted_objects),
|
||||
(SELECT expires_at FROM deleted_objects),
|
||||
(SELECT status FROM deleted_objects),
|
||||
(SELECT segment_count FROM deleted_objects),
|
||||
(SELECT encrypted_metadata_nonce FROM deleted_objects),
|
||||
(SELECT encrypted_metadata FROM deleted_objects),
|
||||
(SELECT encrypted_metadata_encrypted_key FROM deleted_objects),
|
||||
(SELECT total_plain_size FROM deleted_objects),
|
||||
(SELECT total_encrypted_size FROM deleted_objects),
|
||||
(SELECT fixed_segment_size FROM deleted_objects),
|
||||
(SELECT encryption FROM deleted_objects),
|
||||
(SELECT count(*) FROM deleted_objects),
|
||||
(SELECT count(*) FROM deleted_segments),
|
||||
coalesce((SELECT version FROM highest_object), 0)
|
||||
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).
|
||||
Scan(
|
||||
&version,
|
||||
&streamID,
|
||||
&createdAt,
|
||||
&deleted.ExpiresAt,
|
||||
&status,
|
||||
&segmentCount,
|
||||
&deleted.EncryptedMetadataNonce,
|
||||
&deleted.EncryptedMetadata,
|
||||
&deleted.EncryptedMetadataEncryptedKey,
|
||||
&totalPlainSize,
|
||||
&totalEncryptedSize,
|
||||
&fixedSegmentSize,
|
||||
&encryptionParams,
|
||||
&result.DeletedObjectCount,
|
||||
&result.DeletedSegmentCount,
|
||||
&result.MaxVersion,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
deleted.ProjectID = loc.ProjectID
|
||||
deleted.BucketName = loc.BucketName
|
||||
deleted.ObjectKey = loc.ObjectKey
|
||||
deleted.Version = Version(version.Int64)
|
||||
|
||||
deleted.Status = ObjectStatus(status.Byte)
|
||||
deleted.StreamID = streamID.UUID
|
||||
deleted.CreatedAt = createdAt.Time
|
||||
deleted.SegmentCount = segmentCount.Int32
|
||||
|
||||
deleted.TotalPlainSize = totalPlainSize.Int64
|
||||
deleted.TotalEncryptedSize = totalEncryptedSize.Int64
|
||||
deleted.FixedSegmentSize = fixedSegmentSize.Int32
|
||||
|
||||
// TODO: this should happen outside of this func
|
||||
mon.Meter("object_delete").Mark(result.DeletedObjectCount)
|
||||
mon.Meter("segment_delete").Mark(result.DeletedObjectCount)
|
||||
|
||||
if result.DeletedObjectCount > 1 {
|
||||
db.log.Error("object with multiple committed versions were found!",
|
||||
zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName),
|
||||
zap.ByteString("Object Key", []byte(loc.ObjectKey)), zap.Int("deleted", result.DeletedObjectCount))
|
||||
|
||||
mon.Meter("multiple_committed_versions").Mark(1)
|
||||
|
||||
return result, Error.New("internal error: multiple committed unversioned objects")
|
||||
}
|
||||
|
||||
if result.DeletedObjectCount > 0 {
|
||||
result.Deleted = append(result.Deleted, deleted)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// queryHighestVersion queries the latest version of an object inside an transaction.
|
||||
//
|
||||
// TODO(ver): this should have a better and clearer name.
|
||||
func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt stmtRow) (highest Version, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := loc.Verify(); err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = stmt.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(MAX(version), 0) as version
|
||||
FROM objects
|
||||
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest)
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return highest, nil
|
||||
}
|
||||
|
||||
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
||||
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
170
satellite/metabase/precommit.go
Normal file
170
satellite/metabase/precommit.go
Normal file
@ -0,0 +1,170 @@
|
||||
// Copyright (C) 2023 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package metabase
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/uuid"
|
||||
)
|
||||
|
||||
type deleteObjectUnversionedCommittedResult struct {
|
||||
Deleted []Object
|
||||
// DeletedObjectCount and DeletedSegmentCount return how many elements were deleted.
|
||||
DeletedObjectCount int
|
||||
DeletedSegmentCount int
|
||||
// MaxVersion returns tha highest version that was present in the table.
|
||||
// It returns 0 if there was none.
|
||||
MaxVersion Version
|
||||
}
|
||||
|
||||
type stmtRow interface {
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
// deleteObjectUnversionedCommitted deletes the unversioned object at the specified location inside a transaction.
|
||||
//
|
||||
// TODO(ver): this should have a better and clearer name.
|
||||
func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLocation, stmt stmtRow) (result deleteObjectUnversionedCommittedResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := loc.Verify(); err != nil {
|
||||
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
var deleted Object
|
||||
|
||||
// TODO(ver): this scanning can probably simplified somehow.
|
||||
|
||||
var version sql.NullInt64
|
||||
var streamID uuid.NullUUID
|
||||
var createdAt sql.NullTime
|
||||
var segmentCount, fixedSegmentSize sql.NullInt32
|
||||
var totalPlainSize, totalEncryptedSize sql.NullInt64
|
||||
var status sql.NullByte
|
||||
var encryptionParams nullableValue[encryptionParameters]
|
||||
encryptionParams.value.EncryptionParameters = &deleted.Encryption
|
||||
|
||||
err = stmt.QueryRowContext(ctx, `
|
||||
WITH highest_object AS (
|
||||
SELECT MAX(version) as version
|
||||
FROM objects
|
||||
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||
), deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||
AND status IN `+statusesUnversioned+`
|
||||
RETURNING
|
||||
version, stream_id,
|
||||
created_at, expires_at,
|
||||
status, segment_count,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption
|
||||
), deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING segments.stream_id
|
||||
)
|
||||
SELECT
|
||||
(SELECT version FROM deleted_objects),
|
||||
(SELECT stream_id FROM deleted_objects),
|
||||
(SELECT created_at FROM deleted_objects),
|
||||
(SELECT expires_at FROM deleted_objects),
|
||||
(SELECT status FROM deleted_objects),
|
||||
(SELECT segment_count FROM deleted_objects),
|
||||
(SELECT encrypted_metadata_nonce FROM deleted_objects),
|
||||
(SELECT encrypted_metadata FROM deleted_objects),
|
||||
(SELECT encrypted_metadata_encrypted_key FROM deleted_objects),
|
||||
(SELECT total_plain_size FROM deleted_objects),
|
||||
(SELECT total_encrypted_size FROM deleted_objects),
|
||||
(SELECT fixed_segment_size FROM deleted_objects),
|
||||
(SELECT encryption FROM deleted_objects),
|
||||
(SELECT count(*) FROM deleted_objects),
|
||||
(SELECT count(*) FROM deleted_segments),
|
||||
coalesce((SELECT version FROM highest_object), 0)
|
||||
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).
|
||||
Scan(
|
||||
&version,
|
||||
&streamID,
|
||||
&createdAt,
|
||||
&deleted.ExpiresAt,
|
||||
&status,
|
||||
&segmentCount,
|
||||
&deleted.EncryptedMetadataNonce,
|
||||
&deleted.EncryptedMetadata,
|
||||
&deleted.EncryptedMetadataEncryptedKey,
|
||||
&totalPlainSize,
|
||||
&totalEncryptedSize,
|
||||
&fixedSegmentSize,
|
||||
&encryptionParams,
|
||||
&result.DeletedObjectCount,
|
||||
&result.DeletedSegmentCount,
|
||||
&result.MaxVersion,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
deleted.ProjectID = loc.ProjectID
|
||||
deleted.BucketName = loc.BucketName
|
||||
deleted.ObjectKey = loc.ObjectKey
|
||||
deleted.Version = Version(version.Int64)
|
||||
|
||||
deleted.Status = ObjectStatus(status.Byte)
|
||||
deleted.StreamID = streamID.UUID
|
||||
deleted.CreatedAt = createdAt.Time
|
||||
deleted.SegmentCount = segmentCount.Int32
|
||||
|
||||
deleted.TotalPlainSize = totalPlainSize.Int64
|
||||
deleted.TotalEncryptedSize = totalEncryptedSize.Int64
|
||||
deleted.FixedSegmentSize = fixedSegmentSize.Int32
|
||||
|
||||
// TODO: this should happen outside of this func
|
||||
mon.Meter("object_delete").Mark(result.DeletedObjectCount)
|
||||
mon.Meter("segment_delete").Mark(result.DeletedObjectCount)
|
||||
|
||||
if result.DeletedObjectCount > 1 {
|
||||
db.log.Error("object with multiple committed versions were found!",
|
||||
zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName),
|
||||
zap.ByteString("Object Key", []byte(loc.ObjectKey)), zap.Int("deleted", result.DeletedObjectCount))
|
||||
|
||||
mon.Meter("multiple_committed_versions").Mark(1)
|
||||
|
||||
return result, Error.New("internal error: multiple committed unversioned objects")
|
||||
}
|
||||
|
||||
if result.DeletedObjectCount > 0 {
|
||||
result.Deleted = append(result.Deleted, deleted)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// queryHighestVersion queries the latest version of an object inside an transaction.
|
||||
//
|
||||
// TODO(ver): this should have a better and clearer name.
|
||||
func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt stmtRow) (highest Version, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := loc.Verify(); err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = stmt.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(MAX(version), 0) as version
|
||||
FROM objects
|
||||
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest)
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return highest, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user