satellite/metabase/metaloop: add Monitor
We need some chores to join without triggering the loop. For example it's fine to run metrics, only when something else is running. Change-Id: I9d8bd16f59c28c540c8d72971bc4e233a8660c02
This commit is contained in:
parent
fff21b330d
commit
c1fbecb96b
@ -47,24 +47,22 @@ func (chore *Chore) RunOnce(ctx context.Context) error {
|
|||||||
loop := metaloop.New(chore.Config.Loop, chore.DB)
|
loop := metaloop.New(chore.Config.Loop, chore.DB)
|
||||||
|
|
||||||
var group errs2.Group
|
var group errs2.Group
|
||||||
|
group.Go(func() error {
|
||||||
|
plainOffset := &SegmentSizes{
|
||||||
|
Log: chore.Log.Named("segment-sizes"),
|
||||||
|
}
|
||||||
|
err := loop.Join(ctx, plainOffset)
|
||||||
|
return Error.Wrap(err)
|
||||||
|
})
|
||||||
|
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
progress := &ProgressObserver{
|
progress := &ProgressObserver{
|
||||||
Log: chore.Log.Named("progress"),
|
Log: chore.Log.Named("progress"),
|
||||||
ProgressPrintFrequency: chore.Config.ProgressPrintFrequency,
|
ProgressPrintFrequency: chore.Config.ProgressPrintFrequency,
|
||||||
}
|
}
|
||||||
|
err := loop.Monitor(ctx, progress)
|
||||||
plainOffset := &SegmentSizes{
|
|
||||||
Log: chore.Log.Named("segment-sizes"),
|
|
||||||
}
|
|
||||||
|
|
||||||
err := loop.Join(ctx, progress, plainOffset)
|
|
||||||
if err != nil {
|
|
||||||
return Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
progress.Report()
|
progress.Report()
|
||||||
|
return Error.Wrap(err)
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
return Error.Wrap(loop.RunOnce(ctx))
|
return Error.Wrap(loop.RunOnce(ctx))
|
||||||
|
@ -89,6 +89,7 @@ func (NullObserver) LoopStarted(context.Context, LoopInfo) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type observerContext struct {
|
type observerContext struct {
|
||||||
|
trigger bool
|
||||||
observer Observer
|
observer Observer
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@ -178,7 +179,7 @@ type MetabaseDB interface {
|
|||||||
type Service struct {
|
type Service struct {
|
||||||
config Config
|
config Config
|
||||||
metabaseDB MetabaseDB
|
metabaseDB MetabaseDB
|
||||||
join chan []*observerContext
|
join chan *observerContext
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,37 +188,48 @@ func New(config Config, metabaseDB MetabaseDB) *Service {
|
|||||||
return &Service{
|
return &Service{
|
||||||
metabaseDB: metabaseDB,
|
metabaseDB: metabaseDB,
|
||||||
config: config,
|
config: config,
|
||||||
join: make(chan []*observerContext),
|
join: make(chan *observerContext),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Join will join the looper for one full cycle until completion and then returns.
|
// Join will join the looper for one full cycle until completion and then returns.
|
||||||
|
// Joining will trigger a new iteration after coalesce duration.
|
||||||
// On ctx cancel the observer will return without completely finishing.
|
// On ctx cancel the observer will return without completely finishing.
|
||||||
// Only on full complete iteration it will return nil.
|
// Only on full complete iteration it will return nil.
|
||||||
// Safe to be called concurrently.
|
// Safe to be called concurrently.
|
||||||
func (loop *Service) Join(ctx context.Context, observers ...Observer) (err error) {
|
func (loop *Service) Join(ctx context.Context, observer Observer) (err error) {
|
||||||
|
return loop.joinObserver(ctx, true, observer)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Monitor will join the looper for one full cycle until completion and then returns.
|
||||||
|
// Joining with monitoring won't trigger after coalesce duration.
|
||||||
|
// On ctx cancel the observer will return without completely finishing.
|
||||||
|
// Only on full complete iteration it will return nil.
|
||||||
|
// Safe to be called concurrently.
|
||||||
|
func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error) {
|
||||||
|
return loop.joinObserver(ctx, false, observer)
|
||||||
|
}
|
||||||
|
|
||||||
|
// joinObserver will join the looper for one full cycle until completion and then returns.
|
||||||
|
// On ctx cancel the observer will return without completely finishing.
|
||||||
|
// Only on full complete iteration it will return nil.
|
||||||
|
// Safe to be called concurrently.
|
||||||
|
func (loop *Service) joinObserver(ctx context.Context, trigger bool, obs Observer) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
obsContexts := make([]*observerContext, len(observers))
|
obsctx := newObserverContext(ctx, obs)
|
||||||
for i, obs := range observers {
|
obsctx.trigger = trigger
|
||||||
obsContexts[i] = newObserverContext(ctx, obs)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case loop.join <- obsContexts:
|
case loop.join <- obsctx:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case <-loop.done:
|
case <-loop.done:
|
||||||
return ErrClosed
|
return ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
var errList errs.Group
|
return obsctx.Wait()
|
||||||
for _, ctx := range obsContexts {
|
|
||||||
errList.Add(ctx.Wait())
|
|
||||||
}
|
|
||||||
|
|
||||||
return errList.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the looping service.
|
// Run starts the looping service.
|
||||||
@ -248,30 +260,80 @@ var monMetainfo = monkit.ScopeNamed("storj.io/storj/satellite/metainfo/metaloop"
|
|||||||
func (loop *Service) RunOnce(ctx context.Context) (err error) {
|
func (loop *Service) RunOnce(ctx context.Context) (err error) {
|
||||||
defer monMetainfo.Task()(&ctx)(&err) //mon:locked
|
defer monMetainfo.Task()(&ctx)(&err) //mon:locked
|
||||||
|
|
||||||
var observers []*observerContext
|
coalesceTimer := time.NewTimer(loop.config.CoalesceDuration)
|
||||||
|
defer coalesceTimer.Stop()
|
||||||
// wait for the first observer, or exit because context is canceled
|
if !coalesceTimer.Stop() {
|
||||||
select {
|
<-coalesceTimer.C
|
||||||
case list := <-loop.join:
|
|
||||||
observers = append(observers, list...)
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// after the first observer is found, set timer for CoalesceDuration and add any observers that try to join before the timer is up
|
earlyExit := make(chan *observerContext)
|
||||||
timer := time.NewTimer(loop.config.CoalesceDuration)
|
earlyExitDone := make(chan struct{})
|
||||||
|
monitorEarlyExit := func(obs *observerContext) {
|
||||||
|
select {
|
||||||
|
case <-obs.ctx.Done():
|
||||||
|
select {
|
||||||
|
case <-earlyExitDone:
|
||||||
|
case earlyExit <- obs:
|
||||||
|
}
|
||||||
|
case <-earlyExitDone:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
timerStarted := false
|
||||||
|
observers := []*observerContext{}
|
||||||
|
|
||||||
waitformore:
|
waitformore:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case list := <-loop.join:
|
// when the coalesce timer hits, we have waited enough for observers to join.
|
||||||
observers = append(observers, list...)
|
case <-coalesceTimer.C:
|
||||||
case <-timer.C:
|
|
||||||
break waitformore
|
break waitformore
|
||||||
|
|
||||||
|
// wait for a new observer to join.
|
||||||
|
case obsctx := <-loop.join:
|
||||||
|
// when the observer triggers the loop and it's the first one,
|
||||||
|
// then start the coalescing timer.
|
||||||
|
if obsctx.trigger {
|
||||||
|
if !timerStarted {
|
||||||
|
coalesceTimer.Reset(loop.config.CoalesceDuration)
|
||||||
|
timerStarted = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
observers = append(observers, obsctx)
|
||||||
|
go monitorEarlyExit(obsctx)
|
||||||
|
|
||||||
|
// remove an observer from waiting when it's canceled before the loop starts.
|
||||||
|
case obsctx := <-earlyExit:
|
||||||
|
for i, obs := range observers {
|
||||||
|
if obs == obsctx {
|
||||||
|
observers = append(observers[:i], observers[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
obsctx.HandleError(obsctx.ctx.Err())
|
||||||
|
|
||||||
|
// reevalute, whether we acually need to start the loop.
|
||||||
|
timerShouldRun := false
|
||||||
|
for _, obs := range observers {
|
||||||
|
timerShouldRun = timerShouldRun || obs.trigger
|
||||||
|
}
|
||||||
|
|
||||||
|
if !timerShouldRun && timerStarted {
|
||||||
|
if !coalesceTimer.Stop() {
|
||||||
|
<-coalesceTimer.C
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// when ctx done happens we can finish all the waiting observers.
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
finishObservers(observers)
|
close(earlyExitDone)
|
||||||
|
errorObservers(observers, ctx.Err())
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
close(earlyExitDone)
|
||||||
|
|
||||||
return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1))
|
return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,9 +346,7 @@ func (loop *Service) Wait() {
|
|||||||
func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
|
func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for _, observer := range observers {
|
errorObservers(observers, err)
|
||||||
observer.HandleError(err)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
finishObservers(observers)
|
finishObservers(observers)
|
||||||
@ -511,3 +571,9 @@ func finishObservers(observers []*observerContext) {
|
|||||||
observer.Finish()
|
observer.Finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func errorObservers(observers []*observerContext, err error) {
|
||||||
|
for _, observer := range observers {
|
||||||
|
observer.HandleError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -234,7 +234,7 @@ func TestLoopObserverCancel(t *testing.T) {
|
|||||||
|
|
||||||
// create 1 "good" observer
|
// create 1 "good" observer
|
||||||
obs1 := newTestObserver(nil)
|
obs1 := newTestObserver(nil)
|
||||||
obs1x := newTestObserver(nil)
|
mon1 := newTestObserver(nil)
|
||||||
|
|
||||||
// create observer that will return an error from RemoteSegment
|
// create observer that will return an error from RemoteSegment
|
||||||
obs2 := newTestObserver(func(ctx context.Context) error {
|
obs2 := newTestObserver(func(ctx context.Context) error {
|
||||||
@ -256,7 +256,10 @@ func TestLoopObserverCancel(t *testing.T) {
|
|||||||
|
|
||||||
var group errgroup.Group
|
var group errgroup.Group
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
return metaLoop.Join(ctx, obs1, obs1x)
|
return metaLoop.Join(ctx, obs1)
|
||||||
|
})
|
||||||
|
group.Go(func() error {
|
||||||
|
return metaLoop.Monitor(ctx, mon1)
|
||||||
})
|
})
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
err := metaLoop.Join(ctx, obs2)
|
err := metaLoop.Join(ctx, obs2)
|
||||||
@ -281,7 +284,7 @@ func TestLoopObserverCancel(t *testing.T) {
|
|||||||
|
|
||||||
// expect that obs1 saw all three segments, but obs2 and obs3 only saw the first one
|
// expect that obs1 saw all three segments, but obs2 and obs3 only saw the first one
|
||||||
assert.EqualValues(t, 3, obs1.remoteSegCount)
|
assert.EqualValues(t, 3, obs1.remoteSegCount)
|
||||||
assert.EqualValues(t, 3, obs1x.remoteSegCount)
|
assert.EqualValues(t, 3, mon1.remoteSegCount)
|
||||||
assert.EqualValues(t, 1, obs2.remoteSegCount)
|
assert.EqualValues(t, 1, obs2.remoteSegCount)
|
||||||
assert.EqualValues(t, 1, obs3.remoteSegCount)
|
assert.EqualValues(t, 1, obs3.remoteSegCount)
|
||||||
})
|
})
|
||||||
@ -379,6 +382,54 @@ func TestLoopCancel(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLoop_MonitorCancel(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
satellite := planet.Satellites[0]
|
||||||
|
|
||||||
|
metaLoop := metaloop.New(metaloop.Config{
|
||||||
|
CoalesceDuration: time.Nanosecond,
|
||||||
|
ListLimit: 10000,
|
||||||
|
}, satellite.Metainfo.Metabase)
|
||||||
|
|
||||||
|
obs1 := newTestObserver(func(ctx context.Context) error {
|
||||||
|
return errors.New("test error")
|
||||||
|
})
|
||||||
|
|
||||||
|
var group errgroup.Group
|
||||||
|
|
||||||
|
loopCtx, loopCancel := context.WithCancel(ctx)
|
||||||
|
group.Go(func() error {
|
||||||
|
err := metaLoop.Run(loopCtx)
|
||||||
|
t.Log("metaloop stopped")
|
||||||
|
if !errs2.IsCanceled(err) {
|
||||||
|
return errors.New("expected context canceled")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
obsCtx, obsCancel := context.WithCancel(ctx)
|
||||||
|
group.Go(func() error {
|
||||||
|
defer loopCancel()
|
||||||
|
err := metaLoop.Monitor(obsCtx, obs1)
|
||||||
|
t.Log("observer stopped")
|
||||||
|
if !errs2.IsCanceled(err) {
|
||||||
|
return errors.New("expected context canceled")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
obsCancel()
|
||||||
|
|
||||||
|
err := group.Wait()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = metaLoop.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
type testObserver struct {
|
type testObserver struct {
|
||||||
objectCount int
|
objectCount int
|
||||||
remoteSegCount int
|
remoteSegCount int
|
||||||
|
Loading…
Reference in New Issue
Block a user