sync2.Cycle for controllable intervals (#1267)

This commit is contained in:
Egon Elbre 2019-02-07 22:01:13 +02:00 committed by GitHub
parent b736ae4823
commit 8e448e2f6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 263 additions and 0 deletions

176
internal/sync2/cycle.go Normal file
View File

@ -0,0 +1,176 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package sync2
import (
"context"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
// Cycle implements a controllable recurring event.
//
// Cycle control methods don't have any effect after the cycle has completed.
type Cycle struct {
interval time.Duration
ticker *time.Ticker
control chan interface{}
stop chan struct{}
init sync.Once
}
type (
// cycle control messages
cyclePause struct{}
cycleContinue struct{}
cycleStop struct{}
cycleChangeInterval struct{ Interval time.Duration }
cycleTrigger struct{ done chan struct{} }
)
// NewCycle creates a new cycle with the specified interval.
func NewCycle(interval time.Duration) *Cycle {
cycle := &Cycle{}
cycle.SetInterval(interval)
return cycle
}
// SetInterval allows to change the interval before starting.
func (cycle *Cycle) SetInterval(interval time.Duration) {
cycle.interval = interval
}
func (cycle *Cycle) initialize() {
cycle.init.Do(func() {
cycle.stop = make(chan struct{})
cycle.control = make(chan interface{})
})
}
// Start runs the specified function with an errgroup
func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error) {
group.Go(func() error {
return cycle.Run(ctx, fn)
})
}
// Run runs the specified in an interval.
//
// Every interval `fn` is started.
// When `fn` is not fast enough, it may skip some of those executions.
func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error {
cycle.initialize()
defer close(cycle.stop)
currentInterval := cycle.interval
cycle.ticker = time.NewTicker(currentInterval)
if err := fn(ctx); err != nil {
return err
}
for {
select {
case message := <-cycle.control:
// handle control messages
switch message := message.(type) {
case cycleStop:
return nil
case cycleChangeInterval:
currentInterval = message.Interval
cycle.ticker.Stop()
cycle.ticker = time.NewTicker(currentInterval)
case cyclePause:
cycle.ticker.Stop()
// ensure we don't have ticks left
select {
case <-cycle.ticker.C:
default:
}
case cycleContinue:
cycle.ticker.Stop()
cycle.ticker = time.NewTicker(currentInterval)
case cycleTrigger:
// trigger the function
if err := fn(ctx); err != nil {
return err
}
if message.done != nil {
message.done <- struct{}{}
}
}
case <-ctx.Done():
// handle control messages
return ctx.Err()
case <-cycle.ticker.C:
// trigger the function
if err := fn(ctx); err != nil {
return err
}
}
}
}
// Close closes all resources associated with it.
func (cycle *Cycle) Close() {
close(cycle.control)
}
// sendControl sends a control message
func (cycle *Cycle) sendControl(message interface{}) {
cycle.initialize()
select {
case cycle.control <- message:
case <-cycle.stop:
}
}
// Stop stops the cycle permanently
func (cycle *Cycle) Stop() {
cycle.sendControl(cycleStop{})
}
// ChangeInterval allows to change the ticker interval after it has started.
func (cycle *Cycle) ChangeInterval(interval time.Duration) {
cycle.sendControl(cycleChangeInterval{interval})
}
// Pause pauses the cycle.
func (cycle *Cycle) Pause() {
cycle.sendControl(cyclePause{})
}
// Restart restarts the ticker from 0.
func (cycle *Cycle) Restart() {
cycle.sendControl(cycleContinue{})
}
// Trigger ensures that the loop is done at least once.
// If it's currently running it waits for the previous to complete and then runs.
func (cycle *Cycle) Trigger() {
cycle.sendControl(cycleTrigger{})
}
// TriggerWait ensures that the loop is done at least once and waits for completion.
// If it's currently running it waits for the previous to complete and then runs.
func (cycle *Cycle) TriggerWait() {
done := make(chan struct{})
defer close(done)
cycle.sendControl(cycleTrigger{done})
select {
case <-done:
case <-cycle.stop:
}
}

View File

@ -0,0 +1,87 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package sync2_test
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/sync2"
)
func TestCycle_Basic(t *testing.T) {
ctx := context.Background()
var inplace sync2.Cycle
inplace.SetInterval(time.Second)
var pointer = sync2.NewCycle(time.Second)
for _, cycle := range []*sync2.Cycle{pointer, &inplace} {
cycle := cycle
t.Run("", func(t *testing.T) {
defer cycle.Close()
t.Parallel()
count := int64(0)
var group errgroup.Group
start := time.Now()
cycle.Start(ctx, &group, func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
return nil
})
group.Go(func() error {
defer cycle.Stop()
const expected = 10
cycle.Pause()
startingCount := atomic.LoadInt64(&count)
for i := 0; i < expected-1; i++ {
cycle.Trigger()
}
cycle.TriggerWait()
countAfterTrigger := atomic.LoadInt64(&count)
change := countAfterTrigger - startingCount
if expected != change {
return fmt.Errorf("invalid triggers expected %d got %d", expected, change)
}
cycle.Restart()
time.Sleep(3 * time.Second)
countAfterRestart := atomic.LoadInt64(&count)
if countAfterRestart == countAfterTrigger {
return fmt.Errorf("cycle has not restarted")
}
return nil
})
err := group.Wait()
if err != nil {
t.Error(err)
}
testDuration := time.Since(start)
if testDuration > 7*time.Second {
t.Errorf("test took too long %v, expected approximately 3s", testDuration)
}
// shouldn't block
cycle.Trigger()
})
}
}