satellite/metabase: IterateLoopSegments accepts ranges

Fixes: https://github.com/storj/storj/issues/5207

Change-Id: I7872696068320987825de2d381f57ea503736e89
This commit is contained in:
Fadila Khadar 2022-10-06 13:27:08 +02:00 committed by Storj Robot
parent fac638fc7d
commit 35f74b78e0
3 changed files with 241 additions and 8 deletions

View File

@ -5,6 +5,7 @@ package metabase
import (
"context"
"math"
"time"
"github.com/zeebo/errs"
@ -225,6 +226,8 @@ type IterateLoopSegments struct {
BatchSize int
AsOfSystemTime time.Time
AsOfSystemInterval time.Duration
StartStreamID uuid.UUID
EndStreamID uuid.UUID
}
// Verify verifies segments request fields.
@ -232,6 +235,14 @@ func (opts *IterateLoopSegments) Verify() error {
if opts.BatchSize < 0 {
return ErrInvalidRequest.New("BatchSize is negative")
}
if !opts.EndStreamID.IsZero() {
if opts.EndStreamID.Less(opts.StartStreamID) {
return ErrInvalidRequest.New("EndStreamID is smaller than StartStreamID")
}
if opts.StartStreamID == opts.EndStreamID {
return ErrInvalidRequest.New("StartStreamID and EndStreamID must be different")
}
}
return nil
}
@ -251,7 +262,21 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
batchSize: opts.BatchSize,
curIndex: 0,
cursor: loopSegmentIteratorCursor{},
cursor: loopSegmentIteratorCursor{
StartStreamID: opts.StartStreamID,
EndStreamID: opts.EndStreamID,
},
}
if !opts.StartStreamID.IsZero() {
// uses MaxInt32 instead of MaxUint32 because position is an int8 in db.
it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32}
}
if it.cursor.EndStreamID.IsZero() {
it.cursor.EndStreamID, err = maxUUID()
if err != nil {
return err
}
}
loopIteratorBatchSizeLimit.Ensure(&it.batchSize)
@ -288,8 +313,9 @@ type loopSegmentIterator struct {
}
type loopSegmentIteratorCursor struct {
StreamID uuid.UUID
Position SegmentPosition
StartStreamID uuid.UUID
StartPosition SegmentPosition
EndStreamID uuid.UUID
}
// Next returns true if there was another item and copy it in item.
@ -329,8 +355,8 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry)
}
it.curIndex++
it.cursor.StreamID = item.StreamID
it.cursor.Position = item.Position
it.cursor.StartStreamID = item.StreamID
it.cursor.StartPosition = item.Position
return true
}
@ -351,11 +377,11 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows,
FROM segments
`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
WHERE
(stream_id, position) > ($1, $2)
(stream_id, position) > ($1, $2) AND stream_id <= $4
ORDER BY (stream_id, position) ASC
LIMIT $3
`, it.cursor.StreamID, it.cursor.Position,
it.batchSize,
`, it.cursor.StartStreamID, it.cursor.StartPosition.Encode(),
it.batchSize, it.cursor.EndStreamID,
)
}
@ -383,3 +409,8 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn
return nil
}
func maxUUID() (uuid.UUID, error) {
maxUUID, err := uuid.FromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
return maxUUID, err
}

View File

@ -4,6 +4,7 @@
package metabase_test
import (
"sort"
"strings"
"testing"
"time"
@ -335,6 +336,46 @@ func TestIterateLoopSegments(t *testing.T) {
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("Wrongly defined ranges", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
startStreamID, err := uuid.New()
require.NoError(t, err)
endStreamID, err := uuid.New()
require.NoError(t, err)
if startStreamID.Less(endStreamID) {
startStreamID, endStreamID = endStreamID, startStreamID
}
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: startStreamID,
EndStreamID: endStreamID,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "EndStreamID is smaller than StartStreamID",
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: startStreamID,
EndStreamID: startStreamID,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "StartStreamID and EndStreamID must be different",
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: startStreamID,
},
Result: nil,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("no segments", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
@ -360,6 +401,34 @@ func TestIterateLoopSegments(t *testing.T) {
Result: nil,
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 10,
AsOfSystemTime: time.Now(),
},
Result: nil,
}.Check(ctx, t, db)
startStreamID, err := uuid.New()
require.NoError(t, err)
endStreamID, err := uuid.New()
require.NoError(t, err)
if endStreamID.Less(startStreamID) {
startStreamID, endStreamID = endStreamID, startStreamID
}
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 10,
AsOfSystemTime: time.Now(),
StartStreamID: startStreamID,
EndStreamID: endStreamID,
},
Result: nil,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
@ -505,6 +574,136 @@ func TestIterateLoopSegments(t *testing.T) {
Segments: expectedRaw,
}.Check(ctx, t, db)
})
t.Run("streamID range", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
numberOfObjects := 10
numberOfSegmentsPerObject := 3
expected := make([]metabase.LoopSegmentEntry, numberOfObjects*numberOfSegmentsPerObject)
expectedRaw := make([]metabase.RawSegment, numberOfObjects*numberOfSegmentsPerObject)
expectedObjects := make([]metabase.RawObject, numberOfObjects)
for i := 0; i < numberOfObjects; i++ {
committed := metabasetest.RandObjectStream()
expectedObjects[i] = metabase.RawObject(
metabasetest.CreateObject(ctx, t, db, committed, byte(numberOfSegmentsPerObject)))
for j := 0; j < numberOfSegmentsPerObject; j++ {
entry := metabase.LoopSegmentEntry{
StreamID: committed.StreamID,
Position: metabase.SegmentPosition{0, uint32(j)},
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
CreatedAt: now,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: int64(j) * 512,
Redundancy: metabasetest.DefaultRedundancy,
}
expected[i*numberOfSegmentsPerObject+j] = entry
expectedRaw[i*numberOfSegmentsPerObject+j] = metabase.RawSegment{
StreamID: entry.StreamID,
Position: entry.Position,
RootPieceID: entry.RootPieceID,
Pieces: entry.Pieces,
CreatedAt: entry.CreatedAt,
EncryptedSize: entry.EncryptedSize,
PlainSize: entry.PlainSize,
PlainOffset: entry.PlainOffset,
Redundancy: entry.Redundancy,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
}
}
}
sort.Slice(expected, func(i, j int) bool {
if expected[i].StreamID.Less(expected[j].StreamID) {
return true
}
if expected[i].StreamID == expected[j].StreamID {
return expected[i].Position.Less(expected[j].Position)
}
return false
})
sort.Slice(expectedObjects, func(i, j int) bool {
return expectedObjects[i].StreamID.Less(expectedObjects[j].StreamID)
})
{ // StartStreamID set
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: expectedObjects[0].StreamID,
},
Result: expected[numberOfSegmentsPerObject:],
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: expectedObjects[0].StreamID,
BatchSize: 1,
},
Result: expected[numberOfSegmentsPerObject:],
}.Check(ctx, t, db)
}
{ // EndStreamID set
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
EndStreamID: expectedObjects[3].StreamID,
},
Result: expected[:4*numberOfSegmentsPerObject],
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 1,
EndStreamID: expectedObjects[3].StreamID,
},
Result: expected[:4*numberOfSegmentsPerObject],
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 1,
EndStreamID: expectedObjects[numberOfObjects-1].StreamID,
},
Result: expected,
}.Check(ctx, t, db)
}
{ // StartStreamID and EndStreamID set
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
AsOfSystemTime: time.Now(),
StartStreamID: expectedObjects[0].StreamID,
EndStreamID: expectedObjects[5].StreamID,
},
Result: expected[numberOfSegmentsPerObject : 6*numberOfSegmentsPerObject],
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 1,
AsOfSystemTime: time.Now(),
StartStreamID: expectedObjects[0].StreamID,
EndStreamID: expectedObjects[5].StreamID,
},
Result: expected[numberOfSegmentsPerObject : 6*numberOfSegmentsPerObject],
}.Check(ctx, t, db)
}
metabasetest.Verify{
Objects: expectedObjects,
Segments: expectedRaw,
}.Check(ctx, t, db)
})
})
}

View File

@ -399,6 +399,9 @@ func (step IterateLoopSegments) Check(ctx *testcontext.Context, t testing.TB, db
}
sort.Slice(step.Result, func(i, j int) bool {
if step.Result[i].StreamID == step.Result[j].StreamID {
return step.Result[i].Position.Less(step.Result[j].Position)
}
return bytes.Compare(step.Result[i].StreamID[:], step.Result[j].StreamID[:]) < 0
})
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())