satellite/metadabase/rangedloop: stream affinity for test provider

Some observers assume that they will observe all the segments for a
given stream, and that they will observe those segments in a sequential
stream over one or more iterations.

This change updates the range provider from rangedlooptest to provide
these guarantees.

The change also removes the Mock suffix from the provider/splitter types
since the package name (rangedlooptest) implies that the type is a test
double.

Change-Id: I927c409807e305787abcde57427baac22f663eaa
This commit is contained in:
Andrew Harding 2022-12-09 08:40:23 -07:00 committed by Storj Robot
parent ba7d2c2dbe
commit 633ab8dcf6
5 changed files with 272 additions and 70 deletions

View File

@ -0,0 +1,126 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedlooptest
import (
"context"
"fmt"
"math"
"sort"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
var _ rangedloop.RangeSplitter = (*RangeSplitter)(nil)
// RangeSplitter allows to iterate over segments from an in-memory source.
type RangeSplitter struct {
Segments []segmentloop.Segment
}
var _ rangedloop.SegmentProvider = (*SegmentProvider)(nil)
// SegmentProvider allows to iterate over segments from an in-memory source.
type SegmentProvider struct {
Segments []segmentloop.Segment
batchSize int
}
// CreateRanges splits the segments into equal ranges.
func (m *RangeSplitter) CreateRanges(nRanges int, batchSize int) ([]rangedloop.SegmentProvider, error) {
// The segments for a given stream must be handled by a single segment
// provider. Split the segments into streams.
streams := streamsFromSegments(m.Segments)
// Break up the streams into ranges
rangeSize := int(math.Ceil(float64(len(streams)) / float64(nRanges)))
rangeProviders := []rangedloop.SegmentProvider{}
for i := 0; i < nRanges; i++ {
offset := min(i*rangeSize, len(streams))
end := min(offset+rangeSize, len(streams))
rangeProviders = append(rangeProviders, &SegmentProvider{
Segments: segmentsFromStreams(streams[offset:end]),
batchSize: batchSize,
})
}
return rangeProviders, nil
}
// Iterate allows to loop over the segments stored in the provider.
func (m *SegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
for offset := 0; offset < len(m.Segments); offset += m.batchSize {
end := min(offset+m.batchSize, len(m.Segments))
err := fn(m.Segments[offset:end])
if err != nil {
return err
}
}
return nil
}
func min(x, y int) int {
if x < y {
return x
}
return y
}
func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment {
// Duplicate and sort the segments by stream ID
segments = append([]segmentloop.Segment(nil), segments...)
for i, segment := range segments {
fmt.Println("BEFORE:", i, segment.StreamID, segment.Position)
}
sort.Slice(segments, func(i int, j int) bool {
idcmp := segments[i].StreamID.Compare(segments[j].StreamID)
switch {
case idcmp < 0:
return true
case idcmp > 0:
return false
default:
return segments[i].Position.Less(segments[j].Position)
}
})
for i, segment := range segments {
fmt.Println("AFTER:", i, segment.StreamID, segment.Position)
}
// Break up the sorted segments into streams
var streams [][]segmentloop.Segment
var stream []segmentloop.Segment
for _, segment := range segments {
if len(stream) > 0 && stream[0].StreamID != segment.StreamID {
// Stream ID changed; push and reset stream
streams = append(streams, stream)
stream = nil
}
stream = append(stream, segment)
}
// Append the last stream (will be empty if there were no segments)
if len(stream) > 0 {
streams = append(streams, stream)
}
for i, stream := range streams {
for j, segment := range stream {
fmt.Println("STREAM:", i, j, segment.StreamID, segment.Position)
}
}
return streams
}
func segmentsFromStreams(streams [][]segmentloop.Segment) []segmentloop.Segment {
var segments []segmentloop.Segment
for _, stream := range streams {
segments = append(segments, stream...)
}
return segments
}

View File

@ -0,0 +1,144 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedlooptest
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)
var (
r = rand.New(rand.NewSource(time.Now().Unix()))
)
func TestSplitter(t *testing.T) {
mkseg := func(streamID byte, pos uint64) segmentloop.Segment {
return segmentloop.Segment{
StreamID: uuid.UUID{0: streamID},
Position: metabase.SegmentPositionFromEncoded(pos),
}
}
mkstream := func(streamID byte, numSegments int) []segmentloop.Segment {
var stream []segmentloop.Segment
for i := 0; i < numSegments; i++ {
stream = append(stream, mkseg(streamID, uint64(numSegments)))
}
return stream
}
intermix := func(segments []segmentloop.Segment) []segmentloop.Segment {
segments = append([]segmentloop.Segment(nil), segments...)
r.Shuffle(len(segments), func(i, j int) {
segments[i], segments[j] = segments[j], segments[i]
})
return segments
}
combine := func(streams ...[]segmentloop.Segment) []segmentloop.Segment {
return segmentsFromStreams(streams)
}
stream1 := mkstream(1, 3)
stream2 := mkstream(2, 5)
stream3 := mkstream(3, 1)
stream4 := mkstream(4, 2)
stream5 := mkstream(5, 4)
for _, tt := range []struct {
desc string
segments []segmentloop.Segment
numRanges int
expectRanges [][]segmentloop.Segment
}{
{
desc: "no segments",
segments: nil,
numRanges: 2,
expectRanges: [][]segmentloop.Segment{
{},
{},
},
},
{
desc: "one stream over two ranges",
segments: stream1,
numRanges: 2,
expectRanges: [][]segmentloop.Segment{
stream1,
{},
},
},
{
desc: "two streams over two ranges",
segments: combine(stream1, stream2),
numRanges: 2,
expectRanges: [][]segmentloop.Segment{
stream1,
stream2,
},
},
{
desc: "three streams over two ranges",
segments: combine(stream1, stream2, stream3),
numRanges: 2,
expectRanges: [][]segmentloop.Segment{
combine(stream1, stream2),
stream3,
},
},
{
desc: "three streams intermixed over two ranges",
segments: intermix(combine(stream1, stream2, stream3)),
numRanges: 2,
expectRanges: [][]segmentloop.Segment{
combine(stream1, stream2),
stream3,
},
},
{
desc: "five streams intermixed over three ranges",
segments: intermix(combine(stream1, stream2, stream3, stream4, stream5)),
numRanges: 3,
expectRanges: [][]segmentloop.Segment{
combine(stream1, stream2),
combine(stream3, stream4),
stream5,
},
},
} {
t.Run(tt.desc, func(t *testing.T) {
const batchSize = 3
splitter := RangeSplitter{Segments: tt.segments}
providers, err := splitter.CreateRanges(tt.numRanges, batchSize)
require.NoError(t, err)
var actualRanges [][]segmentloop.Segment
for _, provider := range providers {
rangeSegments := []segmentloop.Segment{}
err := provider.Iterate(context.Background(), func(segments []segmentloop.Segment) error {
if len(segments) > batchSize {
return fmt.Errorf("iterated segments (%d) larger than batch size (%d)", len(segments), batchSize)
}
rangeSegments = append(rangeSegments, segments...)
return nil
})
require.NoError(t, err)
actualRanges = append(actualRanges, rangeSegments)
}
require.Equal(t, tt.expectRanges, actualRanges)
})
}
}

