From c1fbecb96bd159d299040b7ccac73a987f7282d9 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Sat, 10 Apr 2021 12:31:48 +0300 Subject: [PATCH] satellite/metabase/metaloop: add Monitor We need some chores to join without triggering the loop. For example it's fine to run metrics, only when something else is running. Change-Id: I9d8bd16f59c28c540c8d72971bc4e233a8660c02 --- cmd/metabase-verify/verify/verify.go | 22 ++-- satellite/metabase/metaloop/service.go | 130 +++++++++++++++----- satellite/metabase/metaloop/service_test.go | 57 ++++++++- 3 files changed, 163 insertions(+), 46 deletions(-) diff --git a/cmd/metabase-verify/verify/verify.go b/cmd/metabase-verify/verify/verify.go index 017f7657d..30e6d7dbc 100644 --- a/cmd/metabase-verify/verify/verify.go +++ b/cmd/metabase-verify/verify/verify.go @@ -47,24 +47,22 @@ func (chore *Chore) RunOnce(ctx context.Context) error { loop := metaloop.New(chore.Config.Loop, chore.DB) var group errs2.Group + group.Go(func() error { + plainOffset := &SegmentSizes{ + Log: chore.Log.Named("segment-sizes"), + } + err := loop.Join(ctx, plainOffset) + return Error.Wrap(err) + }) + group.Go(func() error { progress := &ProgressObserver{ Log: chore.Log.Named("progress"), ProgressPrintFrequency: chore.Config.ProgressPrintFrequency, } - - plainOffset := &SegmentSizes{ - Log: chore.Log.Named("segment-sizes"), - } - - err := loop.Join(ctx, progress, plainOffset) - if err != nil { - return Error.Wrap(err) - } - + err := loop.Monitor(ctx, progress) progress.Report() - - return nil + return Error.Wrap(err) }) group.Go(func() error { return Error.Wrap(loop.RunOnce(ctx)) diff --git a/satellite/metabase/metaloop/service.go b/satellite/metabase/metaloop/service.go index 587642628..4c2535f27 100644 --- a/satellite/metabase/metaloop/service.go +++ b/satellite/metabase/metaloop/service.go @@ -89,6 +89,7 @@ func (NullObserver) LoopStarted(context.Context, LoopInfo) error { } type observerContext struct { + trigger bool observer Observer ctx 
context.Context @@ -178,7 +179,7 @@ type MetabaseDB interface { type Service struct { config Config metabaseDB MetabaseDB - join chan []*observerContext + join chan *observerContext done chan struct{} } @@ -187,37 +188,48 @@ func New(config Config, metabaseDB MetabaseDB) *Service { return &Service{ metabaseDB: metabaseDB, config: config, - join: make(chan []*observerContext), + join: make(chan *observerContext), done: make(chan struct{}), } } // Join will join the looper for one full cycle until completion and then returns. +// Joining will trigger a new iteration after coalesce duration. // On ctx cancel the observer will return without completely finishing. // Only on full complete iteration it will return nil. // Safe to be called concurrently. -func (loop *Service) Join(ctx context.Context, observers ...Observer) (err error) { +func (loop *Service) Join(ctx context.Context, observer Observer) (err error) { + return loop.joinObserver(ctx, true, observer) +} + +// Monitor will join the looper for one full cycle until completion and then returns. +// Joining with monitoring won't trigger a new iteration after coalesce duration. +// On ctx cancel the observer will return without completely finishing. +// Only on full complete iteration it will return nil. +// Safe to be called concurrently. +func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error) { + return loop.joinObserver(ctx, false, observer) +} + +// joinObserver will join the looper for one full cycle until completion and then returns. +// On ctx cancel the observer will return without completely finishing. +// Only on full complete iteration it will return nil. +// Safe to be called concurrently. 
+func (loop *Service) joinObserver(ctx context.Context, trigger bool, obs Observer) (err error) { defer mon.Task()(&ctx)(&err) - obsContexts := make([]*observerContext, len(observers)) - for i, obs := range observers { - obsContexts[i] = newObserverContext(ctx, obs) - } + obsctx := newObserverContext(ctx, obs) + obsctx.trigger = trigger select { - case loop.join <- obsContexts: + case loop.join <- obsctx: case <-ctx.Done(): return ctx.Err() case <-loop.done: return ErrClosed } - var errList errs.Group - for _, ctx := range obsContexts { - errList.Add(ctx.Wait()) - } - - return errList.Err() + return obsctx.Wait() } // Run starts the looping service. @@ -248,30 +260,82 @@ var monMetainfo = monkit.ScopeNamed("storj.io/storj/satellite/metainfo/metaloop" func (loop *Service) RunOnce(ctx context.Context) (err error) { defer monMetainfo.Task()(&ctx)(&err) //mon:locked - var observers []*observerContext - - // wait for the first observer, or exit because context is canceled - select { - case list := <-loop.join: - observers = append(observers, list...) - case <-ctx.Done(): - return ctx.Err() + coalesceTimer := time.NewTimer(loop.config.CoalesceDuration) + defer coalesceTimer.Stop() + if !coalesceTimer.Stop() { + <-coalesceTimer.C } - // after the first observer is found, set timer for CoalesceDuration and add any observers that try to join before the timer is up - timer := time.NewTimer(loop.config.CoalesceDuration) + earlyExit := make(chan *observerContext) + earlyExitDone := make(chan struct{}) + monitorEarlyExit := func(obs *observerContext) { + select { + case <-obs.ctx.Done(): + select { + case <-earlyExitDone: + case earlyExit <- obs: + } + case <-earlyExitDone: + } + } + + timerStarted := false + observers := []*observerContext{} + waitformore: for { select { - case list := <-loop.join: - observers = append(observers, list...) - case <-timer.C: + // when the coalesce timer hits, we have waited enough for observers to join. 
+ case <-coalesceTimer.C: break waitformore + + // wait for a new observer to join. + case obsctx := <-loop.join: + // when the observer triggers the loop and it's the first one, + // then start the coalescing timer. + if obsctx.trigger { + if !timerStarted { + coalesceTimer.Reset(loop.config.CoalesceDuration) + timerStarted = true + } + } + observers = append(observers, obsctx) + go monitorEarlyExit(obsctx) + + // remove an observer from waiting when it's canceled before the loop starts. + case obsctx := <-earlyExit: + for i, obs := range observers { + if obs == obsctx { + observers = append(observers[:i], observers[i+1:]...) + break + } + } + + obsctx.HandleError(obsctx.ctx.Err()) + + // reevaluate, whether we actually need to start the loop. + timerShouldRun := false + for _, obs := range observers { + timerShouldRun = timerShouldRun || obs.trigger + } + + if !timerShouldRun && timerStarted { + // clear the flag, so a later trigger restarts the timer and a later cancel won't drain a stopped timer. + timerStarted = false + if !coalesceTimer.Stop() { + <-coalesceTimer.C + } + } + + // when ctx done happens we can finish all the waiting observers. 
case <-ctx.Done(): - finishObservers(observers) + close(earlyExitDone) + errorObservers(observers, ctx.Err()) return ctx.Err() } } + close(earlyExitDone) + return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1)) } @@ -284,9 +348,7 @@ func (loop *Service) Wait() { func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) { defer func() { if err != nil { - for _, observer := range observers { - observer.HandleError(err) - } + errorObservers(observers, err) return } finishObservers(observers) @@ -511,3 +573,9 @@ func finishObservers(observers []*observerContext) { observer.Finish() } } + +func errorObservers(observers []*observerContext, err error) { + for _, observer := range observers { + observer.HandleError(err) + } +} diff --git a/satellite/metabase/metaloop/service_test.go b/satellite/metabase/metaloop/service_test.go index 8bea202ee..3ddffee8d 100644 --- a/satellite/metabase/metaloop/service_test.go +++ b/satellite/metabase/metaloop/service_test.go @@ -234,7 +234,7 @@ func TestLoopObserverCancel(t *testing.T) { // create 1 "good" observer obs1 := newTestObserver(nil) - obs1x := newTestObserver(nil) + mon1 := newTestObserver(nil) // create observer that will return an error from RemoteSegment obs2 := newTestObserver(func(ctx context.Context) error { @@ -256,7 +256,10 @@ func TestLoopObserverCancel(t *testing.T) { var group errgroup.Group group.Go(func() error { - return metaLoop.Join(ctx, obs1, obs1x) + return metaLoop.Join(ctx, obs1) + }) + group.Go(func() error { + return metaLoop.Monitor(ctx, mon1) }) group.Go(func() error { err := metaLoop.Join(ctx, obs2) @@ -281,7 +284,7 @@ func TestLoopObserverCancel(t *testing.T) { // expect that obs1 saw all three segments, but obs2 and obs3 only saw the first one assert.EqualValues(t, 3, obs1.remoteSegCount) - assert.EqualValues(t, 3, obs1x.remoteSegCount) + 
assert.EqualValues(t, 3, mon1.remoteSegCount) assert.EqualValues(t, 1, obs2.remoteSegCount) assert.EqualValues(t, 1, obs3.remoteSegCount) }) @@ -379,6 +382,54 @@ func TestLoopCancel(t *testing.T) { }) } +func TestLoop_MonitorCancel(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + satellite := planet.Satellites[0] + + metaLoop := metaloop.New(metaloop.Config{ + CoalesceDuration: time.Nanosecond, + ListLimit: 10000, + }, satellite.Metainfo.Metabase) + + obs1 := newTestObserver(func(ctx context.Context) error { + return errors.New("test error") + }) + + var group errgroup.Group + + loopCtx, loopCancel := context.WithCancel(ctx) + group.Go(func() error { + err := metaLoop.Run(loopCtx) + t.Log("metaloop stopped") + if !errs2.IsCanceled(err) { + return errors.New("expected context canceled") + } + return nil + }) + + obsCtx, obsCancel := context.WithCancel(ctx) + group.Go(func() error { + defer loopCancel() + err := metaLoop.Monitor(obsCtx, obs1) + t.Log("observer stopped") + if !errs2.IsCanceled(err) { + return errors.New("expected context canceled") + } + return nil + }) + + obsCancel() + + err := group.Wait() + require.NoError(t, err) + + err = metaLoop.Close() + require.NoError(t, err) + }) +} + type testObserver struct { objectCount int remoteSegCount int