satellite/metainfo/metabase: add stream range for listing segments

Change-Id: I32833e805a1046b9752b04888f830b51809a1efd
This commit is contained in:
Egon Elbre 2021-03-31 14:08:22 +03:00
parent c334fd090e
commit 5c038c4325
2 changed files with 189 additions and 12 deletions

View File

@ -104,6 +104,14 @@ type ListStreamPositions struct {
StreamID uuid.UUID StreamID uuid.UUID
Cursor SegmentPosition Cursor SegmentPosition
Limit int Limit int
Range *StreamRange
}
// StreamRange allows to limit stream positions based on the plain offsets.
type StreamRange struct {
PlainStart int64
PlainLimit int64 // limit is exclusive
} }
// ListStreamPositionsResult result of listing segments. // ListStreamPositionsResult result of listing segments.
@ -134,22 +142,46 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
if opts.Limit < 0 { if opts.Limit < 0 {
return ListStreamPositionsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit) return ListStreamPositionsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
} }
if opts.Limit == 0 || opts.Limit > MaxListLimit { if opts.Limit == 0 || opts.Limit > MaxListLimit {
opts.Limit = MaxListLimit opts.Limit = MaxListLimit
} }
err = withRows(db.db.Query(ctx, ` if opts.Range != nil {
SELECT if opts.Range.PlainStart > opts.Range.PlainLimit {
position, plain_size, plain_offset, created_at, return ListStreamPositionsResult{}, ErrInvalidRequest.New("invalid range: %d:%d", opts.Range.PlainStart, opts.Range.PlainLimit)
encrypted_etag, encrypted_key_nonce, encrypted_key }
FROM segments }
WHERE
stream_id = $1 AND var rows tagsql.Rows
($2 = 0::INT8 OR position > $2) var rowsErr error
ORDER BY position ASC if opts.Range == nil {
LIMIT $3 rows, rowsErr = db.db.Query(ctx, `
`, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error { SELECT
position, plain_size, plain_offset, created_at,
encrypted_etag, encrypted_key_nonce, encrypted_key
FROM segments
WHERE
stream_id = $1 AND
($2 = 0::INT8 OR position > $2)
ORDER BY position ASC
LIMIT $3
`, opts.StreamID, opts.Cursor, opts.Limit+1)
} else {
rows, rowsErr = db.db.Query(ctx, `
SELECT
position, plain_size, plain_offset, created_at,
encrypted_etag, encrypted_key_nonce, encrypted_key
FROM segments
WHERE
stream_id = $1 AND
($2 = 0::INT8 OR position > $2) AND
$4 < plain_offset + plain_size AND plain_offset < $5
ORDER BY position ASC
LIMIT $3
`, opts.StreamID, opts.Cursor, opts.Limit+1, opts.Range.PlainStart, opts.Range.PlainLimit)
}
err = withRows(rows, rowsErr)(func(rows tagsql.Rows) error {
for rows.Next() { for rows.Next() {
var segment SegmentPositionInfo var segment SegmentPositionInfo
err = rows.Scan( err = rows.Scan(

View File

@ -540,5 +540,150 @@ func TestListStreamPositions(t *testing.T) {
}.Check(ctx, t, db) }.Check(ctx, t, db)
} }
}) })
t.Run("range", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
const segmentCount = 10
const segmentSize = 512
expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: segmentSize,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
Redundancy: defaultTestRedundancy,
}
obj := randObjectStream()
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: defaultTestEncryption,
},
Version: obj.Version,
}.Check(ctx, t, db)
for i := 0; i < segmentCount; i++ {
segmentPosition := metabase.SegmentPosition{
Part: uint32(i / 2),
Index: uint32(i % 2),
}
BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
Position: segmentPosition,
RootPieceID: storj.PieceID{byte(i + 1)},
Pieces: []metabase.Piece{{
Number: 1,
StorageNode: testrand.NodeID(),
}},
},
}.Check(ctx, t, db)
CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: segmentPosition,
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: segmentSize,
PlainOffset: 0,
Redundancy: defaultTestRedundancy,
},
}.Check(ctx, t, db)
}
CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
},
}.Check(ctx, t, db)
expectedSegments := make([]metabase.SegmentPositionInfo, segmentCount)
expectedOffset := int64(0)
for i := range expectedSegments {
segmentPosition := metabase.SegmentPosition{
Part: uint32(i / 2),
Index: uint32(i % 2),
}
expectedSegments[i] = metabase.SegmentPositionInfo{
Position: segmentPosition,
PlainSize: expectedSegment.PlainSize,
PlainOffset: expectedOffset,
CreatedAt: &now,
EncryptedKey: expectedSegment.EncryptedKey,
EncryptedKeyNonce: expectedSegment.EncryptedKeyNonce,
EncryptedETag: expectedSegment.EncryptedETag,
}
expectedOffset += int64(expectedSegment.PlainSize)
}
ListStreamPositions{
Opts: metabase.ListStreamPositions{
StreamID: obj.StreamID,
Range: &metabase.StreamRange{
PlainStart: 5,
PlainLimit: 4,
},
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "invalid range: 5:4",
}.Check(ctx, t, db)
type rangeTest struct {
limit int
plainStart int64
plainLimit int64
results []metabase.SegmentPositionInfo
more bool
}
totalSize := int64(segmentCount * 512)
var tests = []rangeTest{
{plainStart: 0, plainLimit: 0},
{plainStart: totalSize, plainLimit: totalSize},
{plainStart: 0, plainLimit: totalSize, results: expectedSegments},
{plainStart: 0, plainLimit: totalSize - (segmentSize - 1), results: expectedSegments},
{plainStart: 0, plainLimit: totalSize - segmentSize, results: expectedSegments[:segmentCount-1]},
{plainStart: 0, plainLimit: segmentSize, results: expectedSegments[:1]},
{plainStart: 0, plainLimit: segmentSize + 1, results: expectedSegments[:2]},
{plainStart: segmentSize, plainLimit: totalSize, results: expectedSegments[1:]},
{plainStart: segmentSize / 2, plainLimit: segmentSize + segmentSize/2, results: expectedSegments[0:2]},
{plainStart: segmentSize - 1, plainLimit: segmentSize + segmentSize/2, results: expectedSegments[0:2]},
{plainStart: segmentSize, plainLimit: segmentSize + segmentSize/2, results: expectedSegments[1:2]},
{plainStart: segmentSize + 1, plainLimit: segmentSize + segmentSize/2, results: expectedSegments[1:2]},
{limit: 2, plainStart: segmentSize, plainLimit: totalSize, results: expectedSegments[1:3], more: true},
}
for _, test := range tests {
ListStreamPositions{
Opts: metabase.ListStreamPositions{
StreamID: obj.StreamID,
Limit: test.limit,
Range: &metabase.StreamRange{
PlainStart: test.plainStart,
PlainLimit: test.plainLimit,
},
},
Result: metabase.ListStreamPositionsResult{
Segments: test.results,
More: test.more,
},
}.Check(ctx, t, db)
}
})
}) })
} }