satellite/metabase: deduplicate delete query
Use the same query when deleting a single object or multiple. I have chosen not to deduplicate the row "scan" logic because it is less complicated code and this change would expand to other parts of the codebase. Part of https://github.com/storj/storj/issues/4700 Change-Id: I7a958c78c903b2bddd72ca217971f7e8e02a0d0c
This commit is contained in:
parent
de6852510f
commit
16480c97da
@ -6,6 +6,7 @@ package metabase
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
@ -137,58 +138,110 @@ SELECT
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id`
|
||||
|
||||
var deleteObjectExactVersionWithCopyFeatureSQL = `
|
||||
// TODO: remove comments with regex.
|
||||
var deleteBucketObjectsWithCopyFeatureSQL = `
|
||||
WITH deleted_objects AS (
|
||||
%s
|
||||
RETURNING
|
||||
stream_id
|
||||
-- extra properties only returned when deleting single object
|
||||
%s
|
||||
),
|
||||
deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING
|
||||
segments.stream_id,
|
||||
segments.position,
|
||||
segments.inline_data,
|
||||
segments.plain_size,
|
||||
segments.encrypted_size,
|
||||
segments.repaired_at,
|
||||
segments.root_piece_id,
|
||||
segments.remote_alias_pieces
|
||||
),
|
||||
deleted_copies AS (
|
||||
DELETE FROM segment_copies
|
||||
WHERE segment_copies.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING segment_copies.stream_id
|
||||
),
|
||||
-- lowest stream_id becomes new ancestor
|
||||
promoted_ancestors AS (
|
||||
SELECT
|
||||
min(segment_copies.stream_id::text)::bytea AS new_ancestor_stream_id,
|
||||
segment_copies.ancestor_stream_id AS deleted_stream_id
|
||||
FROM segment_copies
|
||||
-- select children about to lose their ancestor
|
||||
WHERE segment_copies.ancestor_stream_id IN (
|
||||
SELECT stream_id
|
||||
FROM deleted_objects
|
||||
ORDER BY stream_id
|
||||
)
|
||||
-- don't select children which will be removed themselves
|
||||
AND segment_copies.stream_id NOT IN (
|
||||
SELECT stream_id
|
||||
FROM deleted_objects
|
||||
)
|
||||
-- select only one child to promote per ancestor
|
||||
GROUP BY segment_copies.ancestor_stream_id
|
||||
)
|
||||
SELECT
|
||||
deleted_objects.stream_id,
|
||||
deleted_segments.position,
|
||||
deleted_segments.root_piece_id,
|
||||
-- piece to remove from storagenodes or link to new ancestor
|
||||
deleted_segments.remote_alias_pieces,
|
||||
-- if set, caller needs to promote this stream_id to new ancestor or else object contents will be lost
|
||||
promoted_ancestors.new_ancestor_stream_id
|
||||
-- extra properties only returned when deleting single object
|
||||
%s
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments
|
||||
ON deleted_objects.stream_id = deleted_segments.stream_id
|
||||
LEFT JOIN promoted_ancestors
|
||||
ON deleted_objects.stream_id = promoted_ancestors.deleted_stream_id
|
||||
ORDER BY stream_id
|
||||
`
|
||||
|
||||
var deleteObjectExactVersionSubSQL = `
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version = $4
|
||||
RETURNING
|
||||
version, stream_id,
|
||||
created_at, expires_at,
|
||||
status, segment_count,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption
|
||||
), deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING
|
||||
segments.stream_id,
|
||||
segments.position,
|
||||
segments.root_piece_id,
|
||||
segments.remote_alias_pieces,
|
||||
segments.repaired_at
|
||||
), new_ancestor AS (
|
||||
SELECT stream_id, ancestor_stream_id FROM segment_copies
|
||||
WHERE ancestor_stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
ORDER BY stream_id
|
||||
LIMIT 1
|
||||
), delete_if_copy AS (
|
||||
-- TODO we should try add the condition to delete the new ancestor to reduce number of queries.
|
||||
DELETE FROM segment_copies
|
||||
WHERE
|
||||
stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING false
|
||||
)
|
||||
SELECT
|
||||
deleted_objects.version, deleted_objects.stream_id,
|
||||
deleted_objects.created_at, deleted_objects.expires_at,
|
||||
deleted_objects.status, deleted_objects.segment_count,
|
||||
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.position, deleted_segments.root_piece_id,
|
||||
deleted_segments.remote_alias_pieces, deleted_segments.repaired_at,
|
||||
new_ancestor.stream_id
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
|
||||
LEFT JOIN new_ancestor ON deleted_objects.stream_id = new_ancestor.ancestor_stream_id
|
||||
ORDER BY deleted_objects.stream_id, deleted_segments.position
|
||||
`
|
||||
|
||||
var deleteObjectExactVersionWithCopyFeatureSQL = fmt.Sprintf(
|
||||
deleteBucketObjectsWithCopyFeatureSQL,
|
||||
deleteObjectExactVersionSubSQL,
|
||||
`,version,
|
||||
created_at,
|
||||
expires_at,
|
||||
status,
|
||||
segment_count,
|
||||
encrypted_metadata_nonce,
|
||||
encrypted_metadata,
|
||||
encrypted_metadata_encrypted_key,
|
||||
total_plain_size,
|
||||
total_encrypted_size,
|
||||
fixed_segment_size,
|
||||
encryption`,
|
||||
`,deleted_objects.version,
|
||||
deleted_objects.created_at,
|
||||
deleted_objects.expires_at,
|
||||
deleted_objects.status,
|
||||
deleted_objects.segment_count,
|
||||
deleted_objects.encrypted_metadata_nonce,
|
||||
deleted_objects.encrypted_metadata,
|
||||
deleted_objects.encrypted_metadata_encrypted_key,
|
||||
deleted_objects.total_plain_size,
|
||||
deleted_objects.total_encrypted_size,
|
||||
deleted_objects.fixed_segment_size,
|
||||
deleted_objects.encryption,
|
||||
deleted_segments.repaired_at`,
|
||||
)
|
||||
|
||||
var deleteFromSegmentCopies = `
|
||||
DELETE FROM segment_copies WHERE segment_copies.stream_id = $1
|
||||
`
|
||||
@ -586,14 +639,21 @@ func (db *DB) scanObjectDeletionServerSideCopy(ctx context.Context, location Obj
|
||||
object.BucketName = location.BucketName
|
||||
object.ObjectKey = location.ObjectKey
|
||||
|
||||
err = rows.Scan(&object.Version, &object.StreamID,
|
||||
err = rows.Scan(
|
||||
// shared properties between deleteObject and deleteBucketObjects functionality
|
||||
&object.StreamID,
|
||||
&segmentPosition,
|
||||
&rootPieceID,
|
||||
&aliasPieces,
|
||||
&object.PromotedAncestor,
|
||||
// properties only for deleteObject functionality
|
||||
&object.Version,
|
||||
&object.CreatedAt, &object.ExpiresAt,
|
||||
&object.Status, &object.SegmentCount,
|
||||
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
|
||||
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
|
||||
encryptionParameters{&object.Encryption},
|
||||
&segmentPosition, &rootPieceID, &aliasPieces, &segment.RepairedAt,
|
||||
&object.PromotedAncestor,
|
||||
&segment.RepairedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, Error.New("unable to delete object: %w", err)
|
||||
|
@ -36,7 +36,6 @@ var deleteObjectsCockroachSubSQL = `
|
||||
DELETE FROM objects
|
||||
WHERE project_id = $1 AND bucket_name = $2
|
||||
LIMIT $3
|
||||
RETURNING objects.stream_id
|
||||
`
|
||||
|
||||
// postgres does not support LIMIT in DELETE.
|
||||
@ -46,76 +45,17 @@ WHERE (objects.project_id, objects.bucket_name) IN (
|
||||
SELECT project_id, bucket_name FROM objects
|
||||
WHERE project_id = $1 AND bucket_name = $2
|
||||
LIMIT $3
|
||||
)
|
||||
RETURNING objects.stream_id`
|
||||
|
||||
// TODO: remove comments with regex.
|
||||
// TODO: align/merge with metabase/delete.go.
|
||||
var deleteBucketObjectsWithCopyFeatureSQL = `
|
||||
WITH deleted_objects AS (
|
||||
%s
|
||||
),
|
||||
deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING
|
||||
segments.stream_id,
|
||||
segments.position,
|
||||
segments.inline_data,
|
||||
segments.plain_size,
|
||||
segments.encrypted_size,
|
||||
segments.repaired_at,
|
||||
segments.root_piece_id,
|
||||
segments.remote_alias_pieces
|
||||
),
|
||||
deleted_copies AS (
|
||||
DELETE FROM segment_copies
|
||||
WHERE segment_copies.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING segment_copies.stream_id
|
||||
),
|
||||
-- lowest stream_id becomes new ancestor
|
||||
promoted_ancestors AS (
|
||||
SELECT
|
||||
min(segment_copies.stream_id::text)::bytea AS new_ancestor_stream_id,
|
||||
segment_copies.ancestor_stream_id AS deleted_stream_id
|
||||
FROM segment_copies
|
||||
-- select children about to lose their ancestor
|
||||
WHERE segment_copies.ancestor_stream_id IN (
|
||||
SELECT stream_id
|
||||
FROM deleted_objects
|
||||
ORDER BY stream_id
|
||||
)
|
||||
-- don't select children which will be removed themselves
|
||||
AND segment_copies.stream_id NOT IN (
|
||||
SELECT stream_id
|
||||
FROM deleted_objects
|
||||
)
|
||||
-- select only one child to promote per ancestor
|
||||
GROUP BY segment_copies.ancestor_stream_id
|
||||
)
|
||||
SELECT
|
||||
deleted_objects.stream_id,
|
||||
deleted_segments.position,
|
||||
deleted_segments.root_piece_id,
|
||||
-- piece to remove from storagenodes or link to new ancestor
|
||||
deleted_segments.remote_alias_pieces,
|
||||
-- if set, caller needs to promote this stream_id to new ancestor or else object contents will be lost
|
||||
promoted_ancestors.new_ancestor_stream_id
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments
|
||||
ON deleted_objects.stream_id = deleted_segments.stream_id
|
||||
LEFT JOIN promoted_ancestors
|
||||
ON deleted_objects.stream_id = promoted_ancestors.deleted_stream_id
|
||||
ORDER BY stream_id
|
||||
`
|
||||
)`
|
||||
|
||||
var deleteBucketObjectsWithCopyFeaturePostgresSQL = fmt.Sprintf(
|
||||
deleteBucketObjectsWithCopyFeatureSQL,
|
||||
deleteObjectsPostgresSubSQL,
|
||||
"", "",
|
||||
)
|
||||
var deleteBucketObjectsWithCopyFeatureCockroachSQL = fmt.Sprintf(
|
||||
deleteBucketObjectsWithCopyFeatureSQL,
|
||||
deleteObjectsCockroachSubSQL,
|
||||
"", "",
|
||||
)
|
||||
|
||||
func getDeleteBucketObjectsSQLWithCopyFeature(impl dbutil.Implementation) (string, error) {
|
||||
@ -217,7 +157,7 @@ func (db *DB) scanBucketObjectsDeletionServerSideCopy(ctx context.Context, locat
|
||||
var object deletedObjectInfo
|
||||
var segment deletedRemoteSegmentInfo
|
||||
var aliasPieces AliasPieces
|
||||
var position *SegmentPosition
|
||||
var segmentPosition *SegmentPosition
|
||||
|
||||
for rows.Next() {
|
||||
object.ProjectID = location.ProjectID
|
||||
@ -225,7 +165,7 @@ func (db *DB) scanBucketObjectsDeletionServerSideCopy(ctx context.Context, locat
|
||||
|
||||
err = rows.Scan(
|
||||
&object.StreamID,
|
||||
&position,
|
||||
&segmentPosition,
|
||||
&rootPieceID,
|
||||
&aliasPieces,
|
||||
&object.PromotedAncestor,
|
||||
@ -238,7 +178,7 @@ func (db *DB) scanBucketObjectsDeletionServerSideCopy(ctx context.Context, locat
|
||||
result = append(result, object)
|
||||
}
|
||||
if rootPieceID != nil {
|
||||
segment.Position = *position
|
||||
segment.Position = *segmentPosition
|
||||
segment.RootPieceID = *rootPieceID
|
||||
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user