From bc9ab8ee5e50920a23485c8eb7a6a591c49dda50 Mon Sep 17 00:00:00 2001
From: Egon Elbre
Date: Mon, 1 Aug 2022 15:00:23 +0300
Subject: [PATCH] satellite/audit,storagenode/gracefulexit: fixes to limiter

Ensure we don't rely on the limiter to wait multiple times.

Change-Id: I75d48420236216d4c2fc6fa99293f51f80cd9c33
---
 satellite/audit/worker.go              |  34 +++----
 storagenode/gracefulexit/chore.go      | 119 ++++++++++++++-----
 storagenode/gracefulexit/chore_test.go |   3 +-
 3 files changed, 88 insertions(+), 68 deletions(-)

diff --git a/satellite/audit/worker.go b/satellite/audit/worker.go
index a4d2aaf5a..361de0b00 100644
--- a/satellite/audit/worker.go
+++ b/satellite/audit/worker.go
@@ -33,12 +33,12 @@ type Config struct {
 
 // Worker contains information for populating audit queue and processing audits.
 type Worker struct {
-	log      *zap.Logger
-	queues   *Queues
-	verifier *Verifier
-	reporter Reporter
-	Loop     *sync2.Cycle
-	limiter  *sync2.Limiter
+	log         *zap.Logger
+	queues      *Queues
+	verifier    *Verifier
+	reporter    Reporter
+	Loop        *sync2.Cycle
+	concurrency int
 }
 
 // NewWorker instantiates Worker.
@@ -46,11 +46,11 @@ func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Rep
 	return &Worker{
 		log: log,
 
-		queues:   queues,
-		verifier: verifier,
-		reporter: reporter,
-		Loop:     sync2.NewCycle(config.QueueInterval),
-		limiter:  sync2.NewLimiter(config.WorkerConcurrency),
+		queues:      queues,
+		verifier:    verifier,
+		reporter:    reporter,
+		Loop:        sync2.NewCycle(config.QueueInterval),
+		concurrency: config.WorkerConcurrency,
 	}, nil
 }
 
@@ -58,9 +58,6 @@ func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Rep
 func (worker *Worker) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
-
-	// Wait for all audits to run.
-	defer worker.limiter.Wait()
 
 	return worker.Loop.Run(ctx, func(ctx context.Context) (err error) {
 		defer mon.Task()(&ctx)(&err)
 		err = worker.process(ctx)
@@ -84,7 +81,9 @@ func (worker *Worker) process(ctx context.Context) (err error) {
 	// get the current queue
 	queue := worker.queues.Fetch()
 
-	worker.limiter.Wait()
+	limiter := sync2.NewLimiter(worker.concurrency)
+	defer limiter.Wait()
+
 	for {
 		segment, err := queue.Next()
 		if err != nil {
@@ -99,7 +98,7 @@ func (worker *Worker) process(ctx context.Context) (err error) {
 			return err
 		}
 
-		worker.limiter.Go(ctx, func() {
+		started := limiter.Go(ctx, func() {
 			err := worker.work(ctx, segment)
 			if err != nil {
 				worker.log.Error("error(s) during audit",
@@ -108,6 +107,9 @@ func (worker *Worker) process(ctx context.Context) (err error) {
 				zap.Error(err))
 			}
 		})
+		if !started {
+			return ctx.Err()
+		}
 	}
 }
 
diff --git a/storagenode/gracefulexit/chore.go b/storagenode/gracefulexit/chore.go
index ea57ff3b3..55a273797 100644
--- a/storagenode/gracefulexit/chore.go
+++ b/storagenode/gracefulexit/chore.go
@@ -6,6 +6,7 @@ package gracefulexit
 import (
 	"context"
 	"sync"
+	"time"
 
 	"go.uber.org/zap"
 
@@ -46,64 +47,80 @@ func NewChore(log *zap.Logger, service Service, transferService piecetransfer.Se
 // Run starts the chore.
 func (chore *Chore) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
-
-	err = chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
-		defer mon.Task()(&ctx)(&err)
-
-		geSatellites, err := chore.service.ListPendingExits(ctx)
-		if err != nil {
-			chore.log.Error("error retrieving satellites.", zap.Error(err))
-			return nil
-		}
-
-		if len(geSatellites) == 0 {
-			return nil
-		}
-		chore.log.Debug("exiting", zap.Int("satellites", len(geSatellites)))
-
-		for _, satellite := range geSatellites {
-			mon.Meter("satellite_gracefulexit_request").Mark(1) //mon:locked
-			satellite := satellite
-
-			worker := NewWorker(chore.log, chore.service, chore.transferService, chore.dialer, satellite.NodeURL, chore.config)
-			if _, ok := chore.exitingMap.LoadOrStore(satellite.SatelliteID, worker); ok {
-				// already running a worker for this satellite
-				chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satellite.SatelliteID))
-				continue
-			}
-
-			chore.limiter.Go(ctx, func() {
-				err := worker.Run(ctx, func() {
-					chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satellite.SatelliteID))
-					chore.exitingMap.Delete(satellite.SatelliteID)
-				})
-
-				if err != nil {
-					chore.log.Error("worker failed", zap.Error(err))
-				}
-
-				if err := worker.Close(); err != nil {
-					chore.log.Error("closing worker failed", zap.Error(err))
-				}
-			})
-		}
-
-		return nil
-	})
-
-	chore.limiter.Wait()
-
-	return err
+	defer chore.limiter.Wait()
+	return chore.Loop.Run(ctx, chore.AddMissing)
 }
 
-// TestWaitForWorkers waits for any pending worker to finish.
-func (chore *Chore) TestWaitForWorkers() {
-	chore.limiter.Wait()
+// AddMissing starts any missing satellite chore.
+func (chore *Chore) AddMissing(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	geSatellites, err := chore.service.ListPendingExits(ctx)
+	if err != nil {
+		chore.log.Error("error retrieving satellites.", zap.Error(err))
+		return nil
+	}
+
+	if len(geSatellites) == 0 {
+		return nil
+	}
+	chore.log.Debug("exiting", zap.Int("satellites", len(geSatellites)))
+
+	for _, satellite := range geSatellites {
+		mon.Meter("satellite_gracefulexit_request").Mark(1) //mon:locked
+		satellite := satellite
+
+		worker := NewWorker(chore.log, chore.service, chore.transferService, chore.dialer, satellite.NodeURL, chore.config)
+		if _, ok := chore.exitingMap.LoadOrStore(satellite.SatelliteID, worker); ok {
+			// already running a worker for this satellite
+			chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satellite.SatelliteID))
+			continue
+		}
+
+		started := chore.limiter.Go(ctx, func() {
+			err := worker.Run(ctx, func() {
+				chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satellite.SatelliteID))
+				chore.exitingMap.Delete(satellite.SatelliteID)
+			})
+
+			if err != nil {
+				chore.log.Error("worker failed", zap.Error(err))
+			}
+
+			if err := worker.Close(); err != nil {
+				chore.log.Error("closing worker failed", zap.Error(err))
+			}
+		})
+		if !started {
+			return ctx.Err()
+		}
+	}
+
+	return nil
+}
+
+// TestWaitForNoWorkers waits for any pending worker to finish.
+func (chore *Chore) TestWaitForNoWorkers(ctx context.Context) error {
+	for {
+		if !sync2.Sleep(ctx, 100*time.Millisecond) {
+			return ctx.Err()
+		}
+
+		found := false
+		chore.exitingMap.Range(func(key, value interface{}) bool {
+			found = true
+			return false
+		})
+		if !found {
+			return nil
+		}
+	}
 }
 
 // Close closes chore.
 func (chore *Chore) Close() error {
 	chore.Loop.Close()
+
 	chore.exitingMap.Range(func(key interface{}, value interface{}) bool {
 		worker := value.(*Worker)
 		err := worker.Close()
diff --git a/storagenode/gracefulexit/chore_test.go b/storagenode/gracefulexit/chore_test.go
index dd1f2d921..ce72f3d94 100644
--- a/storagenode/gracefulexit/chore_test.go
+++ b/storagenode/gracefulexit/chore_test.go
@@ -107,7 +107,8 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
 	// run the SN chore again to start processing transfers.
 	exitingNode.GracefulExit.Chore.Loop.TriggerWait()
 	// wait for workers to finish
-	exitingNode.GracefulExit.Chore.TestWaitForWorkers()
+	err = exitingNode.GracefulExit.Chore.TestWaitForNoWorkers(ctx)
+	require.NoError(t, err)
 
 	// check that there are no more items to process
 	queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
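A minimal sketch of the pattern the audit worker adopts above, shown outside the diff: create a fresh limiter per call, defer a single Wait, and stop handing out work once Go reports the context is gone. The processQueue function and its work channel are hypothetical stand-ins; NewLimiter, Go, and Wait are the sync2 calls used in the patch, assuming the package is storj.io/common/sync2 as elsewhere in the codebase.

package limiterpattern

import (
	"context"

	"storj.io/common/sync2"
)

// processQueue drains a queue of work items with at most `concurrency`
// callbacks in flight. The queue channel and its funcs are hypothetical
// stand-ins for the audit segments processed in the patch.
func processQueue(ctx context.Context, concurrency int, queue <-chan func()) error {
	// A fresh limiter per invocation: Wait is deferred exactly once,
	// so nothing depends on a shared limiter being waited on repeatedly.
	limiter := sync2.NewLimiter(concurrency)
	defer limiter.Wait()

	for work := range queue {
		// Go reports false when the context is canceled before the
		// callback could be started; bail out instead of looping further.
		if !limiter.Go(ctx, work) {
			return ctx.Err()
		}
	}
	return nil
}

The same shape appears in Worker.process above: the limiter no longer lives on the Worker, only the concurrency setting does, so each queue pass gets its own limiter and exactly one Wait.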
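For the limiter semantics the change relies on, a stand-in is sketched below. It is hypothetical; the real sync2.Limiter is likely implemented differently, but the behavior the patch assumes is the same: Go blocks until a slot is free, returns false if the context is canceled first, and Wait blocks until every started callback has returned.

package limiterpattern

import "context"

// limiter is a hypothetical stand-in mimicking the behaviour the patch
// relies on; the real storj.io/common/sync2.Limiter may differ internally.
type limiter struct {
	slots chan struct{}
}

func newLimiter(n int) *limiter {
	return &limiter{slots: make(chan struct{}, n)}
}

// Go runs fn in a goroutine once a slot is free. It returns false, without
// running fn, if ctx is canceled while waiting for a slot.
func (lim *limiter) Go(ctx context.Context, fn func()) bool {
	select {
	case lim.slots <- struct{}{}:
	case <-ctx.Done():
		return false
	}
	go func() {
		defer func() { <-lim.slots }()
		fn()
	}()
	return true
}

// Wait blocks until every callback started via Go has returned. It assumes
// no further Go calls will be made, which is why the patch creates a fresh
// limiter per process() call instead of waiting on a shared one in several
// places.
func (lim *limiter) Wait() {
	// Acquiring every slot is only possible once all running callbacks
	// have released theirs.
	for i := 0; i < cap(lim.slots); i++ {
		lim.slots <- struct{}{}
	}
}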