diff --git a/satellite/accounting/nodetally/observer.go b/satellite/accounting/nodetally/observer.go index edc0cd750..7ce7c4470 100644 --- a/satellite/accounting/nodetally/observer.go +++ b/satellite/accounting/nodetally/observer.go @@ -15,7 +15,6 @@ import ( "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) var ( @@ -154,7 +153,7 @@ func newObserverFork(log *zap.Logger, nowFn func() time.Time) *observerFork { } // Process iterates over segment range updating partial node usage map. -func (partial *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (partial *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error { now := partial.nowFn() for _, segment := range segments { @@ -164,7 +163,7 @@ func (partial *observerFork) Process(ctx context.Context, segments []segmentloop return nil } -func (partial *observerFork) processSegment(now time.Time, segment segmentloop.Segment) { +func (partial *observerFork) processSegment(now time.Time, segment rangedloop.Segment) { if segment.Inline() { return } diff --git a/satellite/accounting/nodetally/observer_test.go b/satellite/accounting/nodetally/observer_test.go index 0e9cb3118..ce71bf195 100644 --- a/satellite/accounting/nodetally/observer_test.go +++ b/satellite/accounting/nodetally/observer_test.go @@ -21,7 +21,7 @@ import ( "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/accounting/nodetally" - "storj.io/storj/satellite/metabase/segmentloop" + "storj.io/storj/satellite/metabase/rangedloop" ) func TestSingleObjectNodeTallyRangedLoop(t *testing.T) { @@ -298,10 +298,10 @@ func BenchmarkProcess(b *testing.B) { segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(b, err) - loopSegments := []segmentloop.Segment{} + loopSegments := []rangedloop.Segment{} for _, segment := range segments { - loopSegments = append(loopSegments, segmentloop.Segment{ + loopSegments = append(loopSegments, rangedloop.Segment{ StreamID: segment.StreamID, Position: segment.Position, CreatedAt: segment.CreatedAt, diff --git a/satellite/audit/observer.go b/satellite/audit/observer.go index 02b109005..8278136cd 100644 --- a/satellite/audit/observer.go +++ b/satellite/audit/observer.go @@ -14,7 +14,6 @@ import ( "storj.io/common/uuid" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) // Observer populates reservoirs and the audit queue. @@ -140,7 +139,7 @@ func newObserverFork(reservoirSlots int, r *rand.Rand) *observerFork { } // Process performs per-node reservoir sampling on remote segments for addition into the audit queue. -func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) { +func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) { for _, segment := range segments { // The reservoir ends up deferencing and copying the segment internally // but that's not obvious, so alias the loop variable. diff --git a/satellite/audit/observer_test.go b/satellite/audit/observer_test.go index 154ebfe49..1adb38e9a 100644 --- a/satellite/audit/observer_test.go +++ b/satellite/audit/observer_test.go @@ -18,7 +18,6 @@ import ( "storj.io/storj/private/testplanet" "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) // TestAuditCollector does the following: @@ -98,10 +97,10 @@ func BenchmarkRemoteSegment(b *testing.B) { segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(b, err) - loopSegments := []segmentloop.Segment{} + loopSegments := []rangedloop.Segment{} for _, segment := range segments { - loopSegments = append(loopSegments, segmentloop.Segment{ + loopSegments = append(loopSegments, rangedloop.Segment{ StreamID: segment.StreamID, Position: segment.Position, CreatedAt: segment.CreatedAt, diff --git a/satellite/audit/reservoir.go b/satellite/audit/reservoir.go index 000b0f220..687d1c595 100644 --- a/satellite/audit/reservoir.go +++ b/satellite/audit/reservoir.go @@ -12,14 +12,14 @@ import ( "storj.io/common/uuid" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" + "storj.io/storj/satellite/metabase/rangedloop" ) const maxReservoirSize = 3 // Reservoir holds a certain number of segments to reflect a random sample. type Reservoir struct { - segments [maxReservoirSize]segmentloop.Segment + segments [maxReservoirSize]rangedloop.Segment keys [maxReservoirSize]float64 size int8 index int8 @@ -39,7 +39,7 @@ func NewReservoir(size int) *Reservoir { } // Segments returns the segments picked by the reservoir. -func (reservoir *Reservoir) Segments() []segmentloop.Segment { +func (reservoir *Reservoir) Segments() []rangedloop.Segment { return reservoir.segments[:reservoir.index] } @@ -55,12 +55,12 @@ func (reservoir *Reservoir) Keys() []float64 { // be passed in. The way this is accomplished is known as _Reservoir Sampling_. // The specific algorithm we are using here is called A-Res on the Wikipedia // article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res -func (reservoir *Reservoir) Sample(r *rand.Rand, segment segmentloop.Segment) { +func (reservoir *Reservoir) Sample(r *rand.Rand, segment rangedloop.Segment) { k := -math.Log(r.Float64()) / float64(segment.EncryptedSize) reservoir.sample(k, segment) } -func (reservoir *Reservoir) sample(k float64, segment segmentloop.Segment) { +func (reservoir *Reservoir) sample(k float64, segment rangedloop.Segment) { if reservoir.index < reservoir.size { reservoir.segments[reservoir.index] = segment reservoir.keys[reservoir.index] = k @@ -99,7 +99,7 @@ type Segment struct { } // NewSegment creates a new segment to audit from a metainfo loop segment. -func NewSegment(loopSegment segmentloop.Segment) Segment { +func NewSegment(loopSegment rangedloop.Segment) Segment { return Segment{ StreamID: loopSegment.StreamID, Position: loopSegment.Position, diff --git a/satellite/audit/reservoir_test.go b/satellite/audit/reservoir_test.go index 305185958..3f5e72c9d 100644 --- a/satellite/audit/reservoir_test.go +++ b/satellite/audit/reservoir_test.go @@ -17,7 +17,7 @@ import ( "storj.io/common/testrand" "storj.io/common/uuid" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" + "storj.io/storj/satellite/metabase/rangedloop" ) func TestReservoir(t *testing.T) { @@ -25,7 +25,7 @@ func TestReservoir(t *testing.T) { for size := 0; size < maxReservoirSize; size++ { t.Run(fmt.Sprintf("size %d", size), func(t *testing.T) { - samples := []segmentloop.Segment{} + samples := []rangedloop.Segment{} for i := 0; i < size; i++ { samples = append(samples, makeSegment(i)) } @@ -44,7 +44,7 @@ func TestReservoir(t *testing.T) { func TestReservoirMerge(t *testing.T) { t.Run("merge successful", func(t *testing.T) { // Use a fixed rng so we get deterministic sampling results. - segments := []segmentloop.Segment{ + segments := []rangedloop.Segment{ makeSegment(0), makeSegment(1), makeSegment(2), makeSegment(3), makeSegment(4), makeSegment(5), } @@ -65,7 +65,7 @@ func TestReservoirMerge(t *testing.T) { // Segments should contain a cross section from r1 and r2. If the rng // changes, this result will likely change too since that will affect // the keys. and therefore how they are merged. - require.Equal(t, []segmentloop.Segment{ + require.Equal(t, []rangedloop.Segment{ segments[5], segments[1], segments[2], @@ -93,7 +93,7 @@ func TestReservoirWeights(t *testing.T) { weight1StreamID: 0, } - segments := []segmentloop.Segment{ + segments := []rangedloop.Segment{ { StreamID: weight10StreamID, Position: metabase.SegmentPosition{}, @@ -163,7 +163,7 @@ func TestReservoirBias(t *testing.T) { for r := 0; r < numRounds; r++ { res := NewReservoir(reservoirSize) for n := 0; n < numSegments; n++ { - seg := segmentloop.Segment{ + seg := rangedloop.Segment{ EncryptedSize: weight, } binary.BigEndian.PutUint64(seg.StreamID[0:8], uint64(n)<<(64-useBits)) @@ -198,8 +198,8 @@ func (us uint64Slice) Len() int { return len(us) } func (us uint64Slice) Swap(i, j int) { us[i], us[j] = us[j], us[i] } func (us uint64Slice) Less(i, j int) bool { return us[i] < us[j] } -func makeSegment(n int) segmentloop.Segment { - return segmentloop.Segment{ +func makeSegment(n int) rangedloop.Segment { + return rangedloop.Segment{ StreamID: uuid.UUID{0: byte(n)}, EncryptedSize: int32(n * 1000), } diff --git a/satellite/gc/bloomfilter/observer.go b/satellite/gc/bloomfilter/observer.go index 78f10f2e7..ea5b412c6 100644 --- a/satellite/gc/bloomfilter/observer.go +++ b/satellite/gc/bloomfilter/observer.go @@ -15,7 +15,6 @@ import ( "storj.io/common/memory" "storj.io/common/storj" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" ) @@ -165,7 +164,7 @@ func newObserverFork(log *zap.Logger, config Config, pieceCounts map[storj.NodeI } // Process adds pieces to the bloom filter from remote segments. -func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error { for _, segment := range segments { if segment.Inline() { continue diff --git a/satellite/gc/bloomfilter/observer_sync.go b/satellite/gc/bloomfilter/observer_sync.go index 8072fb012..24af81a65 100644 --- a/satellite/gc/bloomfilter/observer_sync.go +++ b/satellite/gc/bloomfilter/observer_sync.go @@ -15,7 +15,6 @@ import ( "storj.io/common/memory" "storj.io/common/storj" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" ) @@ -103,7 +102,7 @@ func (obs *SyncObserver) Finish(ctx context.Context) (err error) { } // Process adds pieces to the bloom filter from remote segments. -func (obs *SyncObserver) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (obs *SyncObserver) Process(ctx context.Context, segments []rangedloop.Segment) error { latestCreationTime := time.Time{} for _, segment := range segments { if segment.Inline() { diff --git a/satellite/gc/bloomfilter/observer_test.go b/satellite/gc/bloomfilter/observer_test.go index 7915850da..3ed766e66 100644 --- a/satellite/gc/bloomfilter/observer_test.go +++ b/satellite/gc/bloomfilter/observer_test.go @@ -26,7 +26,6 @@ import ( "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/rangedloop/rangedlooptest" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/uplink" ) @@ -245,9 +244,9 @@ func TestObserverGarbageCollection_MultipleRanges(t *testing.T) { segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(t, err) - loopSegments := []segmentloop.Segment{} + loopSegments := []rangedloop.Segment{} for _, segment := range segments { - loopSegments = append(loopSegments, segmentloop.Segment{ + loopSegments = append(loopSegments, rangedloop.Segment{ StreamID: segment.StreamID, Position: segment.Position, CreatedAt: segment.CreatedAt, diff --git a/satellite/gracefulexit/observer.go b/satellite/gracefulexit/observer.go index cae9972ba..5b72320e2 100644 --- a/satellite/gracefulexit/observer.go +++ b/satellite/gracefulexit/observer.go @@ -13,7 +13,6 @@ import ( "storj.io/common/storj" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" ) @@ -192,7 +191,7 @@ func newObserverFork(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batc } // Process adds transfer queue items for remote segments belonging to newly exiting nodes. -func (observer *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) { +func (observer *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) { // Intentionally omitting mon.Task here. The duration for all process // calls are aggregated and and emitted by the ranged loop service. @@ -211,7 +210,7 @@ func (observer *observerFork) Process(ctx context.Context, segments []segmentloo return nil } -func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment segmentloop.Segment) (err error) { +func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment rangedloop.Segment) (err error) { numPieces := len(segment.Pieces) for _, piece := range segment.Pieces { if _, ok := observer.nodeIDStorage[piece.StorageNode]; !ok { diff --git a/satellite/metabase/rangedloop/observer.go b/satellite/metabase/rangedloop/observer.go index 0a8dadaab..1e571947d 100644 --- a/satellite/metabase/rangedloop/observer.go +++ b/satellite/metabase/rangedloop/observer.go @@ -7,9 +7,27 @@ import ( "context" "time" - "storj.io/storj/satellite/metabase/segmentloop" + "storj.io/storj/satellite/metabase" ) +// Segment contains information about segment metadata which will be received by observers. +type Segment metabase.LoopSegmentEntry + +// Inline returns true if segment is inline. +func (s Segment) Inline() bool { + return s.Redundancy.IsZero() && len(s.Pieces) == 0 +} + +// Expired checks if segment expired relative to now. +func (s *Segment) Expired(now time.Time) bool { + return s.ExpiresAt != nil && s.ExpiresAt.Before(now) +} + +// PieceSize returns calculated piece size for segment. +func (s Segment) PieceSize() int64 { + return s.Redundancy.PieceSize(int64(s.EncryptedSize)) +} + // Observer subscribes to the parallel segment loop. // It is intended that a naïve implementation is threadsafe. type Observer interface { @@ -33,5 +51,5 @@ type Observer interface { type Partial interface { // Process is called repeatedly with batches of segments. // It is not called concurrently on the same instance. - Process(context.Context, []segmentloop.Segment) error + Process(context.Context, []Segment) error } diff --git a/satellite/metabase/rangedloop/observerlivecount.go b/satellite/metabase/rangedloop/observerlivecount.go index 43f41826f..87861afd0 100644 --- a/satellite/metabase/rangedloop/observerlivecount.go +++ b/satellite/metabase/rangedloop/observerlivecount.go @@ -11,7 +11,6 @@ import ( "github.com/spacemonkeygo/monkit/v3" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" ) var _ monkit.StatSource = (*LiveCountObserver)(nil) @@ -72,7 +71,7 @@ func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error { } // Process increments the counter. -func (o *LiveCountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (o *LiveCountObserver) Process(ctx context.Context, segments []Segment) error { processed := atomic.AddInt64(&o.segmentsProcessed, int64(len(segments))) mon.IntVal("segmentsProcessed").Observe(processed) diff --git a/satellite/metabase/rangedloop/provider.go b/satellite/metabase/rangedloop/provider.go index 80572277a..8e5136c07 100644 --- a/satellite/metabase/rangedloop/provider.go +++ b/satellite/metabase/rangedloop/provider.go @@ -5,8 +5,6 @@ package rangedloop import ( "context" - - "storj.io/storj/satellite/metabase/segmentloop" ) // RangeSplitter splits a source of segments into ranges, @@ -20,5 +18,5 @@ type RangeSplitter interface { // SegmentProvider iterates through a range of segments. type SegmentProvider interface { Range() UUIDRange - Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error + Iterate(ctx context.Context, fn func([]Segment) error) error } diff --git a/satellite/metabase/rangedloop/providerdb.go b/satellite/metabase/rangedloop/providerdb.go index 440ed6ee5..7e6ca54aa 100644 --- a/satellite/metabase/rangedloop/providerdb.go +++ b/satellite/metabase/rangedloop/providerdb.go @@ -9,7 +9,6 @@ import ( "storj.io/common/uuid" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" ) // MetabaseRangeSplitter implements RangeSplitter. @@ -68,7 +67,7 @@ func (provider *MetabaseSegmentProvider) Range() UUIDRange { } // Iterate loops over a part of the segment table. -func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error { +func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]Segment) error) error { var startStreamID uuid.UUID var endStreamID uuid.UUID @@ -86,7 +85,7 @@ func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([] StartStreamID: startStreamID, EndStreamID: endStreamID, }, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error { - segments := make([]segmentloop.Segment, 0, provider.batchSize) + segments := make([]Segment, 0, provider.batchSize) segment := metabase.LoopSegmentEntry{} for iterator.Next(ctx, &segment) { @@ -95,7 +94,7 @@ func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([] return err } - segments = append(segments, segmentloop.Segment(segment)) + segments = append(segments, Segment(segment)) if len(segments) >= provider.batchSize { err = fn(segments) diff --git a/satellite/metabase/rangedloop/providerdb_test.go b/satellite/metabase/rangedloop/providerdb_test.go index 9aa133278..a63a9d298 100644 --- a/satellite/metabase/rangedloop/providerdb_test.go +++ b/satellite/metabase/rangedloop/providerdb_test.go @@ -15,7 +15,6 @@ import ( "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/metabasetest" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) type in struct { @@ -113,7 +112,7 @@ func runTest(ctx *testcontext.Context, t *testing.T, db *metabase.DB, in in, exp nBatches := 0 nSegments := 0 for _, r := range ranges { - err = r.Iterate(ctx, func(segments []segmentloop.Segment) error { + err = r.Iterate(ctx, func(segments []rangedloop.Segment) error { nBatches++ nSegments += len(segments) return nil diff --git a/satellite/metabase/rangedloop/rangedlooptest/callbackobserver.go b/satellite/metabase/rangedloop/rangedlooptest/callbackobserver.go index fdb96befc..53006ba35 100644 --- a/satellite/metabase/rangedloop/rangedlooptest/callbackobserver.go +++ b/satellite/metabase/rangedloop/rangedlooptest/callbackobserver.go @@ -9,7 +9,6 @@ import ( "time" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) var _ rangedloop.Observer = (*CallbackObserver)(nil) @@ -17,7 +16,7 @@ var _ rangedloop.Partial = (*CallbackObserver)(nil) // CallbackObserver can be used to easily attach logic to the ranged segment loop during tests. type CallbackObserver struct { - OnProcess func(context.Context, []segmentloop.Segment) error + OnProcess func(context.Context, []rangedloop.Segment) error OnStart func(context.Context, time.Time) error OnFork func(context.Context) (rangedloop.Partial, error) OnJoin func(context.Context, rangedloop.Partial) error @@ -85,7 +84,7 @@ func (c *CallbackObserver) Finish(ctx context.Context) error { } // Process executes a callback for every batch of segment in the ranged segment loop. -func (c *CallbackObserver) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (c *CallbackObserver) Process(ctx context.Context, segments []rangedloop.Segment) error { delay() if c.OnProcess == nil { return nil diff --git a/satellite/metabase/rangedloop/rangedlooptest/countobserver.go b/satellite/metabase/rangedloop/rangedlooptest/countobserver.go index 5654ec929..f61948cdc 100644 --- a/satellite/metabase/rangedloop/rangedlooptest/countobserver.go +++ b/satellite/metabase/rangedloop/rangedlooptest/countobserver.go @@ -8,7 +8,6 @@ import ( "time" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) var _ rangedloop.Observer = (*CountObserver)(nil) @@ -45,7 +44,7 @@ func (c *CountObserver) Finish(ctx context.Context) error { } // Process counts the size of a batch of segments. -func (c *CountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (c *CountObserver) Process(ctx context.Context, segments []rangedloop.Segment) error { c.NumSegments += len(segments) return nil } diff --git a/satellite/metabase/rangedloop/rangedlooptest/infiniteprovider.go b/satellite/metabase/rangedloop/rangedlooptest/infiniteprovider.go index 4027726ad..a3c9f29f8 100644 --- a/satellite/metabase/rangedloop/rangedlooptest/infiniteprovider.go +++ b/satellite/metabase/rangedloop/rangedlooptest/infiniteprovider.go @@ -7,7 +7,6 @@ import ( "context" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) var _ rangedloop.RangeSplitter = (*InfiniteSegmentProvider)(nil) @@ -31,9 +30,9 @@ func (m *InfiniteSegmentProvider) Range() rangedloop.UUIDRange { } // Iterate allows to loop over the segments stored in the provider. -func (m *InfiniteSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error { +func (m *InfiniteSegmentProvider) Iterate(ctx context.Context, fn func([]rangedloop.Segment) error) error { for { - err := fn(make([]segmentloop.Segment, 3)) + err := fn(make([]rangedloop.Segment, 3)) if err != nil { return err } diff --git a/satellite/metabase/rangedloop/rangedlooptest/provider.go b/satellite/metabase/rangedloop/rangedlooptest/provider.go index ee7fa7f89..b68edc8de 100644 --- a/satellite/metabase/rangedloop/rangedlooptest/provider.go +++ b/satellite/metabase/rangedloop/rangedlooptest/provider.go @@ -9,21 +9,20 @@ import ( "sort" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) var _ rangedloop.RangeSplitter = (*RangeSplitter)(nil) // RangeSplitter allows to iterate over segments from an in-memory source. type RangeSplitter struct { - Segments []segmentloop.Segment + Segments []rangedloop.Segment } var _ rangedloop.SegmentProvider = (*SegmentProvider)(nil) // SegmentProvider allows to iterate over segments from an in-memory source. type SegmentProvider struct { - Segments []segmentloop.Segment + Segments []rangedloop.Segment batchSize int } @@ -56,7 +55,7 @@ func (m *SegmentProvider) Range() rangedloop.UUIDRange { } // Iterate allows to loop over the segments stored in the provider. -func (m *SegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error { +func (m *SegmentProvider) Iterate(ctx context.Context, fn func([]rangedloop.Segment) error) error { for offset := 0; offset < len(m.Segments); offset += m.batchSize { end := min(offset+m.batchSize, len(m.Segments)) err := fn(m.Segments[offset:end]) @@ -75,9 +74,9 @@ func min(x, y int) int { return y } -func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment { +func streamsFromSegments(segments []rangedloop.Segment) [][]rangedloop.Segment { // Duplicate and sort the segments by stream ID - segments = append([]segmentloop.Segment(nil), segments...) + segments = append([]rangedloop.Segment(nil), segments...) sort.Slice(segments, func(i int, j int) bool { idcmp := segments[i].StreamID.Compare(segments[j].StreamID) switch { @@ -90,8 +89,8 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment } }) // Break up the sorted segments into streams - var streams [][]segmentloop.Segment - var stream []segmentloop.Segment + var streams [][]rangedloop.Segment + var stream []rangedloop.Segment for _, segment := range segments { if len(stream) > 0 && stream[0].StreamID != segment.StreamID { // Stream ID changed; push and reset stream @@ -108,8 +107,8 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment return streams } -func segmentsFromStreams(streams [][]segmentloop.Segment) []segmentloop.Segment { - var segments []segmentloop.Segment +func segmentsFromStreams(streams [][]rangedloop.Segment) []rangedloop.Segment { + var segments []rangedloop.Segment for _, stream := range streams { segments = append(segments, stream...) } diff --git a/satellite/metabase/rangedloop/rangedlooptest/provider_test.go b/satellite/metabase/rangedloop/rangedlooptest/provider_test.go index cf525a292..13655497c 100644 --- a/satellite/metabase/rangedloop/rangedlooptest/provider_test.go +++ b/satellite/metabase/rangedloop/rangedlooptest/provider_test.go @@ -14,7 +14,7 @@ import ( "storj.io/common/uuid" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" + "storj.io/storj/satellite/metabase/rangedloop" ) var ( @@ -22,30 +22,30 @@ var ( ) func TestSplitter(t *testing.T) { - mkseg := func(streamID byte, pos uint64) segmentloop.Segment { - return segmentloop.Segment{ + mkseg := func(streamID byte, pos uint64) rangedloop.Segment { + return rangedloop.Segment{ StreamID: uuid.UUID{0: streamID}, Position: metabase.SegmentPositionFromEncoded(pos), } } - mkstream := func(streamID byte, numSegments int) []segmentloop.Segment { - var stream []segmentloop.Segment + mkstream := func(streamID byte, numSegments int) []rangedloop.Segment { + var stream []rangedloop.Segment for i := 0; i < numSegments; i++ { stream = append(stream, mkseg(streamID, uint64(numSegments))) } return stream } - intermix := func(segments []segmentloop.Segment) []segmentloop.Segment { - segments = append([]segmentloop.Segment(nil), segments...) + intermix := func(segments []rangedloop.Segment) []rangedloop.Segment { + segments = append([]rangedloop.Segment(nil), segments...) r.Shuffle(len(segments), func(i, j int) { segments[i], segments[j] = segments[j], segments[i] }) return segments } - combine := func(streams ...[]segmentloop.Segment) []segmentloop.Segment { + combine := func(streams ...[]rangedloop.Segment) []rangedloop.Segment { return segmentsFromStreams(streams) } @@ -57,15 +57,15 @@ func TestSplitter(t *testing.T) { for _, tt := range []struct { desc string - segments []segmentloop.Segment + segments []rangedloop.Segment numRanges int - expectRanges [][]segmentloop.Segment + expectRanges [][]rangedloop.Segment }{ { desc: "no segments", segments: nil, numRanges: 2, - expectRanges: [][]segmentloop.Segment{ + expectRanges: [][]rangedloop.Segment{ {}, {}, }, @@ -74,7 +74,7 @@ func TestSplitter(t *testing.T) { desc: "one stream over two ranges", segments: stream1, numRanges: 2, - expectRanges: [][]segmentloop.Segment{ + expectRanges: [][]rangedloop.Segment{ stream1, {}, }, @@ -83,7 +83,7 @@ func TestSplitter(t *testing.T) { desc: "two streams over two ranges", segments: combine(stream1, stream2), numRanges: 2, - expectRanges: [][]segmentloop.Segment{ + expectRanges: [][]rangedloop.Segment{ stream1, stream2, }, @@ -92,7 +92,7 @@ func TestSplitter(t *testing.T) { desc: "three streams over two ranges", segments: combine(stream1, stream2, stream3), numRanges: 2, - expectRanges: [][]segmentloop.Segment{ + expectRanges: [][]rangedloop.Segment{ combine(stream1, stream2), stream3, }, @@ -101,7 +101,7 @@ func TestSplitter(t *testing.T) { desc: "three streams intermixed over two ranges", segments: intermix(combine(stream1, stream2, stream3)), numRanges: 2, - expectRanges: [][]segmentloop.Segment{ + expectRanges: [][]rangedloop.Segment{ combine(stream1, stream2), stream3, }, @@ -110,7 +110,7 @@ func TestSplitter(t *testing.T) { desc: "five streams intermixed over three ranges", segments: intermix(combine(stream1, stream2, stream3, stream4, stream5)), numRanges: 3, - expectRanges: [][]segmentloop.Segment{ + expectRanges: [][]rangedloop.Segment{ combine(stream1, stream2), combine(stream3, stream4), stream5, @@ -125,10 +125,10 @@ func TestSplitter(t *testing.T) { providers, err := splitter.CreateRanges(tt.numRanges, batchSize) require.NoError(t, err) - var actualRanges [][]segmentloop.Segment + var actualRanges [][]rangedloop.Segment for _, provider := range providers { - rangeSegments := []segmentloop.Segment{} - err := provider.Iterate(context.Background(), func(segments []segmentloop.Segment) error { + rangeSegments := []rangedloop.Segment{} + err := provider.Iterate(context.Background(), func(segments []rangedloop.Segment) error { if len(segments) > batchSize { return fmt.Errorf("iterated segments (%d) larger than batch size (%d)", len(segments), batchSize) } diff --git a/satellite/metabase/rangedloop/rangedlooptest/sleepobserver.go b/satellite/metabase/rangedloop/rangedlooptest/sleepobserver.go index c40b810fa..a73c01c80 100644 --- a/satellite/metabase/rangedloop/rangedlooptest/sleepobserver.go +++ b/satellite/metabase/rangedloop/rangedlooptest/sleepobserver.go @@ -8,7 +8,6 @@ import ( "time" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) // SleepObserver is a subscriber to the segment loop which sleeps for every batch. @@ -38,7 +37,7 @@ func (c *SleepObserver) Finish(ctx context.Context) error { } // Process sleeps for every batch of segments to simulate execution time. -func (c *SleepObserver) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (c *SleepObserver) Process(ctx context.Context, segments []rangedloop.Segment) error { sleepTime := time.Duration(c.Duration.Nanoseconds() * int64(len(segments))) time.Sleep(sleepTime) return nil diff --git a/satellite/metabase/rangedloop/service.go b/satellite/metabase/rangedloop/service.go index 0cf121dbf..b6834260f 100644 --- a/satellite/metabase/rangedloop/service.go +++ b/satellite/metabase/rangedloop/service.go @@ -15,7 +15,6 @@ import ( "storj.io/common/errs2" "storj.io/common/sync2" - "storj.io/storj/satellite/metabase/segmentloop" ) var ( @@ -172,7 +171,7 @@ func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider, return func() (err error) { defer mon.Task()(&ctx)(&err) - return rangeProvider.Iterate(ctx, func(segments []segmentloop.Segment) error { + return rangeProvider.Iterate(ctx, func(segments []Segment) error { // check for cancellation every segment batch select { case <-ctx.Done(): @@ -280,7 +279,7 @@ func finishObserver(ctx context.Context, log *zap.Logger, state observerState) O } } -func processBatch(ctx context.Context, states []*rangeObserverState, segments []segmentloop.Segment) (err error) { +func processBatch(ctx context.Context, states []*rangeObserverState, segments []Segment) (err error) { for _, state := range states { if state.err != nil { // this observer has errored in a previous batch diff --git a/satellite/metabase/rangedloop/service_test.go b/satellite/metabase/rangedloop/service_test.go index 0b5dbeaeb..b3ece78a2 100644 --- a/satellite/metabase/rangedloop/service_test.go +++ b/satellite/metabase/rangedloop/service_test.go @@ -30,7 +30,6 @@ import ( "storj.io/storj/satellite/metabase/metabasetest" "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/rangedloop/rangedlooptest" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/metrics" "storj.io/storj/satellite/repair/checker" ) @@ -66,7 +65,7 @@ func runCountTest(t *testing.T, parallelism int, nSegments int, nObservers int) Parallelism: parallelism, }, &rangedlooptest.RangeSplitter{ - Segments: make([]segmentloop.Segment, nSegments), + Segments: make([]rangedloop.Segment, nSegments), }, observers, ) @@ -99,11 +98,11 @@ func TestLoopDuration(t *testing.T) { }) } - segments := []segmentloop.Segment{} + segments := []rangedloop.Segment{} for i := 0; i < nSegments; i++ { streamId, err := uuid.FromBytes([]byte{byte(i), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) require.NoError(t, err) - segments = append(segments, segmentloop.Segment{ + segments = append(segments, rangedloop.Segment{ StreamID: streamId, }) } @@ -143,7 +142,7 @@ func TestLoopCancellation(t *testing.T) { observers := []rangedloop.Observer{ &rangedlooptest.CountObserver{}, &rangedlooptest.CallbackObserver{ - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { // cancel from inside the loop, when it is certain that the loop has started cancel() return nil @@ -169,7 +168,7 @@ func TestLoopCancellation(t *testing.T) { func TestLoopContinuesAfterObserverError(t *testing.T) { parallelism := 2 batchSize := 1 - segments := make([]segmentloop.Segment, 2) + segments := make([]rangedloop.Segment, 2) numOnStartCalls := 0 numOnForkCalls := 0 @@ -193,7 +192,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) { numOnForkCalls++ return nil, nil }, - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { incNumOnProcessCalls() return nil }, @@ -215,7 +214,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) { require.Fail(t, "OnFork should not be called") return nil, nil }, - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { require.Fail(t, "OnProcess should not be called") return nil }, @@ -237,7 +236,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) { numOnForkCalls++ return nil, errors.New("Test OnFork error") }, - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { require.Fail(t, "OnProcess should not be called") return nil }, @@ -259,7 +258,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) { numOnForkCalls++ return nil, nil }, - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { incNumOnProcessCalls() return errors.New("Test OnProcess error") }, @@ -281,7 +280,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) { numOnForkCalls++ return nil, nil }, - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { incNumOnProcessCalls() return nil }, @@ -303,7 +302,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) { numOnForkCalls++ return nil, nil }, - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { incNumOnProcessCalls() return nil }, @@ -325,7 +324,7 @@ func TestLoopContinuesAfterObserverError(t *testing.T) { numOnForkCalls++ return nil, nil }, - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { incNumOnProcessCalls() return nil }, @@ -483,7 +482,7 @@ func TestLoopBoundaries(t *testing.T) { } callbackObserver := rangedlooptest.CallbackObserver{ - OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error { + OnProcess: func(ctx context.Context, segments []rangedloop.Segment) error { // OnProcess is called many times by different goroutines mu.Lock() defer mu.Unlock() diff --git a/satellite/metrics/observer.go b/satellite/metrics/observer.go index 724d92ded..1b8da1e97 100644 --- a/satellite/metrics/observer.go +++ b/satellite/metrics/observer.go @@ -12,7 +12,6 @@ import ( "storj.io/common/uuid" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" ) var ( @@ -89,7 +88,7 @@ type observerFork struct { // Process aggregates metrics about a range of metrics provided by the // segment ranged loop. -func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error { for _, segment := range segments { if fork.streamID != segment.StreamID { // Stream ID has changed. Flush what we have so far. diff --git a/satellite/metrics/observer_test.go b/satellite/metrics/observer_test.go index 1cc688814..f0c8a7d41 100644 --- a/satellite/metrics/observer_test.go +++ b/satellite/metrics/observer_test.go @@ -15,18 +15,17 @@ import ( "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/rangedloop/rangedlooptest" - "storj.io/storj/satellite/metabase/segmentloop" ) var ( - inline1 = []segmentloop.Segment{ + inline1 = []rangedloop.Segment{ {StreamID: uuid.UUID{1}, EncryptedSize: 10}, } - remote2 = []segmentloop.Segment{ + remote2 = []rangedloop.Segment{ {StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, {StreamID: uuid.UUID{2}, EncryptedSize: 10}, } - remote3 = []segmentloop.Segment{ + remote3 = []rangedloop.Segment{ {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, @@ -37,7 +36,7 @@ var ( func TestObserver(t *testing.T) { ctx := testcontext.New(t) - loop := func(tb testing.TB, obs *Observer, streams ...[]segmentloop.Segment) Metrics { + loop := func(tb testing.TB, obs *Observer, streams ...[]rangedloop.Segment) Metrics { service := rangedloop.NewService( zap.NewNop(), rangedloop.Config{BatchSize: 2, Parallelism: 2}, @@ -91,8 +90,8 @@ func TestObserver(t *testing.T) { }) } -func combineSegments(ss ...[]segmentloop.Segment) []segmentloop.Segment { - var combined []segmentloop.Segment +func combineSegments(ss ...[]rangedloop.Segment) []rangedloop.Segment { + var combined []rangedloop.Segment for _, s := range ss { combined = append(combined, s...) } diff --git a/satellite/repair/checker/observer.go b/satellite/repair/checker/observer.go index cc4c6210e..dce09e32e 100644 --- a/satellite/repair/checker/observer.go +++ b/satellite/repair/checker/observer.go @@ -18,7 +18,6 @@ import ( "storj.io/common/storj" "storj.io/common/uuid" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" "storj.io/storj/satellite/repair" "storj.io/storj/satellite/repair/queue" @@ -276,7 +275,7 @@ func (fork *observerFork) loadRedundancy(redundancy storj.RedundancyScheme) (int } // Process repair implementation of partial's Process. -func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) { +func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) { for _, segment := range segments { if err := fork.process(ctx, &segment); err != nil { return err @@ -286,7 +285,7 @@ func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Se return nil } -func (fork *observerFork) process(ctx context.Context, segment *segmentloop.Segment) (err error) { +func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segment) (err error) { if segment.Inline() { if fork.lastStreamID.Compare(segment.StreamID) != 0 { fork.lastStreamID = segment.StreamID diff --git a/satellite/repair/checker/observer_test.go b/satellite/repair/checker/observer_test.go index addf7d3f0..f527c3d12 100644 --- a/satellite/repair/checker/observer_test.go +++ b/satellite/repair/checker/observer_test.go @@ -24,7 +24,6 @@ import ( "storj.io/storj/satellite" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/repair/checker" "storj.io/storj/satellite/repair/queue" ) @@ -555,10 +554,10 @@ func BenchmarkRemoteSegment(b *testing.B) { segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(b, err) - loopSegments := []segmentloop.Segment{} + loopSegments := []rangedloop.Segment{} for _, segment := range segments { - loopSegments = append(loopSegments, segmentloop.Segment{ + loopSegments = append(loopSegments, rangedloop.Segment{ StreamID: segment.StreamID, Position: segment.Position, CreatedAt: segment.CreatedAt,