storj/satellite/metabase/rangedloop/rangedlooptest/provider.go
Michal Niewrzal aba2f14595 satellite/metabase/rangedloop: few additions for monitoring
Additional elements added:
* monkit metric for observers methods like Start/Fork/Join/Finish to
be able to check how much time those methods are taking
* few more logs e.g. entries with processed range
* segmentsProcessed metric to be able to check loop progress

Change-Id: I65dd51f7f5c4bdbb4014fbf04e5b6b10bdb035ec
2023-02-17 08:46:00 +00:00

118 lines
3.1 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedlooptest
import (
"context"
"math"
"sort"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
// Compile-time checks that the in-memory test doubles implement the
// interfaces consumed by the ranged loop.
var (
	_ rangedloop.RangeSplitter   = (*RangeSplitter)(nil)
	_ rangedloop.SegmentProvider = (*SegmentProvider)(nil)
)

// RangeSplitter splits an in-memory set of segments into SegmentProviders.
type RangeSplitter struct {
	// Segments is the full set of segments to be split into ranges.
	Segments []segmentloop.Segment
}

// SegmentProvider serves an in-memory set of segments in batches.
type SegmentProvider struct {
	// Segments are the segments returned by Iterate, in order.
	Segments []segmentloop.Segment
	// batchSize is the maximum number of segments handed to each Iterate callback.
	batchSize int
}
// CreateRanges partitions the splitter's segments into nRanges providers.
//
// Segments are grouped by stream first, because every segment of a given
// stream must be handled by the same provider. The streams are then handed
// out in contiguous chunks of ceil(len(streams)/nRanges). Trailing providers
// may therefore receive fewer streams, or none at all. Each provider yields
// its segments in batches of batchSize. When nRanges is not positive, an
// empty (non-nil) slice is returned. The error result is always nil.
func (m *RangeSplitter) CreateRanges(nRanges int, batchSize int) ([]rangedloop.SegmentProvider, error) {
	streams := streamsFromSegments(m.Segments)
	perRange := int(math.Ceil(float64(len(streams)) / float64(nRanges)))

	providers := []rangedloop.SegmentProvider{}
	// lo walks forward through the streams; each provider takes the next
	// chunk [lo, hi), clamped to the number of streams.
	lo := 0
	for len(providers) < nRanges {
		hi := min(lo+perRange, len(streams))
		providers = append(providers, &SegmentProvider{
			Segments:  segmentsFromStreams(streams[lo:hi]),
			batchSize: batchSize,
		})
		lo = hi
	}
	return providers, nil
}
// Range reports the UUID range handled by this provider. The in-memory
// provider does not track a UUID range, so it always reports the zero
// value of rangedloop.UUIDRange.
func (m *SegmentProvider) Range() rangedloop.UUIDRange {
	var zero rangedloop.UUIDRange
	return zero
}
// Iterate calls fn on consecutive batches of the provider's segments, in
// order. Each batch holds at most batchSize segments. A non-positive batch
// size delivers all segments as a single batch, rather than looping forever
// (0) or panicking on a negative slice bound (< 0). Iteration stops at the
// first error returned by fn, and that error is returned unchanged.
//
// ctx is accepted to satisfy rangedloop.SegmentProvider and is not consulted.
func (m *SegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
	batchSize := m.batchSize
	if batchSize <= 0 {
		batchSize = len(m.Segments)
	}
	for offset := 0; offset < len(m.Segments); offset += batchSize {
		end := min(offset+batchSize, len(m.Segments))
		// Cap the batch's capacity at its length so that an append inside fn
		// reallocates instead of overwriting segments of the following batches.
		if err := fn(m.Segments[offset:end:end]); err != nil {
			return err
		}
	}
	return nil
}
// min returns the smaller of a and b.
// On Go 1.21+ this package-level helper shadows the built-in min, which
// behaves the same for ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// streamsFromSegments groups segments by stream. It returns one slice per
// distinct StreamID, ordered by StreamID (via StreamID.Compare). Within each
// slice, segments are ordered by Position. The caller's slice is not
// reordered. The result is nil when segments is empty.
func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment {
	// Sort a private copy so the caller's slice keeps its original order.
	sorted := make([]segmentloop.Segment, len(segments))
	copy(sorted, segments)
	sort.Slice(sorted, func(i, j int) bool {
		if c := sorted[i].StreamID.Compare(sorted[j].StreamID); c != 0 {
			return c < 0
		}
		return sorted[i].Position.Less(sorted[j].Position)
	})

	// Walk the sorted segments and start a new stream whenever the StreamID
	// differs from that of the previous segment.
	var streams [][]segmentloop.Segment
	var current []segmentloop.Segment
	for i, seg := range sorted {
		if i > 0 && seg.StreamID != sorted[i-1].StreamID {
			streams = append(streams, current)
			current = nil
		}
		current = append(current, seg)
	}
	// Flush the final stream; current is empty only when there were no segments.
	if len(current) > 0 {
		streams = append(streams, current)
	}
	return streams
}
// segmentsFromStreams concatenates streams into one flat slice. It preserves
// the order of the streams and of the segments within each stream. The
// result is nil when the streams contain no segments.
func segmentsFromStreams(streams [][]segmentloop.Segment) []segmentloop.Segment {
	var flat []segmentloop.Segment
	for i := range streams {
		flat = append(flat, streams[i]...)
	}
	return flat
}