satellite/audit,storagenode/gracefulexit: fixes to limiter
Ensure we don't rely on the limiter to wait multiple times.

Change-Id: I75d48420236216d4c2fc6fa99293f51f80cd9c33
commit bc9ab8ee5e
parent 1cb3cbaecf
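
The fix rests on how a sync2.Limiter-style component behaves: Go starts the callback in a bounded goroutine pool and reports whether it was actually started (false once the context is done), while Wait is meant to be called exactly once, after the last Go call for that limiter. As a rough illustration only, here is a minimal stand-in with those assumed semantics, written for this note rather than taken from storj.io/common:

package example

import (
	"context"
	"sync"
)

// limiter is a simplified stand-in for sync2.Limiter, written only to
// illustrate the contract the diff below relies on: Go runs fn with a
// concurrency cap and returns false once ctx is done; Wait blocks until
// every fn that was started has returned and is meant to be called once.
type limiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newLimiter(n int) *limiter {
	return &limiter{slots: make(chan struct{}, n)}
}

func (l *limiter) Go(ctx context.Context, fn func()) bool {
	select {
	case l.slots <- struct{}{}: // acquire a concurrency slot
	case <-ctx.Done():
		return false // nothing started; caller should stop scheduling work
	}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }() // release the slot
		fn()
	}()
	return true
}

// Wait is intended to be called exactly once, after the last Go call.
func (l *limiter) Wait() { l.wg.Wait() }

The old audit worker shared one limiter between Run (which deferred Wait) and every process pass (which also called Wait), so Wait ran repeatedly against the same limiter; the change instead scopes one limiter to each pass so each limiter is waited on exactly once.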
@@ -33,12 +33,12 @@ type Config struct {
 // Worker contains information for populating audit queue and processing audits.
 type Worker struct {
-	log      *zap.Logger
-	queues   *Queues
-	verifier *Verifier
-	reporter Reporter
-	Loop     *sync2.Cycle
-	limiter  *sync2.Limiter
+	log         *zap.Logger
+	queues      *Queues
+	verifier    *Verifier
+	reporter    Reporter
+	Loop        *sync2.Cycle
+	concurrency int
 }

 // NewWorker instantiates Worker.
@@ -46,11 +46,11 @@ func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Rep
 	return &Worker{
 		log: log,

-		queues:   queues,
-		verifier: verifier,
-		reporter: reporter,
-		Loop:     sync2.NewCycle(config.QueueInterval),
-		limiter:  sync2.NewLimiter(config.WorkerConcurrency),
+		queues:      queues,
+		verifier:    verifier,
+		reporter:    reporter,
+		Loop:        sync2.NewCycle(config.QueueInterval),
+		concurrency: config.WorkerConcurrency,
 	}, nil
 }

@@ -58,9 +58,6 @@ func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Rep
 func (worker *Worker) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)

-	// Wait for all audits to run.
-	defer worker.limiter.Wait()
-
 	return worker.Loop.Run(ctx, func(ctx context.Context) (err error) {
 		defer mon.Task()(&ctx)(&err)
 		err = worker.process(ctx)
@@ -84,7 +81,9 @@ func (worker *Worker) process(ctx context.Context) (err error) {
 	// get the current queue
 	queue := worker.queues.Fetch()

-	worker.limiter.Wait()
+	limiter := sync2.NewLimiter(worker.concurrency)
+	defer limiter.Wait()
+
 	for {
 		segment, err := queue.Next()
 		if err != nil {
@@ -99,7 +98,7 @@ func (worker *Worker) process(ctx context.Context) (err error) {
 			return err
 		}

-		worker.limiter.Go(ctx, func() {
+		started := limiter.Go(ctx, func() {
 			err := worker.work(ctx, segment)
 			if err != nil {
 				worker.log.Error("error(s) during audit",
@@ -108,6 +107,9 @@ func (worker *Worker) process(ctx context.Context) (err error) {
 				zap.Error(err))
 			}
 		})
+		if !started {
+			return ctx.Err()
+		}
 	}
 }

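Taken together, the hunks above turn the long-lived Worker.limiter into a limiter that lives for a single process pass: one deferred Wait covers exactly the goroutines started in that pass, and a false return from Go is treated as context cancellation. A small, self-contained sketch of that shape, assuming the storj.io/common/sync2 Limiter API matches the calls shown in the diff (processBatch and the integer items are made up for illustration):

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

// processBatch fans out work over items with bounded concurrency,
// mirroring the structure Worker.process has after this change:
// a fresh limiter per call, a single deferred Wait, and an early
// return when Go reports that the context is done.
func processBatch(ctx context.Context, items []int, concurrency int) error {
	limiter := sync2.NewLimiter(concurrency)
	defer limiter.Wait() // waits only for goroutines started below

	for _, item := range items {
		item := item
		started := limiter.Go(ctx, func() {
			fmt.Println("processed", item) // stand-in for the real audit work
		})
		if !started {
			// Go declines to start new work once ctx is done.
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	_ = processBatch(context.Background(), []int{1, 2, 3, 4}, 2)
}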
@@ -6,6 +6,7 @@ package gracefulexit
 import (
 	"context"
 	"sync"
+	"time"

 	"go.uber.org/zap"

@@ -46,64 +47,80 @@ func NewChore(log *zap.Logger, service Service, transferService piecetransfer.Se
 // Run starts the chore.
 func (chore *Chore) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)

-	err = chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
-		defer mon.Task()(&ctx)(&err)
-
-		geSatellites, err := chore.service.ListPendingExits(ctx)
-		if err != nil {
-			chore.log.Error("error retrieving satellites.", zap.Error(err))
-			return nil
-		}
-
-		if len(geSatellites) == 0 {
-			return nil
-		}
-		chore.log.Debug("exiting", zap.Int("satellites", len(geSatellites)))
-
-		for _, satellite := range geSatellites {
-			mon.Meter("satellite_gracefulexit_request").Mark(1) //mon:locked
-			satellite := satellite
-
-			worker := NewWorker(chore.log, chore.service, chore.transferService, chore.dialer, satellite.NodeURL, chore.config)
-			if _, ok := chore.exitingMap.LoadOrStore(satellite.SatelliteID, worker); ok {
-				// already running a worker for this satellite
-				chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satellite.SatelliteID))
-				continue
-			}
-
-			chore.limiter.Go(ctx, func() {
-				err := worker.Run(ctx, func() {
-					chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satellite.SatelliteID))
-					chore.exitingMap.Delete(satellite.SatelliteID)
-				})
-
-				if err != nil {
-					chore.log.Error("worker failed", zap.Error(err))
-				}
-
-				if err := worker.Close(); err != nil {
-					chore.log.Error("closing worker failed", zap.Error(err))
-				}
-			})
-		}
-
-		return nil
-	})
-
-	chore.limiter.Wait()
-
-	return err
+	defer chore.limiter.Wait()
+	return chore.Loop.Run(ctx, chore.AddMissing)
 }

-// TestWaitForWorkers waits for any pending worker to finish.
-func (chore *Chore) TestWaitForWorkers() {
-	chore.limiter.Wait()
+// AddMissing starts any missing satellite chore.
+func (chore *Chore) AddMissing(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	geSatellites, err := chore.service.ListPendingExits(ctx)
+	if err != nil {
+		chore.log.Error("error retrieving satellites.", zap.Error(err))
+		return nil
+	}
+
+	if len(geSatellites) == 0 {
+		return nil
+	}
+	chore.log.Debug("exiting", zap.Int("satellites", len(geSatellites)))
+
+	for _, satellite := range geSatellites {
+		mon.Meter("satellite_gracefulexit_request").Mark(1) //mon:locked
+		satellite := satellite
+
+		worker := NewWorker(chore.log, chore.service, chore.transferService, chore.dialer, satellite.NodeURL, chore.config)
+		if _, ok := chore.exitingMap.LoadOrStore(satellite.SatelliteID, worker); ok {
+			// already running a worker for this satellite
+			chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satellite.SatelliteID))
+			continue
+		}
+
+		started := chore.limiter.Go(ctx, func() {
+			err := worker.Run(ctx, func() {
+				chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satellite.SatelliteID))
+				chore.exitingMap.Delete(satellite.SatelliteID)
+			})
+
+			if err != nil {
+				chore.log.Error("worker failed", zap.Error(err))
+			}
+
+			if err := worker.Close(); err != nil {
+				chore.log.Error("closing worker failed", zap.Error(err))
+			}
+		})
+		if !started {
+			return ctx.Err()
+		}
+	}
+
+	return nil
+}
+
+// TestWaitForNoWorkers waits for any pending worker to finish.
+func (chore *Chore) TestWaitForNoWorkers(ctx context.Context) error {
+	for {
+		if !sync2.Sleep(ctx, 100*time.Millisecond) {
+			return ctx.Err()
+		}
+
+		found := false
+		chore.exitingMap.Range(func(key, value interface{}) bool {
+			found = true
+			return false
+		})
+		if !found {
+			return nil
+		}
+	}
 }

 // Close closes chore.
 func (chore *Chore) Close() error {
 	chore.Loop.Close()

 	chore.exitingMap.Range(func(key interface{}, value interface{}) bool {
 		worker := value.(*Worker)
 		err := worker.Close()
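On the storage node side, Run now defers a single chore.limiter.Wait and delegates the per-satellite loop to AddMissing, so the chore's limiter is also waited on only once; tests consequently stop waiting on the limiter (the removed TestWaitForWorkers) and instead poll the exitingMap until it drains. A rough standard-library equivalent of that polling wait, using time.Ticker where the real code uses sync2.Sleep (waitForEmpty is a name invented for this sketch):

package example

import (
	"context"
	"sync"
	"time"
)

// waitForEmpty polls a sync.Map until it holds no entries or ctx is
// cancelled, the same shape as the new TestWaitForNoWorkers: rather than
// re-waiting on the chore's limiter, observers watch the map of running
// workers drain.
func waitForEmpty(ctx context.Context, m *sync.Map) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		found := false
		m.Range(func(key, value interface{}) bool {
			found = true
			return false // stop ranging after the first entry
		})
		if !found {
			return nil
		}
	}
}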
@@ -107,7 +107,8 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
 	// run the SN chore again to start processing transfers.
 	exitingNode.GracefulExit.Chore.Loop.TriggerWait()
 	// wait for workers to finish
-	exitingNode.GracefulExit.Chore.TestWaitForWorkers()
+	err = exitingNode.GracefulExit.Chore.TestWaitForNoWorkers(ctx)
+	require.NoError(t, err)

 	// check that there are no more items to process
 	queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)