View File

@ -1,68 +0,0 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedlooptest
import (
"context"
"math"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
var _ rangedloop.RangeSplitter = (*RangeSplitterMock)(nil)
// RangeSplitterMock allows to iterate over segments from an in-memory source.
type RangeSplitterMock struct {
Segments []segmentloop.Segment
}
var _ rangedloop.SegmentProvider = (*SegmentProviderMock)(nil)
// SegmentProviderMock allows to iterate over segments from an in-memory source.
type SegmentProviderMock struct {
Segments []segmentloop.Segment
batchSize int
}
// CreateRanges splits the segments into equal ranges.
func (m *RangeSplitterMock) CreateRanges(nRanges int, batchSize int) ([]rangedloop.SegmentProvider, error) {
rangeSize := int(math.Ceil(float64(len(m.Segments)) / float64(nRanges)))
rangeProviders := []rangedloop.SegmentProvider{}
for i := 0; i < nRanges; i++ {
offset := min(i*rangeSize, len(m.Segments))
end := min(offset+rangeSize, len(m.Segments))
segments := m.Segments[offset:end]
rangeProviders = append(rangeProviders, &SegmentProviderMock{
Segments: segments,
batchSize: batchSize,
})
}
return rangeProviders, nil
}
// Iterate allows to loop over the segments stored in the provider.
func (m *SegmentProviderMock) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
for offset := 0; offset < len(m.Segments); offset += m.batchSize {
end := min(offset+m.batchSize, len(m.Segments))
err := fn(m.Segments[offset:end])
if err != nil {
return err
}
}
return nil
}
func min(x, y int) int {
if x < y {
return x
}
return y
}

View File

@ -47,7 +47,7 @@ func RunTest(t *testing.T, parallelism int, nSegments int, nObservers int) {
Parallelism: parallelism,
AsOfSystemInterval: 0,
},
&rangedlooptest.RangeSplitterMock{
&rangedlooptest.RangeSplitter{
Segments: make([]segmentloop.Segment, nSegments),
},
observers,

View File

@ -76,7 +76,7 @@ func NewRangedLoop(log *zap.Logger, full *identity.FullIdentity, db DB, metabase
{ // setup ranged loop
// TODO: replace with real segment provider
segments := &rangedlooptest.RangeSplitterMock{}
segments := &rangedlooptest.RangeSplitter{}
peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, segments, nil)
peer.Services.Add(lifecycle.Item{