satellite/metabase: adjust ListSegments to return copied segments correctly

Fixes https://github.com/storj/storj/issues/4479

Change-Id: I5a5f0378d14b81c819b5e76bf3b6e3540a41e861
This commit is contained in:
Michał Niewrzał 2022-03-01 13:08:09 +01:00
parent 75be1c0a28
commit 1ef66bf872
4 changed files with 92 additions and 1 deletions

View File

@ -163,6 +163,10 @@ func (cache *NodeAliasCache) ConvertPiecesToAliases(ctx context.Context, pieces
func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) { func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if len(aliasPieces) == 0 {
return Pieces{}, nil
}
latest := cache.getLatest() latest := cache.getLatest()
pieces := make(Pieces, len(aliasPieces)) pieces := make(Pieces, len(aliasPieces))

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"storj.io/common/uuid" "storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/tagsql" "storj.io/private/tagsql"
) )
@ -53,7 +54,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
WHERE WHERE
stream_id = $1 AND stream_id = $1 AND
($2 = 0::INT8 OR position > $2) ($2 = 0::INT8 OR position > $2)
ORDER BY position ASC ORDER BY stream_id, position ASC
LIMIT $3 LIMIT $3
`, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error { `, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error {
for rows.Next() { for rows.Next() {
@ -89,6 +90,58 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
return ListSegmentsResult{}, Error.New("unable to fetch object segments: %w", err) return ListSegmentsResult{}, Error.New("unable to fetch object segments: %w", err)
} }
if db.config.ServerSideCopy {
copies := make([]Segment, 0, len(result.Segments))
copiesPositions := make([]int64, 0, len(result.Segments))
for _, segment := range result.Segments {
if segment.PiecesInAncestorSegment() {
copies = append(copies, segment)
copiesPositions = append(copiesPositions, int64(segment.Position.Encode()))
}
}
if len(copies) > 0 {
index := 0
err = withRows(db.db.QueryContext(ctx, `
SELECT
root_piece_id,
remote_alias_pieces
FROM segments
WHERE
stream_id = (SELECT ancestor_stream_id FROM segment_copies WHERE stream_id = $1)
AND position IN (SELECT position FROM UNNEST($2::INT8[]) as position)
ORDER BY stream_id, position ASC
`, opts.StreamID, pgutil.Int8Array(copiesPositions)))(func(rows tagsql.Rows) error {
for rows.Next() {
var aliasPieces AliasPieces
err = rows.Scan(
&copies[index].RootPieceID,
&aliasPieces,
)
if err != nil {
return Error.New("failed to scan segments: %w", err)
}
copies[index].Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("failed to convert aliases to pieces: %w", err)
}
index++
}
return nil
})
if err != nil {
return ListSegmentsResult{}, Error.New("unable to fetch object segments: %w", err)
}
if index != len(copies) {
return ListSegmentsResult{}, Error.New("number of ancestor segments is different than copies: want %d got %d",
len(copies), index)
}
}
}
if len(result.Segments) > opts.Limit { if len(result.Segments) > opts.Limit {
result.More = true result.More = true
result.Segments = result.Segments[:len(result.Segments)-1] result.Segments = result.Segments[:len(result.Segments)-1]

View File

@ -269,6 +269,36 @@ func TestListSegments(t *testing.T) {
}.Check(ctx, t, db) }.Check(ctx, t, db)
} }
}) })
t.Run("segments from copy", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
for _, numberOfSegments := range []byte{0, 1, 2, 10} {
originalObjectStream := metabasetest.RandObjectStream()
originalObject, _ := metabasetest.CreateTestObject{}.
Run(ctx, t, db, originalObjectStream, numberOfSegments)
copyStream := metabasetest.RandObjectStream()
_, copySegments := metabasetest.CreateObjectCopy{
OriginalObject: originalObject,
CopyObjectStream: &copyStream,
}.Run(ctx, t, db)
expectedSegments := []metabase.Segment{}
for _, segment := range copySegments {
expectedSegments = append(expectedSegments, metabase.Segment(segment))
}
metabasetest.ListSegments{
Opts: metabase.ListSegments{
StreamID: copyStream.StreamID,
},
Result: metabase.ListSegmentsResult{
Segments: expectedSegments,
},
}.Check(ctx, t, db)
}
})
}) })
} }

View File

@ -277,6 +277,10 @@ func (step ListSegments) Check(ctx *testcontext.Context, t testing.TB, db *metab
result, err := db.ListSegments(ctx, step.Opts) result, err := db.ListSegments(ctx, step.Opts)
checkError(t, err, step.ErrClass, step.ErrText) checkError(t, err, step.ErrClass, step.ErrText)
if len(step.Result.Segments) == 0 && len(result.Segments) == 0 {
return
}
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
require.Zero(t, diff) require.Zero(t, diff)
} }