diff --git a/satellite/metabase/rangedloop/service.go b/satellite/metabase/rangedloop/service.go
index 8efc05c1e..ea64c604b 100644
--- a/satellite/metabase/rangedloop/service.go
+++ b/satellite/metabase/rangedloop/service.go
@@ -5,6 +5,7 @@ package rangedloop
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/spacemonkeygo/monkit/v3"
@@ -52,20 +53,27 @@ func NewService(log *zap.Logger, config Config, provider RangeSplitter, observer
 }
 
 // observerState contains information to manage an observer during a loop iteration.
-// Improvement: track duration.
 type observerState struct {
 	observer       Observer
 	rangeObservers []*rangeObserverState
+	// err is the error that occurred during the observer's Start method.
+	// If err is set, the observer will be skipped during the loop iteration.
+	err error
 }
 
 type rangeObserverState struct {
 	rangeObserver Partial
 	duration      time.Duration
+	// err is the error returned by the observer's Fork or Process method.
+	// If err is set, the range observer will be skipped during the loop iteration.
+	err error
 }
 
 // ObserverDuration reports back on how long it took the observer to process all the segments.
 type ObserverDuration struct {
 	Observer Observer
+	// Duration is set to -1s when the observer errored out,
+	// so that someone watching metrics can tell that something went wrong.
	Duration time.Duration
 }
 
@@ -101,7 +109,7 @@ func (service *Service) Run(ctx context.Context) (err error) {
 func (service *Service) RunOnce(ctx context.Context) (observerDurations []ObserverDuration, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	observerStates, err := startObservers(ctx, service.observers)
+	observerStates, err := startObservers(ctx, service.log, service.observers)
 	if err != nil {
 		return nil, err
 	}
@@ -115,12 +123,13 @@ func (service *Service) RunOnce(ctx context.Context) (observerDurations []Observ
 	for _, rangeProvider := range rangeProviders {
 		rangeObservers := []*rangeObserverState{}
 		for i, observerState := range observerStates {
-			rangeObserver, err := observerState.observer.Fork(ctx)
-			if err != nil {
-				return nil, err
+			if observerState.err != nil {
+				continue
 			}
+			rangeObserver, err := observerState.observer.Fork(ctx)
 			rangeState := &rangeObserverState{
 				rangeObserver: rangeObserver,
+				err:           err,
 			}
 			rangeObservers = append(rangeObservers, rangeState)
 			observerStates[i].rangeObservers = append(observerStates[i].rangeObservers, rangeState)
@@ -136,7 +145,7 @@ func (service *Service) RunOnce(ctx context.Context) (observerDurations []Observ
 		return nil, errs.Combine(errList...)
 	}
 
-	return finishObservers(ctx, observerStates)
+	return finishObservers(ctx, service.log, observerStates)
 }
 
 func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider, states []*rangeObserverState) func() error {
@@ -155,37 +164,36 @@ func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider,
 	}
 }
 
-func startObservers(ctx context.Context, observers []Observer) (observerStates []observerState, err error) {
+func startObservers(ctx context.Context, log *zap.Logger, observers []Observer) (observerStates []observerState, err error) {
 	startTime := time.Now()
 
 	for _, obs := range observers {
-		state, err := startObserver(ctx, startTime, obs)
-		if err != nil {
-			return nil, err
-		}
-
-		observerStates = append(observerStates, state)
+		observerStates = append(observerStates, startObserver(ctx, log, startTime, obs))
 	}
 
 	return observerStates, nil
 }
 
-func startObserver(ctx context.Context, startTime time.Time, observer Observer) (observerState, error) {
+func startObserver(ctx context.Context, log *zap.Logger, startTime time.Time, observer Observer) observerState {
 	err := observer.Start(ctx, startTime)
+	if err != nil {
+		log.Error(
+			"Starting observer failed. This observer will be excluded from this run of the ranged segment loop.",
+			zap.String("observer", fmt.Sprintf("%T", observer)),
+			zap.Error(err),
+		)
+	}
+
 	return observerState{
 		observer: observer,
-	}, err
+		err:      err,
+	}
 }
 
-func finishObservers(ctx context.Context, observerStates []observerState) (observerDurations []ObserverDuration, err error) {
+func finishObservers(ctx context.Context, log *zap.Logger, observerStates []observerState) (observerDurations []ObserverDuration, err error) {
 	for _, state := range observerStates {
-		observerDuration, err := finishObserver(ctx, state)
-		if err != nil {
-			return nil, err
-		}
-
-		observerDurations = append(observerDurations, observerDuration)
+		observerDurations = append(observerDurations, finishObserver(ctx, log, state))
 	}
 
 	sendObserverDurations(observerDurations)
@@ -195,30 +203,79 @@ func finishObservers(ctx context.Context, observerStates []observerState) (obser
 
 // Iterating over the segments is done.
 // This is the reduce step.
-func finishObserver(ctx context.Context, state observerState) (ObserverDuration, error) {
+func finishObserver(ctx context.Context, log *zap.Logger, state observerState) ObserverDuration {
+	if state.err != nil {
+		return ObserverDuration{
+			Observer: state.observer,
+			Duration: -1 * time.Second,
+		}
+	}
+	for _, rangeObserver := range state.rangeObservers {
+		if rangeObserver.err != nil {
+			log.Error(
+				"Observer failed during Fork() or Process(), it will not be finalized in this run of the ranged segment loop",
+				zap.String("observer", fmt.Sprintf("%T", state.observer)),
+				zap.Error(rangeObserver.err),
+			)
+			return ObserverDuration{
+				Observer: state.observer,
+				Duration: -1 * time.Second,
+			}
+		}
+	}
+
 	var duration time.Duration
 	for _, rangeObserver := range state.rangeObservers {
 		err := state.observer.Join(ctx, rangeObserver.rangeObserver)
 		if err != nil {
-			return ObserverDuration{}, err
+			log.Error(
+				"Observer failed during Join(), it will not be finalized in this run of the ranged segment loop",
+				zap.String("observer", fmt.Sprintf("%T", state.observer)),
+				zap.Error(err),
+			)
+			return ObserverDuration{
+				Observer: state.observer,
+				Duration: -1 * time.Second,
+			}
 		}
 		duration += rangeObserver.duration
 	}
 
+	err := state.observer.Finish(ctx)
+	if err != nil {
+		log.Error(
+			"Observer failed during Finish()",
+			zap.String("observer", fmt.Sprintf("%T", state.observer)),
+			zap.Error(err),
+		)
+		return ObserverDuration{
+			Observer: state.observer,
+			Duration: -1 * time.Second,
+		}
+	}
+
 	return ObserverDuration{
 		Duration: duration,
 		Observer: state.observer,
-	}, state.observer.Finish(ctx)
+	}
 }
 
 func processBatch(ctx context.Context, states []*rangeObserverState, segments []segmentloop.Segment) (err error) {
 	for _, state := range states {
+		if state.err != nil {
+			// this observer already errored in a previous batch; skip it
+			continue
+		}
 		start := time.Now()
 		err := state.rangeObserver.Process(ctx, segments)
-		if err != nil {
-			return err
-		}
 		state.duration += time.Since(start)
+		if err != nil {
+			// a canceled context aborts the whole loop; any other error only disables this observer
+			if errs2.IsCanceled(err) {
+				return err
+			}
+			state.err = err
+		}
 	}
 	return nil
 }
diff --git a/satellite/metabase/rangedloop/service_test.go b/satellite/metabase/rangedloop/service_test.go
index 54dfdec68..bc9bb1dfe 100644
--- a/satellite/metabase/rangedloop/service_test.go
+++ b/satellite/metabase/rangedloop/service_test.go
@@ -5,7 +5,9 @@ package rangedloop_test
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -149,3 +151,211 @@ func TestLoopCancellation(t *testing.T) {
 
 	require.ErrorIs(t, err, context.Canceled)
 }
+
+func TestLoopContinuesAfterObserverError(t *testing.T) {
+	parallelism := 2
+	batchSize := 1
+	segments := make([]segmentloop.Segment, 2)
+
+	numOnStartCalls := 0
+	numOnForkCalls := 0
+	numOnProcessCalls := int32(0)
+	numOnJoinCalls := 0
+	numOnFinishCalls := 0
+
+	incNumOnProcessCalls := func() {
+		atomic.AddInt32(&numOnProcessCalls, 1)
+	}
+
+	// the first and last observers emit no errors;
+	// the others each emit an error at a different stage
+	observers := []rangedloop.Observer{
+		&rangedlooptest.CallbackObserver{
+			OnStart: func(ctx context.Context, t time.Time) error {
+				numOnStartCalls++
+				return nil
+			},
+			OnFork: func(ctx context.Context) (rangedloop.Partial, error) {
+				numOnForkCalls++
+				return nil, nil
+			},
+			OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
+				incNumOnProcessCalls()
+				return nil
+			},
+			OnJoin: func(ctx context.Context, partial rangedloop.Partial) error {
+				numOnJoinCalls++
+				return nil
+			},
+			OnFinish: func(ctx context.Context) error {
+				numOnFinishCalls++
+				return nil
+			},
+		},
+		&rangedlooptest.CallbackObserver{
+			OnStart: func(ctx context.Context, t time.Time) error {
+				numOnStartCalls++
+				return errors.New("Test OnStart error")
+			},
+			OnFork: func(ctx context.Context) (rangedloop.Partial, error) {
+				require.Fail(t, "OnFork should not be called")
+				return nil, nil
+			},
+			OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
+				require.Fail(t, "OnProcess should not be called")
+				return nil
+			},
+			OnJoin: func(ctx context.Context, partial rangedloop.Partial) error {
+				require.Fail(t, "OnJoin should not be called")
+				return nil
+			},
+			OnFinish: func(ctx context.Context) error {
+				require.Fail(t, "OnFinish should not be called")
+				return nil
+			},
+		},
+		&rangedlooptest.CallbackObserver{
+			OnStart: func(ctx context.Context, t time.Time) error {
+				numOnStartCalls++
+				return nil
+			},
+			OnFork: func(ctx context.Context) (rangedloop.Partial, error) {
+				numOnForkCalls++
+				return nil, errors.New("Test OnFork error")
+			},
+			OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
+				require.Fail(t, "OnProcess should not be called")
+				return nil
+			},
+			OnJoin: func(ctx context.Context, partial rangedloop.Partial) error {
+				require.Fail(t, "OnJoin should not be called")
+				return nil
+			},
+			OnFinish: func(ctx context.Context) error {
+				require.Fail(t, "OnFinish should not be called")
+				return nil
+			},
+		},
+		&rangedlooptest.CallbackObserver{
+			OnStart: func(ctx context.Context, t time.Time) error {
+				numOnStartCalls++
+				return nil
+			},
+			OnFork: func(ctx context.Context) (rangedloop.Partial, error) {
+				numOnForkCalls++
+				return nil, nil
+			},
+			OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
+				incNumOnProcessCalls()
+				return errors.New("Test OnProcess error")
+			},
+			OnJoin: func(ctx context.Context, partial rangedloop.Partial) error {
+				require.Fail(t, "OnJoin should not be called")
+				return nil
+			},
+			OnFinish: func(ctx context.Context) error {
+				require.Fail(t, "OnFinish should not be called")
+				return nil
+			},
+		},
+		&rangedlooptest.CallbackObserver{
+			OnStart: func(ctx context.Context, t time.Time) error {
+				numOnStartCalls++
+				return nil
+			},
+			OnFork: func(ctx context.Context) (rangedloop.Partial, error) {
+				numOnForkCalls++
+				return nil, nil
+			},
+			OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
+				incNumOnProcessCalls()
+				return nil
+			},
+			OnJoin: func(ctx context.Context, partial rangedloop.Partial) error {
+				numOnJoinCalls++
+				return errors.New("Test OnJoin error")
+			},
+			OnFinish: func(ctx context.Context) error {
+				require.Fail(t, "OnFinish should not be called")
+				return nil
+			},
+		},
+		&rangedlooptest.CallbackObserver{
+			OnStart: func(ctx context.Context, t time.Time) error {
+				numOnStartCalls++
+				return nil
+			},
+			OnFork: func(ctx context.Context) (rangedloop.Partial, error) {
+				numOnForkCalls++
+				return nil, nil
+			},
+			OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
+				incNumOnProcessCalls()
+				return nil
+			},
+			OnJoin: func(ctx context.Context, partial rangedloop.Partial) error {
+				numOnJoinCalls++
+				return nil
+			},
+			OnFinish: func(ctx context.Context) error {
+				numOnFinishCalls++
+				return errors.New("Test OnFinish error")
+			},
+		},
+		&rangedlooptest.CallbackObserver{
+			OnStart: func(ctx context.Context, t time.Time) error {
+				numOnStartCalls++
+				return nil
+			},
+			OnFork: func(ctx context.Context) (rangedloop.Partial, error) {
+				numOnForkCalls++
+				return nil, nil
+			},
+			OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
+				incNumOnProcessCalls()
+				return nil
+			},
+			OnJoin: func(ctx context.Context, partial rangedloop.Partial) error {
+				numOnJoinCalls++
+				return nil
+			},
+			OnFinish: func(ctx context.Context) error {
+				numOnFinishCalls++
+				return nil
+			},
+		},
+	}
+
+	loopService := rangedloop.NewService(
+		zaptest.NewLogger(t),
+		rangedloop.Config{
+			BatchSize:   batchSize,
+			Parallelism: parallelism,
+		},
+		&rangedlooptest.RangeSplitter{
+			Segments: segments,
+		},
+		observers,
+	)
+
+	observerDurations, err := loopService.RunOnce(testcontext.New(t))
+	require.NoError(t, err)
+	require.Len(t, observerDurations, len(observers))
+
+	require.EqualValues(t, 7, numOnStartCalls)
+	require.EqualValues(t, 6*parallelism, numOnForkCalls)
+	require.EqualValues(t, 5*parallelism-1, numOnProcessCalls)
+	require.EqualValues(t, 4*parallelism-1, numOnJoinCalls)
+	require.EqualValues(t, 3, numOnFinishCalls)
+
+	// successful observers should have their durations reported
+	require.Greater(t, observerDurations[0].Duration, time.Duration(0))
+	require.Greater(t, observerDurations[6].Duration, time.Duration(0))
+
+	// failed observers should report the -1s sentinel duration
+	require.Equal(t, -1*time.Second, observerDurations[1].Duration)
+	require.Equal(t, -1*time.Second, observerDurations[2].Duration)
+	require.Equal(t, -1*time.Second, observerDurations[3].Duration)
+	require.Equal(t, -1*time.Second, observerDurations[4].Duration)
+	require.Equal(t, -1*time.Second, observerDurations[5].Duration)
+}
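
For illustration only, and not part of the change above: a caller of RunOnce could use the -1s sentinel described in the ObserverDuration comment to separate failed observers from successful ones. The helper below is a hypothetical sketch; splitByOutcome is not an existing function in the rangedloop package.

// Hypothetical example (not part of the diff): consuming the durations
// returned by Service.RunOnce and separating failed observers, which
// report the -1s sentinel, from successful ones.
func splitByOutcome(durations []rangedloop.ObserverDuration) (ok, failed []rangedloop.ObserverDuration) {
	for _, od := range durations {
		if od.Duration < 0 {
			// Sentinel: this observer errored during Start, Fork, Process, Join, or Finish.
			failed = append(failed, od)
			continue
		}
		ok = append(ok, od)
	}
	return ok, failed
}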