satellite/metabase: simplify delete query

when deleting an object that has been copied multiple times, we look for an ancestor_stream_id by taking the min of all copies stream_id.

This change simplifies this process by picking any stream_id as a new ancestor by using 'distinct on'.

Fixes https://github.com/storj/storj/issues/4745

Change-Id: Iffb519b82d2ae2ed73af48fa0e86f87384e0158f
This commit is contained in:
Fadila Khadar 2022-05-17 10:53:45 +02:00 committed by Storj Robot
parent b379fe89d2
commit 792c49e5ad
2 changed files with 32 additions and 10 deletions

View File

@ -166,8 +166,9 @@ deleted_copies AS (
),
-- lowest stream_id becomes new ancestor
promoted_ancestors AS (
SELECT
min(segment_copies.stream_id::text)::bytea AS new_ancestor_stream_id,
-- select only one child to promote per ancestor
SELECT DISTINCT ON (segment_copies.ancestor_stream_id)
segment_copies.stream_id AS new_ancestor_stream_id,
segment_copies.ancestor_stream_id AS deleted_stream_id
FROM segment_copies
-- select children about to lose their ancestor
@ -181,8 +182,6 @@ promoted_ancestors AS (
SELECT stream_id
FROM deleted_objects
)
-- select only one child to promote per ancestor
GROUP BY segment_copies.ancestor_stream_id
)
SELECT
deleted_objects.stream_id,

View File

@ -1131,20 +1131,43 @@ func TestDeleteCopy(t *testing.T) {
})
require.NoError(t, err)
remainingStreamIDs := []uuid.UUID{copyObject1.StreamID, copyObject2.StreamID}
uuid.SortAscending(remainingStreamIDs)
var expectedAncestorStreamID uuid.UUID
var expectedCopyStreamID uuid.UUID
var copies []metabase.RawCopy
if numberOfSegments > 0 {
segments, err := db.TestingAllSegments(ctx)
require.NoError(t, err)
require.NotEmpty(t, segments)
if segments[0].PiecesInAncestorSegment() {
if segments[0].StreamID == copyObject1.StreamID {
expectedCopyStreamID = copyObject1.StreamID
expectedAncestorStreamID = copyObject2.StreamID
} else {
expectedCopyStreamID = copyObject2.StreamID
expectedAncestorStreamID = copyObject1.StreamID
}
} else {
if segments[0].StreamID == copyObject1.StreamID {
expectedCopyStreamID = copyObject2.StreamID
expectedAncestorStreamID = copyObject1.StreamID
} else {
expectedCopyStreamID = copyObject1.StreamID
expectedAncestorStreamID = copyObject2.StreamID
}
}
copies = []metabase.RawCopy{
{
StreamID: remainingStreamIDs[1],
AncestorStreamID: remainingStreamIDs[0],
StreamID: expectedCopyStreamID,
AncestorStreamID: expectedAncestorStreamID,
}}
}
expectedAncestorStreamID := remainingStreamIDs[0]
// set pieces in expected ancestor for verifcation
for _, segments := range [][]metabase.RawSegment{copySegments1, copySegments2} {
for i := range segments {