internal/sync2: Cycle minor impl & docs improvements (#1936)
* internal/sync2: Cycle minor impl & docs improvements Add the following improvements to the Cycle type of the internal/sync2 package: * Avoid that Close method hang if Start/Run has not been called. * Ensure that internal ticker is always stopped. * Add clarifications when methods calls panic. * internal/sync2: add minimal package level docs * Changes defer
This commit is contained in:
parent
c6d189a871
commit
de2242a05a
@ -14,7 +14,10 @@ import (
|
||||
|
||||
// Cycle implements a controllable recurring event.
|
||||
//
|
||||
// Cycle control methods don't have any effect after the cycle has completed.
|
||||
// Cycle control methods PANICS after Close has been called and don't have any
|
||||
// effect after Stop has been called.
|
||||
//
|
||||
// Start or Run (only of of them, not both) must be only called once.
|
||||
type Cycle struct {
|
||||
interval time.Duration
|
||||
|
||||
@ -22,6 +25,7 @@ type Cycle struct {
|
||||
control chan interface{}
|
||||
|
||||
stopsent int64
|
||||
runexec int64
|
||||
stopping chan struct{}
|
||||
stopped chan struct{}
|
||||
|
||||
@ -56,8 +60,9 @@ func (cycle *Cycle) initialize() {
|
||||
})
|
||||
}
|
||||
|
||||
// Start runs the specified function with an errgroup
|
||||
// Start runs the specified function with an errgroup.
|
||||
func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error) {
|
||||
atomic.CompareAndSwapInt64(&cycle.runexec, 0, 1)
|
||||
group.Go(func() error {
|
||||
return cycle.Run(ctx, fn)
|
||||
})
|
||||
@ -67,12 +72,17 @@ func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ct
|
||||
//
|
||||
// Every interval `fn` is started.
|
||||
// When `fn` is not fast enough, it may skip some of those executions.
|
||||
//
|
||||
// Run PANICS if it's called after Stop has been called.
|
||||
func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||
atomic.CompareAndSwapInt64(&cycle.runexec, 0, 1)
|
||||
cycle.initialize()
|
||||
defer close(cycle.stopped)
|
||||
|
||||
currentInterval := cycle.interval
|
||||
cycle.ticker = time.NewTicker(currentInterval)
|
||||
defer cycle.ticker.Stop()
|
||||
|
||||
if err := fn(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -107,7 +117,7 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error)
|
||||
return err
|
||||
}
|
||||
if message.done != nil {
|
||||
message.done <- struct{}{}
|
||||
close(message.done)
|
||||
}
|
||||
}
|
||||
|
||||
@ -128,9 +138,15 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error)
|
||||
}
|
||||
|
||||
// Close closes all resources associated with it.
|
||||
//
|
||||
// It MUST NOT be called concurrently.
|
||||
func (cycle *Cycle) Close() {
|
||||
cycle.Stop()
|
||||
|
||||
if atomic.LoadInt64(&cycle.runexec) == 1 {
|
||||
<-cycle.stopped
|
||||
}
|
||||
|
||||
close(cycle.control)
|
||||
}
|
||||
|
||||
@ -176,7 +192,6 @@ func (cycle *Cycle) Trigger() {
|
||||
// If it's currently running it waits for the previous to complete and then runs.
|
||||
func (cycle *Cycle) TriggerWait() {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
cycle.sendControl(cycleTrigger{done})
|
||||
select {
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"storj.io/storj/internal/sync2"
|
||||
@ -28,9 +29,8 @@ func TestCycle_Basic(t *testing.T) {
|
||||
for _, cycle := range []*sync2.Cycle{pointer, &inplace} {
|
||||
cycle := cycle
|
||||
t.Run("", func(t *testing.T) {
|
||||
defer cycle.Close()
|
||||
|
||||
t.Parallel()
|
||||
defer cycle.Close()
|
||||
|
||||
count := int64(0)
|
||||
|
||||
@ -118,12 +118,40 @@ func TestCycle_StopCancelled(t *testing.T) {
|
||||
cancel()
|
||||
|
||||
var group errgroup.Group
|
||||
var count int64
|
||||
cycle.Start(ctx, &group, func(ctx context.Context) error {
|
||||
atomic.AddInt64(&count, 1)
|
||||
cycle.Start(ctx, &group, func(_ context.Context) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
cycle.Stop()
|
||||
cycle.Stop()
|
||||
}
|
||||
|
||||
func TestCycle_Run_NoInterval(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cycle := &sync2.Cycle{}
|
||||
require.Panics(t,
|
||||
func() {
|
||||
err := cycle.Run(context.Background(), func(_ context.Context) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
},
|
||||
"Run without setting an interval should panic",
|
||||
)
|
||||
}
|
||||
|
||||
func TestCycle_Stop_NotStarted(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cycle := sync2.NewCycle(time.Second)
|
||||
cycle.Stop()
|
||||
}
|
||||
|
||||
func TestCycle_Close_NotStarted(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cycle := sync2.NewCycle(time.Second)
|
||||
cycle.Close()
|
||||
}
|
||||
|
12
internal/sync2/doc.go
Normal file
12
internal/sync2/doc.go
Normal file
@ -0,0 +1,12 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
/*Package sync2 provides a set of functions and types for:
|
||||
|
||||
* Having context aware functionalities which aren't present in the standard
|
||||
library.
|
||||
* For offloading memory through the file system.
|
||||
* To control execution of tasks which can run repetitively, concurrently or
|
||||
asynchronously.
|
||||
*/
|
||||
package sync2
|
Loading…
Reference in New Issue
Block a user