From de2242a05ac6d5c32cc8a5499b500f21d71cf581 Mon Sep 17 00:00:00 2001 From: Ivan Fraixedes Date: Thu, 9 May 2019 20:19:06 +0200 Subject: [PATCH] internal/sync2: Cycle minor impl & docs improvements (#1936) * internal/sync2: Cycle minor impl & docs improvements Add the following improvements to the Cycle type of the internal/sync2 package: * Avoid that Close method hang if Start/Run has not been called. * Ensure that internal ticker is always stopped. * Add clarifications when methods calls panic. * internal/sync2: add minimal package level docs * Changes defer --- internal/sync2/cycle.go | 25 +++++++++++++++++++----- internal/sync2/cycle_test.go | 38 +++++++++++++++++++++++++++++++----- internal/sync2/doc.go | 12 ++++++++++++ 3 files changed, 65 insertions(+), 10 deletions(-) create mode 100644 internal/sync2/doc.go diff --git a/internal/sync2/cycle.go b/internal/sync2/cycle.go index 444dcc08a..aed7a65b8 100644 --- a/internal/sync2/cycle.go +++ b/internal/sync2/cycle.go @@ -14,7 +14,10 @@ import ( // Cycle implements a controllable recurring event. // -// Cycle control methods don't have any effect after the cycle has completed. +// Cycle control methods PANICS after Close has been called and don't have any +// effect after Stop has been called. +// +// Start or Run (only of of them, not both) must be only called once. type Cycle struct { interval time.Duration @@ -22,6 +25,7 @@ type Cycle struct { control chan interface{} stopsent int64 + runexec int64 stopping chan struct{} stopped chan struct{} @@ -56,8 +60,9 @@ func (cycle *Cycle) initialize() { }) } -// Start runs the specified function with an errgroup +// Start runs the specified function with an errgroup. func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error) { + atomic.CompareAndSwapInt64(&cycle.runexec, 0, 1) group.Go(func() error { return cycle.Run(ctx, fn) }) @@ -67,12 +72,17 @@ func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ct // // Every interval `fn` is started. // When `fn` is not fast enough, it may skip some of those executions. +// +// Run PANICS if it's called after Stop has been called. func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error { + atomic.CompareAndSwapInt64(&cycle.runexec, 0, 1) cycle.initialize() defer close(cycle.stopped) currentInterval := cycle.interval cycle.ticker = time.NewTicker(currentInterval) + defer cycle.ticker.Stop() + if err := fn(ctx); err != nil { return err } @@ -107,7 +117,7 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) return err } if message.done != nil { - message.done <- struct{}{} + close(message.done) } } @@ -128,9 +138,15 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) } // Close closes all resources associated with it. +// +// It MUST NOT be called concurrently. func (cycle *Cycle) Close() { cycle.Stop() - <-cycle.stopped + + if atomic.LoadInt64(&cycle.runexec) == 1 { + <-cycle.stopped + } + close(cycle.control) } @@ -176,7 +192,6 @@ func (cycle *Cycle) Trigger() { // If it's currently running it waits for the previous to complete and then runs. func (cycle *Cycle) TriggerWait() { done := make(chan struct{}) - defer close(done) cycle.sendControl(cycleTrigger{done}) select { diff --git a/internal/sync2/cycle_test.go b/internal/sync2/cycle_test.go index 7076efa04..85a844f7f 100644 --- a/internal/sync2/cycle_test.go +++ b/internal/sync2/cycle_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "storj.io/storj/internal/sync2" @@ -28,9 +29,8 @@ func TestCycle_Basic(t *testing.T) { for _, cycle := range []*sync2.Cycle{pointer, &inplace} { cycle := cycle t.Run("", func(t *testing.T) { - defer cycle.Close() - t.Parallel() + defer cycle.Close() count := int64(0) @@ -118,12 +118,40 @@ func TestCycle_StopCancelled(t *testing.T) { cancel() var group errgroup.Group - var count int64 - cycle.Start(ctx, &group, func(ctx context.Context) error { - atomic.AddInt64(&count, 1) + cycle.Start(ctx, &group, func(_ context.Context) error { return nil }) cycle.Stop() cycle.Stop() } + +func TestCycle_Run_NoInterval(t *testing.T) { + t.Parallel() + + cycle := &sync2.Cycle{} + require.Panics(t, + func() { + err := cycle.Run(context.Background(), func(_ context.Context) error { + return nil + }) + + require.NoError(t, err) + }, + "Run without setting an interval should panic", + ) +} + +func TestCycle_Stop_NotStarted(t *testing.T) { + t.Parallel() + + cycle := sync2.NewCycle(time.Second) + cycle.Stop() +} + +func TestCycle_Close_NotStarted(t *testing.T) { + t.Parallel() + + cycle := sync2.NewCycle(time.Second) + cycle.Close() +} diff --git a/internal/sync2/doc.go b/internal/sync2/doc.go new file mode 100644 index 000000000..13dba7d14 --- /dev/null +++ b/internal/sync2/doc.go @@ -0,0 +1,12 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information + +/*Package sync2 provides a set of functions and types for: + +* Having context aware functionalities which aren't present in the standard + library. +* For offloading memory through the file system. +* To control execution of tasks which can run repetitively, concurrently or + asynchronously. +*/ +package sync2