satellite/metabase/rangedloop: move Segment definition
We will remove segments loop soon so we need first to move Segment definition to rangedloop package. https://github.com/storj/storj/issues/5237 Change-Id: Ibe6aad316ffb7073cc4de166f1f17b87aac07363
This commit is contained in:
parent
7e69b22dd4
commit
4bdbb25d83
@ -15,7 +15,6 @@ import (
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -154,7 +153,7 @@ func newObserverFork(log *zap.Logger, nowFn func() time.Time) *observerFork {
|
||||
}
|
||||
|
||||
// Process iterates over segment range updating partial node usage map.
|
||||
func (partial *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (partial *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
now := partial.nowFn()
|
||||
|
||||
for _, segment := range segments {
|
||||
@ -164,7 +163,7 @@ func (partial *observerFork) Process(ctx context.Context, segments []segmentloop
|
||||
return nil
|
||||
}
|
||||
|
||||
func (partial *observerFork) processSegment(now time.Time, segment segmentloop.Segment) {
|
||||
func (partial *observerFork) processSegment(now time.Time, segment rangedloop.Segment) {
|
||||
if segment.Inline() {
|
||||
return
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/accounting/nodetally"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
)
|
||||
|
||||
func TestSingleObjectNodeTallyRangedLoop(t *testing.T) {
|
||||
@ -298,10 +298,10 @@ func BenchmarkProcess(b *testing.B) {
|
||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||
require.NoError(b, err)
|
||||
|
||||
loopSegments := []segmentloop.Segment{}
|
||||
loopSegments := []rangedloop.Segment{}
|
||||
|
||||
for _, segment := range segments {
|
||||
loopSegments = append(loopSegments, segmentloop.Segment{
|
||||
loopSegments = append(loopSegments, rangedloop.Segment{
|
||||
StreamID: segment.StreamID,
|
||||
Position: segment.Position,
|
||||
CreatedAt: segment.CreatedAt,
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// Observer populates reservoirs and the audit queue.
|
||||
@ -140,7 +139,7 @@ func newObserverFork(reservoirSlots int, r *rand.Rand) *observerFork {
|
||||
}
|
||||
|
||||
// Process performs per-node reservoir sampling on remote segments for addition into the audit queue.
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
|
||||
for _, segment := range segments {
|
||||
// The reservoir ends up deferencing and copying the segment internally
|
||||
// but that's not obvious, so alias the loop variable.
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// TestAuditCollector does the following:
|
||||
@ -98,10 +97,10 @@ func BenchmarkRemoteSegment(b *testing.B) {
|
||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||
require.NoError(b, err)
|
||||
|
||||
loopSegments := []segmentloop.Segment{}
|
||||
loopSegments := []rangedloop.Segment{}
|
||||
|
||||
for _, segment := range segments {
|
||||
loopSegments = append(loopSegments, segmentloop.Segment{
|
||||
loopSegments = append(loopSegments, rangedloop.Segment{
|
||||
StreamID: segment.StreamID,
|
||||
Position: segment.Position,
|
||||
CreatedAt: segment.CreatedAt,
|
||||
|
@ -12,14 +12,14 @@ import (
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
)
|
||||
|
||||
const maxReservoirSize = 3
|
||||
|
||||
// Reservoir holds a certain number of segments to reflect a random sample.
|
||||
type Reservoir struct {
|
||||
segments [maxReservoirSize]segmentloop.Segment
|
||||
segments [maxReservoirSize]rangedloop.Segment
|
||||
keys [maxReservoirSize]float64
|
||||
size int8
|
||||
index int8
|
||||
@ -39,7 +39,7 @@ func NewReservoir(size int) *Reservoir {
|
||||
}
|
||||
|
||||
// Segments returns the segments picked by the reservoir.
|
||||
func (reservoir *Reservoir) Segments() []segmentloop.Segment {
|
||||
func (reservoir *Reservoir) Segments() []rangedloop.Segment {
|
||||
return reservoir.segments[:reservoir.index]
|
||||
}
|
||||
|
||||
@ -55,12 +55,12 @@ func (reservoir *Reservoir) Keys() []float64 {
|
||||
// be passed in. The way this is accomplished is known as _Reservoir Sampling_.
|
||||
// The specific algorithm we are using here is called A-Res on the Wikipedia
|
||||
// article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res
|
||||
func (reservoir *Reservoir) Sample(r *rand.Rand, segment segmentloop.Segment) {
|
||||
func (reservoir *Reservoir) Sample(r *rand.Rand, segment rangedloop.Segment) {
|
||||
k := -math.Log(r.Float64()) / float64(segment.EncryptedSize)
|
||||
reservoir.sample(k, segment)
|
||||
}
|
||||
|
||||
func (reservoir *Reservoir) sample(k float64, segment segmentloop.Segment) {
|
||||
func (reservoir *Reservoir) sample(k float64, segment rangedloop.Segment) {
|
||||
if reservoir.index < reservoir.size {
|
||||
reservoir.segments[reservoir.index] = segment
|
||||
reservoir.keys[reservoir.index] = k
|
||||
@ -99,7 +99,7 @@ type Segment struct {
|
||||
}
|
||||
|
||||
// NewSegment creates a new segment to audit from a metainfo loop segment.
|
||||
func NewSegment(loopSegment segmentloop.Segment) Segment {
|
||||
func NewSegment(loopSegment rangedloop.Segment) Segment {
|
||||
return Segment{
|
||||
StreamID: loopSegment.StreamID,
|
||||
Position: loopSegment.Position,
|
||||
|
@ -17,7 +17,7 @@ import (
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
)
|
||||
|
||||
func TestReservoir(t *testing.T) {
|
||||
@ -25,7 +25,7 @@ func TestReservoir(t *testing.T) {
|
||||
|
||||
for size := 0; size < maxReservoirSize; size++ {
|
||||
t.Run(fmt.Sprintf("size %d", size), func(t *testing.T) {
|
||||
samples := []segmentloop.Segment{}
|
||||
samples := []rangedloop.Segment{}
|
||||
for i := 0; i < size; i++ {
|
||||
samples = append(samples, makeSegment(i))
|
||||
}
|
||||
@ -44,7 +44,7 @@ func TestReservoir(t *testing.T) {
|
||||
func TestReservoirMerge(t *testing.T) {
|
||||
t.Run("merge successful", func(t *testing.T) {
|
||||
// Use a fixed rng so we get deterministic sampling results.
|
||||
segments := []segmentloop.Segment{
|
||||
segments := []rangedloop.Segment{
|
||||
makeSegment(0), makeSegment(1), makeSegment(2),
|
||||
makeSegment(3), makeSegment(4), makeSegment(5),
|
||||
}
|
||||
@ -65,7 +65,7 @@ func TestReservoirMerge(t *testing.T) {
|
||||
// Segments should contain a cross section from r1 and r2. If the rng
|
||||
// changes, this result will likely change too since that will affect
|
||||
// the keys. and therefore how they are merged.
|
||||
require.Equal(t, []segmentloop.Segment{
|
||||
require.Equal(t, []rangedloop.Segment{
|
||||
segments[5],
|
||||
segments[1],
|
||||
segments[2],
|
||||
@ -93,7 +93,7 @@ func TestReservoirWeights(t *testing.T) {
|
||||
weight1StreamID: 0,
|
||||
}
|
||||
|
||||
segments := []segmentloop.Segment{
|
||||
segments := []rangedloop.Segment{
|
||||
{
|
||||
StreamID: weight10StreamID,
|
||||
Position: metabase.SegmentPosition{},
|
||||
@ -163,7 +163,7 @@ func TestReservoirBias(t *testing.T) {
|
||||
for r := 0; r < numRounds; r++ {
|
||||
res := NewReservoir(reservoirSize)
|
||||
for n := 0; n < numSegments; n++ {
|
||||
seg := segmentloop.Segment{
|
||||
seg := rangedloop.Segment{
|
||||
EncryptedSize: weight,
|
||||
}
|
||||
binary.BigEndian.PutUint64(seg.StreamID[0:8], uint64(n)<<(64-useBits))
|
||||
@ -198,8 +198,8 @@ func (us uint64Slice) Len() int { return len(us) }
|
||||
func (us uint64Slice) Swap(i, j int) { us[i], us[j] = us[j], us[i] }
|
||||
func (us uint64Slice) Less(i, j int) bool { return us[i] < us[j] }
|
||||
|
||||
func makeSegment(n int) segmentloop.Segment {
|
||||
return segmentloop.Segment{
|
||||
func makeSegment(n int) rangedloop.Segment {
|
||||
return rangedloop.Segment{
|
||||
StreamID: uuid.UUID{0: byte(n)},
|
||||
EncryptedSize: int32(n * 1000),
|
||||
}
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
@ -165,7 +164,7 @@ func newObserverFork(log *zap.Logger, config Config, pieceCounts map[storj.NodeI
|
||||
}
|
||||
|
||||
// Process adds pieces to the bloom filter from remote segments.
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
for _, segment := range segments {
|
||||
if segment.Inline() {
|
||||
continue
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
@ -103,7 +102,7 @@ func (obs *SyncObserver) Finish(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// Process adds pieces to the bloom filter from remote segments.
|
||||
func (obs *SyncObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (obs *SyncObserver) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
latestCreationTime := time.Time{}
|
||||
for _, segment := range segments {
|
||||
if segment.Inline() {
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/uplink"
|
||||
)
|
||||
|
||||
@ -245,9 +244,9 @@ func TestObserverGarbageCollection_MultipleRanges(t *testing.T) {
|
||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
loopSegments := []segmentloop.Segment{}
|
||||
loopSegments := []rangedloop.Segment{}
|
||||
for _, segment := range segments {
|
||||
loopSegments = append(loopSegments, segmentloop.Segment{
|
||||
loopSegments = append(loopSegments, rangedloop.Segment{
|
||||
StreamID: segment.StreamID,
|
||||
Position: segment.Position,
|
||||
CreatedAt: segment.CreatedAt,
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
@ -192,7 +191,7 @@ func newObserverFork(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batc
|
||||
}
|
||||
|
||||
// Process adds transfer queue items for remote segments belonging to newly exiting nodes.
|
||||
func (observer *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
|
||||
func (observer *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
|
||||
// Intentionally omitting mon.Task here. The duration for all process
|
||||
// calls are aggregated and and emitted by the ranged loop service.
|
||||
|
||||
@ -211,7 +210,7 @@ func (observer *observerFork) Process(ctx context.Context, segments []segmentloo
|
||||
return nil
|
||||
}
|
||||
|
||||
func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment segmentloop.Segment) (err error) {
|
||||
func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment rangedloop.Segment) (err error) {
|
||||
numPieces := len(segment.Pieces)
|
||||
for _, piece := range segment.Pieces {
|
||||
if _, ok := observer.nodeIDStorage[piece.StorageNode]; !ok {
|
||||
|
@ -7,9 +7,27 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
)
|
||||
|
||||
// Segment contains information about segment metadata which will be received by observers.
|
||||
type Segment metabase.LoopSegmentEntry
|
||||
|
||||
// Inline returns true if segment is inline.
|
||||
func (s Segment) Inline() bool {
|
||||
return s.Redundancy.IsZero() && len(s.Pieces) == 0
|
||||
}
|
||||
|
||||
// Expired checks if segment expired relative to now.
|
||||
func (s *Segment) Expired(now time.Time) bool {
|
||||
return s.ExpiresAt != nil && s.ExpiresAt.Before(now)
|
||||
}
|
||||
|
||||
// PieceSize returns calculated piece size for segment.
|
||||
func (s Segment) PieceSize() int64 {
|
||||
return s.Redundancy.PieceSize(int64(s.EncryptedSize))
|
||||
}
|
||||
|
||||
// Observer subscribes to the parallel segment loop.
|
||||
// It is intended that a naïve implementation is threadsafe.
|
||||
type Observer interface {
|
||||
@ -33,5 +51,5 @@ type Observer interface {
|
||||
type Partial interface {
|
||||
// Process is called repeatedly with batches of segments.
|
||||
// It is not called concurrently on the same instance.
|
||||
Process(context.Context, []segmentloop.Segment) error
|
||||
Process(context.Context, []Segment) error
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ monkit.StatSource = (*LiveCountObserver)(nil)
|
||||
@ -72,7 +71,7 @@ func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error {
|
||||
}
|
||||
|
||||
// Process increments the counter.
|
||||
func (o *LiveCountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (o *LiveCountObserver) Process(ctx context.Context, segments []Segment) error {
|
||||
processed := atomic.AddInt64(&o.segmentsProcessed, int64(len(segments)))
|
||||
|
||||
mon.IntVal("segmentsProcessed").Observe(processed)
|
||||
|
@ -5,8 +5,6 @@ package rangedloop
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// RangeSplitter splits a source of segments into ranges,
|
||||
@ -20,5 +18,5 @@ type RangeSplitter interface {
|
||||
// SegmentProvider iterates through a range of segments.
|
||||
type SegmentProvider interface {
|
||||
Range() UUIDRange
|
||||
Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error
|
||||
Iterate(ctx context.Context, fn func([]Segment) error) error
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// MetabaseRangeSplitter implements RangeSplitter.
|
||||
@ -68,7 +67,7 @@ func (provider *MetabaseSegmentProvider) Range() UUIDRange {
|
||||
}
|
||||
|
||||
// Iterate loops over a part of the segment table.
|
||||
func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
|
||||
func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]Segment) error) error {
|
||||
var startStreamID uuid.UUID
|
||||
var endStreamID uuid.UUID
|
||||
|
||||
@ -86,7 +85,7 @@ func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]
|
||||
StartStreamID: startStreamID,
|
||||
EndStreamID: endStreamID,
|
||||
}, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {
|
||||
segments := make([]segmentloop.Segment, 0, provider.batchSize)
|
||||
segments := make([]Segment, 0, provider.batchSize)
|
||||
|
||||
segment := metabase.LoopSegmentEntry{}
|
||||
for iterator.Next(ctx, &segment) {
|
||||
@ -95,7 +94,7 @@ func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]
|
||||
return err
|
||||
}
|
||||
|
||||
segments = append(segments, segmentloop.Segment(segment))
|
||||
segments = append(segments, Segment(segment))
|
||||
|
||||
if len(segments) >= provider.batchSize {
|
||||
err = fn(segments)
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/metabasetest"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
type in struct {
|
||||
@ -113,7 +112,7 @@ func runTest(ctx *testcontext.Context, t *testing.T, db *metabase.DB, in in, exp
|
||||
nBatches := 0
|
||||
nSegments := 0
|
||||
for _, r := range ranges {
|
||||
err = r.Iterate(ctx, func(segments []segmentloop.Segment) error {
|
||||
err = r.Iterate(ctx, func(segments []rangedloop.Segment) error {
|
||||
nBatches++
|
||||
nSegments += len(segments)
|
||||
return nil
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"time"
|
||||
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ rangedloop.Observer = (*CallbackObserver)(nil)
|
||||
@ -17,7 +16,7 @@ var _ rangedloop.Partial = (*CallbackObserver)(nil)
|
||||
|
||||
// CallbackObserver can be used to easily attach logic to the ranged segment loop during tests.
|
||||
type CallbackObserver struct {
|
||||
OnProcess func(context.Context, []segmentloop.Segment) error
|
||||
OnProcess func(context.Context, []rangedloop.Segment) error
|
||||
OnStart func(context.Context, time.Time) error
|
||||
OnFork func(context.Context) (rangedloop.Partial, error)
|
||||
OnJoin func(context.Context, rangedloop.Partial) error
|
||||
@ -85,7 +84,7 @@ func (c *CallbackObserver) Finish(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Process executes a callback for every batch of segment in the ranged segment loop.
|
||||
func (c *CallbackObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (c *CallbackObserver) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
delay()
|
||||
if c.OnProcess == nil {
|
||||
return nil
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"time"
|
||||
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ rangedloop.Observer = (*CountObserver)(nil)
|
||||
@ -45,7 +44,7 @@ func (c *CountObserver) Finish(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Process counts the size of a batch of segments.
|
||||
func (c *CountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (c *CountObserver) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
c.NumSegments += len(segments)
|
||||
return nil
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ rangedloop.RangeSplitter = (*InfiniteSegmentProvider)(nil)
|
||||
@ -31,9 +30,9 @@ func (m *InfiniteSegmentProvider) Range() rangedloop.UUIDRange {
|
||||
}
|
||||
|
||||
// Iterate allows to loop over the segments stored in the provider.
|
||||
func (m *InfiniteSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
|
||||
func (m *InfiniteSegmentProvider) Iterate(ctx context.Context, fn func([]rangedloop.Segment) error) error {
|
||||
for {
|
||||
err := fn(make([]segmentloop.Segment, 3))
|
||||
err := fn(make([]rangedloop.Segment, 3))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -9,21 +9,20 @@ import (
|
||||
"sort"
|
||||
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ rangedloop.RangeSplitter = (*RangeSplitter)(nil)
|
||||
|
||||
// RangeSplitter allows to iterate over segments from an in-memory source.
|
||||
type RangeSplitter struct {
|
||||
Segments []segmentloop.Segment
|
||||
Segments []rangedloop.Segment
|
||||
}
|
||||
|
||||
var _ rangedloop.SegmentProvider = (*SegmentProvider)(nil)
|
||||
|
||||
// SegmentProvider allows to iterate over segments from an in-memory source.
|
||||
type SegmentProvider struct {
|
||||
Segments []segmentloop.Segment
|
||||
Segments []rangedloop.Segment
|
||||
|
||||
batchSize int
|
||||
}
|
||||
@ -56,7 +55,7 @@ func (m *SegmentProvider) Range() rangedloop.UUIDRange {
|
||||
}
|
||||
|
||||
// Iterate allows to loop over the segments stored in the provider.
|
||||
func (m *SegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
|
||||
func (m *SegmentProvider) Iterate(ctx context.Context, fn func([]rangedloop.Segment) error) error {
|
||||
for offset := 0; offset < len(m.Segments); offset += m.batchSize {
|
||||
end := min(offset+m.batchSize, len(m.Segments))
|
||||
err := fn(m.Segments[offset:end])
|
||||
@ -75,9 +74,9 @@ func min(x, y int) int {
|
||||
return y
|
||||
}
|
||||
|
||||
func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment {
|
||||
func streamsFromSegments(segments []rangedloop.Segment) [][]rangedloop.Segment {
|
||||
// Duplicate and sort the segments by stream ID
|
||||
segments = append([]segmentloop.Segment(nil), segments...)
|
||||
segments = append([]rangedloop.Segment(nil), segments...)
|
||||
sort.Slice(segments, func(i int, j int) bool {
|
||||
idcmp := segments[i].StreamID.Compare(segments[j].StreamID)
|
||||
switch {
|
||||
@ -90,8 +89,8 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment
|
||||
}
|
||||
})
|
||||
// Break up the sorted segments into streams
|
||||
var streams [][]segmentloop.Segment
|
||||
var stream []segmentloop.Segment
|
||||
var streams [][]rangedloop.Segment
|
||||
var stream []rangedloop.Segment
|
||||
for _, segment := range segments {
|
||||
if len(stream) > 0 && stream[0].StreamID != segment.StreamID {
|
||||
// Stream ID changed; push and reset stream
|
||||
@ -108,8 +107,8 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment
|
||||
return streams
|
||||
}
|
||||
|
||||
func segmentsFromStreams(streams [][]segmentloop.Segment) []segmentloop.Segment {
|
||||
var segments []segmentloop.Segment
|
||||
func segmentsFromStreams(streams [][]rangedloop.Segment) []rangedloop.Segment {
|
||||
var segments []rangedloop.Segment
|
||||
for _, stream := range streams {
|
||||
segments = append(segments, stream...)
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -22,30 +22,30 @@ var (
|
||||
)
|
||||
|
||||
func TestSplitter(t *testing.T) {
|
||||
mkseg := func(streamID byte, pos uint64) segmentloop.Segment {
|
||||
return segmentloop.Segment{
|
||||
mkseg := func(streamID byte, pos uint64) rangedloop.Segment {
|
||||
return rangedloop.Segment{
|
||||
StreamID: uuid.UUID{0: streamID},
|
||||
Position: metabase.SegmentPositionFromEncoded(pos),
|
||||
}
|
||||
}
|
||||
|
||||
mkstream := func(streamID byte, numSegments int) []segmentloop.Segment {
|
||||
var stream []segmentloop.Segment
|
||||
mkstream := func(streamID byte, numSegments int) []rangedloop.Segment {
|
||||
var stream []rangedloop.Segment
|
||||
for i := 0; i < numSegments; i++ {
|
||||
stream = append(stream, mkseg(streamID, uint64(numSegments)))
|
||||
}
|
||||
return stream
|
||||
}
|
||||
|
||||
intermix := func(segments []segmentloop.Segment) []segmentloop.Segment {
|
||||
segments = append([]segmentloop.Segment(nil), segments...)
|
||||
intermix := func(segments []rangedloop.Segment) []rangedloop.Segment {
|
||||
segments = append([]rangedloop.Segment(nil), segments...)
|
||||
r.Shuffle(len(segments), func(i, j int) {
|
||||
segments[i], segments[j] = segments[j], segments[i]
|
||||
})
|
||||
return segments
|
||||
}
|
||||
|
||||
combine := func(streams ...[]segmentloop.Segment) []segmentloop.Segment {
|
||||
combine := func(streams ...[]rangedloop.Segment) []rangedloop.Segment {
|
||||
return segmentsFromStreams(streams)
|
||||
}
|
||||
|
||||
@ -57,15 +57,15 @@ func TestSplitter(t *testing.T) {
|
||||
|
||||
for _, tt := range []struct {
|
||||
desc string
|
||||
segments []segmentloop.Segment
|
||||
segments []rangedloop.Segment
|
||||
numRanges int
|
||||
expectRanges [][]segmentloop.Segment
|
||||
expectRanges [][]rangedloop.Segment
|
||||
}{
|
||||
{
|
||||
desc: "no segments",
|
||||
segments: nil,
|
||||
numRanges: 2,
|
||||
expectRanges: [][]segmentloop.Segment{
|
||||
expectRanges: [][]rangedloop.Segment{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
@ -74,7 +74,7 @@ func TestSplitter(t *testing.T) {
|
||||
desc: "one stream over two ranges",
|
||||
segments: stream1,
|
||||
numRanges: 2,
|
||||
expectRanges: [][]segmentloop.Segment{
|
||||
expectRanges: [][]rangedloop.Segment{
|
||||
stream1,
|
||||
{},
|
||||
},
|
||||
@ -83,7 +83,7 @@ func TestSplitter(t *testing.T) {
|
||||
desc: "two streams over two ranges",
|
||||
segments: combine(stream1, stream2),
|
||||
numRanges: 2,
|
||||
expectRanges: [][]segmentloop.Segment{
|
||||
expectRanges: [][]rangedloop.Segment{
|
||||
stream1,
|
||||
stream2,
|
||||
},
|
||||
@ -92,7 +92,7 @@ func TestSplitter(t *testing.T) {
|
||||
desc: "three streams over two ranges",
|
||||
segments: combine(stream1, stream2, stream3),
|
||||
numRanges: 2,
|
||||
expectRanges: [][]segmentloop.Segment{
|
||||
expectRanges: [][]rangedloop.Segment{
|
||||
combine(stream1, stream2),
|
||||
stream3,
|
||||
},
|
||||
@ -101,7 +101,7 @@ func TestSplitter(t *testing.T) {
|
||||
desc: "three streams intermixed over two ranges",
|
||||
segments: intermix(combine(stream1, stream2, stream3)),
|
||||
numRanges: 2,
|
||||
expectRanges: [][]segmentloop.Segment{
|
||||
expectRanges: [][]rangedloop.Segment{
|
||||
combine(stream1, stream2),
|
||||
stream3,
|
||||
},
|
||||
@ -110,7 +110,7 @@ func TestSplitter(t *testing.T) {
|
||||
desc: "five streams intermixed over three ranges",
|
||||
segments: intermix(combine(stream1, stream2, stream3, stream4, stream5)),
|
||||
numRanges: 3,
|
||||
expectRanges: [][]segmentloop.Segment{
|
||||
expectRanges: [][]rangedloop.Segment{
|
||||
combine(stream1, stream2),
|
||||
combine(stream3, stream4),
|
||||
stream5,
|
||||
@ -125,10 +125,10 @@ func TestSplitter(t *testing.T) {
|
||||
providers, err := splitter.CreateRanges(tt.numRanges, batchSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
var actualRanges [][]segmentloop.Segment
|
||||
var actualRanges [][]rangedloop.Segment
|
||||
for _, provider := range providers {
|
||||
rangeSegments := []segmentloop.Segment{}
|
||||
err := provider.Iterate(context.Background(), func(segments []segmentloop.Segment) error {
|
||||
rangeSegments := []rangedloop.Segment{}
|
||||
err := provider.Iterate(context.Background(), func(segments []rangedloop.Segment) error {
|
||||
if len(segments) > batchSize {
|
||||
return fmt.Errorf("iterated segments (%d) larger than batch size (%d)", len(segments), batchSize)
|
||||
}
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"time"
|
||||
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// SleepObserver is a subscriber to the segment loop which sleeps for every batch.
|
||||
@ -38,7 +37,7 @@ func (c *SleepObserver) Finish(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Process sleeps for every batch of segments to simulate execution time.
|
||||
func (c *SleepObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (c *SleepObserver) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
sleepTime := time.Duration(c.Duration.Nanoseconds() * int64(len(segments)))
|
||||
time.Sleep(sleepTime)
|
||||
return nil
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
|
||||
"storj.io/common/errs2"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -172,7 +171,7 @@ func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider,
|
||||
return func() (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return rangeProvider.Iterate(ctx, func(segments []segmentloop.Segment) error {
|
||||
return rangeProvider.Iterate(ctx, func(segments []Segment) error {
|
||||
// check for cancellation every segment batch
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -280,7 +279,7 @@ func finishObserver(ctx context.Context, log *zap.Logger, state observerState) O
|
||||
}
|
||||
}
|
||||
|
||||
func processBatch(ctx context.Context, states []*rangeObserverState, segments []segmentloop.Segment) (err error) {
|
||||
func processBatch(ctx context.Context, states []*rangeObserverState, segments []Segment) (err error) {
|
||||
for _, state := range states {
|
||||
if state.err != nil {
|
||||
// this observer has errored in a previous batch
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"storj.io/storj/satellite/metabase/metabasetest"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/metrics"
|
||||
"storj.io/storj/satellite/repair/checker"
|
||||
)
|
||||
@ -66,7 +65,7 @@ func runCountTest(t *testing.T, parallelism int, nSegments int, nObservers int)
|
||||
Parallelism: parallelism,
|
||||
},
|
||||
&rangedlooptest.RangeSplitter{
|
||||
Segments: make([]segmentloop.Segment, nSegments),
|
||||
Segments: make([]rangedloop.Segment, nSegments),
|
||||
},
|
||||
observers,
|
||||
)
|
||||
@ -99,11 +98,11 @@ func TestLoopDuration(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
segments := []segmentloop.Segment{}
|
||||
segments := []rangedloop.Segment{}
|
||||
for i := 0; i < nSegments; i++ {
|
||||
streamId, err := uuid.FromBytes([]byte{byte(i), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
|
||||
require.NoError(t, err)
|
||||
segments = append(segments, segmentloop.Segment{
|
||||
segments = append(segments, rangedloop.Segment{
|
||||
StreamID: streamId,
|
||||
})
|
||||
}
|
||||
@ -143,7 +142,7 @@ func TestLoopCancellation(t *testing.T) {
|
||||
observers := []rangedloop.Observer{
|
||||
&rangedlooptest.CountObserver{},
|
||||
&rangedlooptest.CallbackObserver{
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
// cancel from inside the loop, when it is certain that the loop has started
|
||||
cancel()
|
||||
return nil
|
||||
@ -169,7 +168,7 @@ func TestLoopCancellation(t *testing.T) {
|
||||
func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
parallelism := 2
|
||||
batchSize := 1
|
||||
segments := make([]segmentloop.Segment, 2)
|
||||
segments := make([]rangedloop.Segment, 2)
|
||||
|
||||
numOnStartCalls := 0
|
||||
numOnForkCalls := 0
|
||||
@ -193,7 +192,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
numOnForkCalls++
|
||||
return nil, nil
|
||||
},
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
incNumOnProcessCalls()
|
||||
return nil
|
||||
},
|
||||
@ -215,7 +214,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
require.Fail(t, "OnFork should not be called")
|
||||
return nil, nil
|
||||
},
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
require.Fail(t, "OnProcess should not be called")
|
||||
return nil
|
||||
},
|
||||
@ -237,7 +236,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
numOnForkCalls++
|
||||
return nil, errors.New("Test OnFork error")
|
||||
},
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
require.Fail(t, "OnProcess should not be called")
|
||||
return nil
|
||||
},
|
||||
@ -259,7 +258,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
numOnForkCalls++
|
||||
return nil, nil
|
||||
},
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
incNumOnProcessCalls()
|
||||
return errors.New("Test OnProcess error")
|
||||
},
|
||||
@ -281,7 +280,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
numOnForkCalls++
|
||||
return nil, nil
|
||||
},
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
incNumOnProcessCalls()
|
||||
return nil
|
||||
},
|
||||
@ -303,7 +302,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
numOnForkCalls++
|
||||
return nil, nil
|
||||
},
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
incNumOnProcessCalls()
|
||||
return nil
|
||||
},
|
||||
@ -325,7 +324,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) {
|
||||
numOnForkCalls++
|
||||
return nil, nil
|
||||
},
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
incNumOnProcessCalls()
|
||||
return nil
|
||||
},
|
||||
@ -483,7 +482,7 @@ func TestLoopBoundaries(t *testing.T) {
|
||||
}
|
||||
|
||||
callbackObserver := rangedlooptest.CallbackObserver{
|
||||
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
// OnProcess is called many times by different goroutines
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
@ -12,7 +12,6 @@ import (
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -89,7 +88,7 @@ type observerFork struct {
|
||||
|
||||
// Process aggregates metrics about a range of metrics provided by the
|
||||
// segment ranged loop.
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||||
for _, segment := range segments {
|
||||
if fork.streamID != segment.StreamID {
|
||||
// Stream ID has changed. Flush what we have so far.
|
||||
|
@ -15,18 +15,17 @@ import (
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var (
|
||||
inline1 = []segmentloop.Segment{
|
||||
inline1 = []rangedloop.Segment{
|
||||
{StreamID: uuid.UUID{1}, EncryptedSize: 10},
|
||||
}
|
||||
remote2 = []segmentloop.Segment{
|
||||
remote2 = []rangedloop.Segment{
|
||||
{StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
|
||||
{StreamID: uuid.UUID{2}, EncryptedSize: 10},
|
||||
}
|
||||
remote3 = []segmentloop.Segment{
|
||||
remote3 = []rangedloop.Segment{
|
||||
{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
|
||||
{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
|
||||
{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
|
||||
@ -37,7 +36,7 @@ var (
|
||||
func TestObserver(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
|
||||
loop := func(tb testing.TB, obs *Observer, streams ...[]segmentloop.Segment) Metrics {
|
||||
loop := func(tb testing.TB, obs *Observer, streams ...[]rangedloop.Segment) Metrics {
|
||||
service := rangedloop.NewService(
|
||||
zap.NewNop(),
|
||||
rangedloop.Config{BatchSize: 2, Parallelism: 2},
|
||||
@ -91,8 +90,8 @@ func TestObserver(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func combineSegments(ss ...[]segmentloop.Segment) []segmentloop.Segment {
|
||||
var combined []segmentloop.Segment
|
||||
func combineSegments(ss ...[]rangedloop.Segment) []rangedloop.Segment {
|
||||
var combined []rangedloop.Segment
|
||||
for _, s := range ss {
|
||||
combined = append(combined, s...)
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/repair"
|
||||
"storj.io/storj/satellite/repair/queue"
|
||||
@ -276,7 +275,7 @@ func (fork *observerFork) loadRedundancy(redundancy storj.RedundancyScheme) (int
|
||||
}
|
||||
|
||||
// Process repair implementation of partial's Process.
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
|
||||
for _, segment := range segments {
|
||||
if err := fork.process(ctx, &segment); err != nil {
|
||||
return err
|
||||
@ -286,7 +285,7 @@ func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Se
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fork *observerFork) process(ctx context.Context, segment *segmentloop.Segment) (err error) {
|
||||
func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segment) (err error) {
|
||||
if segment.Inline() {
|
||||
if fork.lastStreamID.Compare(segment.StreamID) != 0 {
|
||||
fork.lastStreamID = segment.StreamID
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/repair/checker"
|
||||
"storj.io/storj/satellite/repair/queue"
|
||||
)
|
||||
@ -555,10 +554,10 @@ func BenchmarkRemoteSegment(b *testing.B) {
|
||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||
require.NoError(b, err)
|
||||
|
||||
loopSegments := []segmentloop.Segment{}
|
||||
loopSegments := []rangedloop.Segment{}
|
||||
|
||||
for _, segment := range segments {
|
||||
loopSegments = append(loopSegments, segmentloop.Segment{
|
||||
loopSegments = append(loopSegments, rangedloop.Segment{
|
||||
StreamID: segment.StreamID,
|
||||
Position: segment.Position,
|
||||
CreatedAt: segment.CreatedAt,
|
||||
|
Loading…
Reference in New Issue
Block a user