satellite/metabase/rangedloop: measure observer duration (#5350)
Track duration of all segment loop observers. Factor out functions to reduce size. Still need to send the measurements out via monkit.

Part of https://github.com/storj/storj/issues/5223

Change-Id: Iae0260e250f8ea33affed95c6592a1f42df384eb
This commit is contained in: parent cda1d67465, commit 37b4981cc0
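The commit message notes that the measured durations are not yet reported to monkit. A minimal sketch of what that follow-up could look like, assuming the package-level `mon` monkit scope already used in package rangedloop and a hypothetical sendObserverDurations helper (neither the helper nor the metric naming is part of this change):

	// Hypothetical follow-up, not part of this commit: export the per-observer
	// durations returned by RunOnce as monkit measurements. Assumes `mon` is the
	// existing package-level monkit scope in package rangedloop.
	func sendObserverDurations(observerDurations []ObserverDuration) {
		for _, od := range observerDurations {
			// One duration series per observer type.
			mon.DurationVal(fmt.Sprintf("%T", od.Observer)).Observe(od.Duration)
		}
	}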
@ -0,0 +1,45 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package rangedlooptest
+
+import (
+	"context"
+	"time"
+
+	"storj.io/storj/satellite/metabase/rangedloop"
+	"storj.io/storj/satellite/metabase/segmentloop"
+)
+
+// SleepObserver is a subscriber to the segment loop which sleeps for every batch.
+type SleepObserver struct {
+	Duration time.Duration
+}
+
+// Start is the callback for segment loop start.
+func (c *SleepObserver) Start(ctx context.Context, time time.Time) error {
+	return nil
+}
+
+// Fork splits the observer to process a segment range.
+func (c *SleepObserver) Fork(ctx context.Context) (rangedloop.Partial, error) {
+	return c, nil
+}
+
+// Join is a noop.
+func (c *SleepObserver) Join(ctx context.Context, partial rangedloop.Partial) error {
+	// Range done
+	return nil
+}
+
+// Finish is the callback for segment loop end.
+func (c *SleepObserver) Finish(ctx context.Context) error {
+	return nil
+}
+
+// Process sleeps for every batch of segments to simulate execution time.
+func (c *SleepObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
+	sleepTime := time.Duration(c.Duration.Nanoseconds() * int64(len(segments)))
+	time.Sleep(sleepTime)
+	return nil
+}
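With Duration set to 10ms, for example, a batch of two segments makes Process sleep for 20ms, so the simulated work scales linearly with batch size. Because SleepObserver keeps no per-range state, Fork can simply return the receiver and Join has nothing to merge.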
@ -50,7 +50,18 @@ func NewService(log *zap.Logger, config Config, provider RangeSplitter, observer
 // Improvement: track duration.
 type observerState struct {
 	observer       Observer
-	rangeObservers []Partial
+	rangeObservers []*rangeObserverState
+}
+
+type rangeObserverState struct {
+	rangeObserver Partial
+	duration      time.Duration
+}
+
+// ObserverDuration reports back on how long it took the observer to process all the segments.
+type ObserverDuration struct {
+	Observer Observer
+	Duration time.Duration
 }

 // Run starts the looping service.
@ -58,7 +69,8 @@ func (service *Service) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)

 	for {
-		if err := service.RunOnce(ctx); err != nil {
+		_, err := service.RunOnce(ctx)
+		if err != nil {
 			service.log.Error("ranged loop failure", zap.Error(err))

 			if errs2.IsCanceled(err) {
@ -74,81 +86,115 @@ func (service *Service) Run(ctx context.Context) (err error) {
 }

 // RunOnce goes through one time and sends information to observers.
-func (service *Service) RunOnce(ctx context.Context) (err error) {
+func (service *Service) RunOnce(ctx context.Context) (observerDurations []ObserverDuration, err error) {
 	defer mon.Task()(&ctx)(&err)

-	startTime := time.Now()
-	observerStates := []observerState{}
-	for _, obs := range service.observers {
-		err := obs.Start(ctx, startTime)
-		if err != nil {
-			return err
-		}
-
-		observerStates = append(observerStates, observerState{
-			observer: obs,
-		})
+	observerStates, err := startObservers(ctx, service.observers)
+	if err != nil {
+		return nil, err
 	}

 	rangeProviders, err := service.provider.CreateRanges(service.config.Parallelism, service.config.BatchSize)
 	if err != nil {
-		return err
+		return nil, err
 	}

 	group := errs2.Group{}
-	for rangeIndex, rangeProvider := range rangeProviders {
-		rangeObservers := []Partial{}
+	for _, rangeProvider := range rangeProviders {
+		rangeObservers := []*rangeObserverState{}
 		for i, observerState := range observerStates {
 			rangeObserver, err := observerState.observer.Fork(ctx)
 			if err != nil {
-				return err
+				return nil, err
 			}
-			rangeObservers = append(rangeObservers, rangeObserver)
-			observerStates[i].rangeObservers = append(observerStates[i].rangeObservers, rangeObserver)
+			rangeState := &rangeObserverState{
+				rangeObserver: rangeObserver,
+			}
+			rangeObservers = append(rangeObservers, rangeState)
+			observerStates[i].rangeObservers = append(observerStates[i].rangeObservers, rangeState)
 		}

 		// Create closure to capture loop variables.
-		createClosure := func(ctx context.Context, rangeIndex int, rangeProvider SegmentProvider, rangeObservers []Partial) func() error {
-			return func() (err error) {
-				defer mon.Task()(&ctx)(&err)
-
-				return rangeProvider.Iterate(ctx, func(segments []segmentloop.Segment) error {
-					for _, rangeObserver := range rangeObservers {
-						err := rangeObserver.Process(ctx, segments)
-						if err != nil {
-							return err
-						}
-					}
-					return nil
-				})
-			}
-		}
-
-		group.Go(createClosure(ctx, rangeIndex, rangeProvider, rangeObservers))
+		group.Go(createGoroutineClosure(ctx, rangeProvider, rangeObservers))
 	}

 	// Improvement: stop all ranges when one has an error.
 	errList := group.Wait()
 	if errList != nil {
-		return errs.Combine(errList...)
+		return nil, errs.Combine(errList...)
 	}

-	// Segment loop has ended.
-	// This is the reduce step.
-	for _, state := range observerStates {
-		for _, rangeObserver := range state.rangeObservers {
-			err := state.observer.Join(ctx, rangeObserver)
-			if err != nil {
-				return err
-			}
-		}
-
-		err := state.observer.Finish(ctx)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return finishObservers(ctx, observerStates)
+}
+
+func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider, states []*rangeObserverState) func() error {
+	return func() (err error) {
+		defer mon.Task()(&ctx)(&err)
+
+		return rangeProvider.Iterate(ctx, func(segments []segmentloop.Segment) error {
+			for _, state := range states {
+				start := time.Now()
+				err := state.rangeObserver.Process(ctx, segments)
+				if err != nil {
+					return err
+				}
+				state.duration += time.Since(start)
+			}
+			return nil
+		})
+	}
+}
+
+func startObservers(ctx context.Context, observers []Observer) (observerStates []observerState, err error) {
+	startTime := time.Now()
+
+	for _, obs := range observers {
+		state, err := startObserver(ctx, startTime, obs)
+		if err != nil {
+			return nil, err
+		}
+
+		observerStates = append(observerStates, state)
+	}
+
+	return observerStates, nil
+}
+
+func startObserver(ctx context.Context, startTime time.Time, observer Observer) (observerState, error) {
+	err := observer.Start(ctx, startTime)
+
+	return observerState{
+		observer: observer,
+	}, err
+}
+
+func finishObservers(ctx context.Context, observerStates []observerState) (observerDurations []ObserverDuration, err error) {
+	for _, state := range observerStates {
+		observerDuration, err := finishObserver(ctx, state)
+		if err != nil {
+			return nil, err
+		}
+
+		observerDurations = append(observerDurations, observerDuration)
+	}
+
+	return observerDurations, nil
+}
+
+// Iterating over the segments is done.
+// This is the reduce step.
+func finishObserver(ctx context.Context, state observerState) (ObserverDuration, error) {
+	var duration time.Duration
+	for _, rangeObserver := range state.rangeObservers {
+		err := state.observer.Join(ctx, rangeObserver.rangeObserver)
+		if err != nil {
+			return ObserverDuration{}, err
+		}
+		duration += rangeObserver.duration
+	}
+
+	return ObserverDuration{
+		Duration: duration,
+		Observer: state.observer,
+	}, state.observer.Finish(ctx)
 }
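Note how the timing is collected: createGoroutineClosure wraps every Process call in a time.Now()/time.Since pair and accumulates the result on the per-range rangeObserverState, and finishObserver then sums those per-range values while joining them back into the parent observer. The reported ObserverDuration is therefore the observer's total busy time across all ranges, which with parallelism greater than one can exceed the wall-clock time of a single loop pass.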
@ -4,26 +4,28 @@
 package rangedloop_test

 import (
-	"context"
 	"fmt"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zaptest"

+	"storj.io/common/testcontext"
+	"storj.io/common/uuid"
 	"storj.io/storj/satellite/metabase/rangedloop"
 	"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
 	"storj.io/storj/satellite/metabase/segmentloop"
 )

-func TestLoop(t *testing.T) {
+func TestLoopCount(t *testing.T) {
 	for _, parallelism := range []int{1, 2, 3} {
 		for _, nSegments := range []int{0, 1, 2, 11} {
 			for _, nObservers := range []int{0, 1, 2} {
 				t.Run(
 					fmt.Sprintf("par%d_seg%d_obs%d", parallelism, nSegments, nObservers),
 					func(t *testing.T) {
-						RunTest(t, parallelism, nSegments, nObservers)
+						runCountTest(t, parallelism, nSegments, nObservers)
 					},
 				)
 			}
@ -31,9 +33,9 @@ func TestLoop(t *testing.T) {
 	}
 }

-func RunTest(t *testing.T, parallelism int, nSegments int, nObservers int) {
+func runCountTest(t *testing.T, parallelism int, nSegments int, nObservers int) {
 	batchSize := 2
-	ctx := context.Background()
+	ctx := testcontext.New(t)

 	observers := []rangedloop.Observer{}
 	for i := 0; i < nObservers; i++ {
@ -43,9 +45,8 @@ func RunTest(t *testing.T, parallelism int, nSegments int, nObservers int) {
 	loopService := rangedloop.NewService(
 		zaptest.NewLogger(t),
 		rangedloop.Config{
 			BatchSize:   batchSize,
 			Parallelism: parallelism,
-			AsOfSystemInterval: 0,
 		},
 		&rangedlooptest.RangeSplitter{
 			Segments: make([]segmentloop.Segment, nSegments),
@ -53,11 +54,66 @@ func RunTest(t *testing.T, parallelism int, nSegments int, nObservers int) {
 		observers,
 	)

-	err := loopService.RunOnce(ctx)
+	observerDurations, err := loopService.RunOnce(ctx)
 	require.NoError(t, err)
+	require.Len(t, observerDurations, nObservers)

 	for _, observer := range observers {
 		countObserver := observer.(*rangedlooptest.CountObserver)
 		require.Equal(t, nSegments, countObserver.NumSegments)
 	}
 }
+
+func TestLoopDuration(t *testing.T) {
+	t.Skip("Flaky test because it validates concurrency by measuring time")
+
+	nSegments := 8
+	nObservers := 2
+	parallelism := 4
+	batchSize := 2
+	sleepIncrement := time.Millisecond * 10
+
+	ctx := testcontext.New(t)
+
+	observers := []rangedloop.Observer{}
+	for i := 0; i < nObservers; i++ {
+		observers = append(observers, &rangedlooptest.SleepObserver{
+			Duration: sleepIncrement,
+		})
+	}
+
+	segments := []segmentloop.Segment{}
+	for i := 0; i < nSegments; i++ {
+		streamId, err := uuid.FromBytes([]byte{byte(i), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
+		require.NoError(t, err)
+		segments = append(segments, segmentloop.Segment{
+			StreamID: streamId,
+		})
+	}
+
+	loopService := rangedloop.NewService(
+		zaptest.NewLogger(t),
+		rangedloop.Config{
+			BatchSize:   batchSize,
+			Parallelism: parallelism,
+		},
+		&rangedlooptest.RangeSplitter{
+			Segments: segments,
+		},
+		observers,
+	)
+
+	start := time.Now()
+	observerDurations, err := loopService.RunOnce(ctx)
+	require.NoError(t, err)
+
+	duration := time.Since(start)
+	expectedDuration := time.Duration(int64(nSegments) * int64(sleepIncrement) * int64(nObservers) / int64(parallelism))
+	require.Equal(t, expectedDuration, duration.Truncate(sleepIncrement))
+
+	require.Len(t, observerDurations, nObservers)
+	for _, observerDuration := range observerDurations {
+		expectedSleep := time.Duration(int64(nSegments) * int64(sleepIncrement))
+		require.Equal(t, expectedSleep, observerDuration.Duration.Round(sleepIncrement))
+	}
+}
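As a quick sanity check of the numbers in TestLoopDuration: 8 segments at 10ms per segment per observer, with 2 observers spread over 4 parallel ranges, gives an expected wall-clock time of 8 × 10ms × 2 / 4 = 40ms for the loop, while each observer's reported Duration is its full 8 × 10ms = 80ms of simulated work, because the per-range durations are summed rather than overlapped.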
@ -43,7 +43,7 @@ func TestObserver(t *testing.T) {
 		rangedloop.Config{BatchSize: 2, Parallelism: 2},
 		&rangedlooptest.RangeSplitter{Segments: combineSegments(streams...)},
 		[]rangedloop.Observer{obs})
-	err := service.RunOnce(ctx)
+	_, err := service.RunOnce(ctx)
 	require.NoError(tb, err)
 	return obs.TestingMetrics()
 }
Loading…
Reference in New Issue
Block a user