179 lines
3.9 KiB
Go
179 lines
3.9 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information
|
|
|
|
package sync2
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// Cycle is a recurring event whose loop can be steered while it runs:
// paused, restarted, triggered on demand, re-timed, or stopped.
//
// Control methods have no effect once the cycle has completed.
type Cycle struct {
	// interval is the tick period Run uses when it creates its ticker.
	interval time.Duration

	// ticker delivers the periodic ticks; Run creates and replaces it.
	ticker *time.Ticker
	// control carries control messages from the public methods to Run.
	control chan interface{}
	// stop is closed when Run returns.
	stop chan struct{}

	// init guards the one-time creation of the stop and control channels.
	init sync.Once
}
|
|
|
|
// Control messages sent over Cycle.control and handled by the Run loop.
type (
	// cyclePause asks the loop to stop its ticker.
	cyclePause struct{}

	// cycleContinue asks the loop to start a fresh ticker.
	cycleContinue struct{}

	// cycleStop asks the loop to return.
	cycleStop struct{}

	// cycleChangeInterval asks the loop to switch to a new tick interval.
	cycleChangeInterval struct {
		Interval time.Duration
	}

	// cycleTrigger asks the loop to run the function right away; when done
	// is non-nil the loop signals on it after the function has returned.
	cycleTrigger struct {
		done chan struct{}
	}
)
|
|
|
|
// NewCycle creates a new cycle with the specified interval.
|
|
func NewCycle(interval time.Duration) *Cycle {
|
|
cycle := &Cycle{}
|
|
cycle.SetInterval(interval)
|
|
return cycle
|
|
}
|
|
|
|
// SetInterval allows to change the interval before starting.
|
|
func (cycle *Cycle) SetInterval(interval time.Duration) {
|
|
cycle.interval = interval
|
|
}
|
|
|
|
func (cycle *Cycle) initialize() {
|
|
cycle.init.Do(func() {
|
|
cycle.stop = make(chan struct{})
|
|
cycle.control = make(chan interface{})
|
|
})
|
|
}
|
|
|
|
// Start runs the specified function with an errgroup
|
|
func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error) {
|
|
group.Go(func() error {
|
|
return cycle.Run(ctx, fn)
|
|
})
|
|
}
|
|
|
|
// Run runs the specified in an interval.
|
|
//
|
|
// Every interval `fn` is started.
|
|
// When `fn` is not fast enough, it may skip some of those executions.
|
|
func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error {
|
|
cycle.initialize()
|
|
defer close(cycle.stop)
|
|
|
|
currentInterval := cycle.interval
|
|
cycle.ticker = time.NewTicker(currentInterval)
|
|
if err := fn(ctx); err != nil {
|
|
return err
|
|
}
|
|
for {
|
|
select {
|
|
|
|
case message := <-cycle.control:
|
|
// handle control messages
|
|
|
|
switch message := message.(type) {
|
|
case cycleStop:
|
|
return nil
|
|
|
|
case cycleChangeInterval:
|
|
currentInterval = message.Interval
|
|
cycle.ticker.Stop()
|
|
cycle.ticker = time.NewTicker(currentInterval)
|
|
|
|
case cyclePause:
|
|
cycle.ticker.Stop()
|
|
// ensure we don't have ticks left
|
|
select {
|
|
case <-cycle.ticker.C:
|
|
default:
|
|
}
|
|
|
|
case cycleContinue:
|
|
cycle.ticker.Stop()
|
|
cycle.ticker = time.NewTicker(currentInterval)
|
|
|
|
case cycleTrigger:
|
|
// trigger the function
|
|
if err := fn(ctx); err != nil {
|
|
return err
|
|
}
|
|
if message.done != nil {
|
|
message.done <- struct{}{}
|
|
}
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
// handle control messages
|
|
return ctx.Err()
|
|
|
|
case <-cycle.ticker.C:
|
|
// trigger the function
|
|
if err := fn(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes all resources associated with it.
|
|
func (cycle *Cycle) Close() {
|
|
cycle.Stop()
|
|
<-cycle.stop
|
|
close(cycle.control)
|
|
}
|
|
|
|
// sendControl sends a control message
|
|
func (cycle *Cycle) sendControl(message interface{}) {
|
|
cycle.initialize()
|
|
select {
|
|
case cycle.control <- message:
|
|
case <-cycle.stop:
|
|
}
|
|
}
|
|
|
|
// Stop stops the cycle permanently
|
|
func (cycle *Cycle) Stop() {
|
|
cycle.sendControl(cycleStop{})
|
|
}
|
|
|
|
// ChangeInterval allows to change the ticker interval after it has started.
|
|
func (cycle *Cycle) ChangeInterval(interval time.Duration) {
|
|
cycle.sendControl(cycleChangeInterval{interval})
|
|
}
|
|
|
|
// Pause pauses the cycle.
|
|
func (cycle *Cycle) Pause() {
|
|
cycle.sendControl(cyclePause{})
|
|
}
|
|
|
|
// Restart restarts the ticker from 0.
|
|
func (cycle *Cycle) Restart() {
|
|
cycle.sendControl(cycleContinue{})
|
|
}
|
|
|
|
// Trigger ensures that the loop is done at least once.
|
|
// If it's currently running it waits for the previous to complete and then runs.
|
|
func (cycle *Cycle) Trigger() {
|
|
cycle.sendControl(cycleTrigger{})
|
|
}
|
|
|
|
// TriggerWait ensures that the loop is done at least once and waits for completion.
|
|
// If it's currently running it waits for the previous to complete and then runs.
|
|
func (cycle *Cycle) TriggerWait() {
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
|
|
cycle.sendControl(cycleTrigger{done})
|
|
select {
|
|
case <-done:
|
|
case <-cycle.stop:
|
|
}
|
|
}
|