sync2: prioritize Cycle stopping messages (#2137)

This commit is contained in:
Egon Elbre 2019-06-06 17:45:52 +03:00 committed by Dennis Coyle
parent 28a1201590
commit 0bf50c6825
2 changed files with 43 additions and 0 deletions

View File

@ -88,6 +88,19 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error)
return err return err
} }
for { for {
// prioritize stopping messages
select {
case <-cycle.stopping:
return nil
case <-ctx.Done():
// handle control messages
return ctx.Err()
default:
}
// handle other messages as well
select { select {
case message := <-cycle.control: case message := <-cycle.control:
@ -166,6 +179,10 @@ func (cycle *Cycle) Stop() {
if atomic.CompareAndSwapInt32(&cycle.stopsent, 0, 1) { if atomic.CompareAndSwapInt32(&cycle.stopsent, 0, 1) {
close(cycle.stopping) close(cycle.stopping)
} }
if atomic.LoadInt32(&cycle.runexec) == 1 {
<-cycle.stopped
}
} }
// ChangeInterval allows to change the ticker interval after it has started. // ChangeInterval allows to change the ticker interval after it has started.

View File

@ -155,3 +155,29 @@ func TestCycle_Close_NotStarted(t *testing.T) {
cycle := sync2.NewCycle(time.Second) cycle := sync2.NewCycle(time.Second)
cycle.Close() cycle.Close()
} }
func TestCycle_Stop_EnsureLoopIsFinished(t *testing.T) {
t.Parallel()
cycle := sync2.NewCycle(time.Second)
defer cycle.Close()
ctx := context.Background()
var completed int64
started := make(chan int)
go func() {
_ = cycle.Run(ctx, func(_ context.Context) error {
close(started)
time.Sleep(1 * time.Second)
atomic.StoreInt64(&completed, 1)
return nil
})
}()
<-started
cycle.Stop()
require.Equal(t, atomic.LoadInt64(&completed), int64(1))
}