From 0bf50c6825c531da6144ae33cdd8b4ce16c9f97b Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Thu, 6 Jun 2019 17:45:52 +0300 Subject: [PATCH] sync2: prioritize Cycle stopping messages (#2137) --- internal/sync2/cycle.go | 17 +++++++++++++++++ internal/sync2/cycle_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/internal/sync2/cycle.go b/internal/sync2/cycle.go index c22781eed..80a25a565 100644 --- a/internal/sync2/cycle.go +++ b/internal/sync2/cycle.go @@ -88,6 +88,19 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) return err } for { + // prioritize stopping messages + select { + case <-cycle.stopping: + return nil + + case <-ctx.Done(): + // handle control messages + return ctx.Err() + + default: + } + + // handle other messages as well select { case message := <-cycle.control: @@ -166,6 +179,10 @@ func (cycle *Cycle) Stop() { if atomic.CompareAndSwapInt32(&cycle.stopsent, 0, 1) { close(cycle.stopping) } + + if atomic.LoadInt32(&cycle.runexec) == 1 { + <-cycle.stopped + } } // ChangeInterval allows to change the ticker interval after it has started. diff --git a/internal/sync2/cycle_test.go b/internal/sync2/cycle_test.go index 85a844f7f..ec02a224e 100644 --- a/internal/sync2/cycle_test.go +++ b/internal/sync2/cycle_test.go @@ -155,3 +155,29 @@ func TestCycle_Close_NotStarted(t *testing.T) { cycle := sync2.NewCycle(time.Second) cycle.Close() } + +func TestCycle_Stop_EnsureLoopIsFinished(t *testing.T) { + t.Parallel() + + cycle := sync2.NewCycle(time.Second) + defer cycle.Close() + + ctx := context.Background() + + var completed int64 + started := make(chan int) + + go func() { + _ = cycle.Run(ctx, func(_ context.Context) error { + close(started) + time.Sleep(1 * time.Second) + atomic.StoreInt64(&completed, 1) + return nil + }) + }() + + <-started + cycle.Stop() + + require.Equal(t, atomic.LoadInt64(&completed), int64(1)) +}