From c575a215092bf6ad22a7fb5496356735d21d508f Mon Sep 17 00:00:00 2001 From: paul cannon Date: Tue, 22 Nov 2022 16:20:44 -0600 Subject: [PATCH] satellite/audit: add audit.ReverifyWorker Here we add a worker class comparable to audit.Worker, which will be responsible for pulling items off of the reverification queue and calling reverifier.ReverifyPiece on them. Note that piecewise reverification audits (which this will control) are not yet being done. That is, nothing is being added to the reverification queue at this point. Refs: https://github.com/storj/storj/issues/5251 Change-Id: I94e28830e27caa49f2c8bd4a2336533e187ab69c --- private/testplanet/satellite.go | 16 +++-- satellite/audit/reverifyworker.go | 114 ++++++++++++++++++++++++++++++ satellite/audit/worker.go | 26 +++---- satellite/core.go | 32 ++++++--- 4 files changed, 160 insertions(+), 28 deletions(-) create mode 100644 satellite/audit/reverifyworker.go diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index dd54bb1a8..bb40612ca 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -133,12 +133,14 @@ type Satellite struct { } Audit struct { - VerifyQueue audit.VerifyQueue - Worker *audit.Worker - Chore *audit.Chore - Verifier *audit.Verifier - Reverifier *audit.Reverifier - Reporter audit.Reporter + VerifyQueue audit.VerifyQueue + ReverifyQueue audit.ReverifyQueue + Worker *audit.Worker + ReverifyWorker *audit.ReverifyWorker + Chore *audit.Chore + Verifier *audit.Verifier + Reverifier *audit.Reverifier + Reporter audit.Reporter } Reputation struct { @@ -610,7 +612,9 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer system.Repair.Repairer = repairerPeer.Repairer system.Audit.VerifyQueue = peer.Audit.VerifyQueue + system.Audit.ReverifyQueue = peer.Audit.ReverifyQueue system.Audit.Worker = peer.Audit.Worker + system.Audit.ReverifyWorker = peer.Audit.ReverifyWorker system.Audit.Chore = peer.Audit.Chore system.Audit.Verifier = peer.Audit.Verifier system.Audit.Reverifier = peer.Audit.Reverifier diff --git a/satellite/audit/reverifyworker.go b/satellite/audit/reverifyworker.go new file mode 100644 index 000000000..614a358ec --- /dev/null +++ b/satellite/audit/reverifyworker.go @@ -0,0 +1,114 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package audit + +import ( + "context" + "time" + + "go.uber.org/zap" + + "storj.io/common/sync2" +) + +// ReverifyWorker processes reverifications (retrying piece audits against nodes that timed out +// during a Verification). +type ReverifyWorker struct { + log *zap.Logger + queue ReverifyQueue + reverifier *Reverifier + reporter Reporter + + Loop *sync2.Cycle + concurrency int + retryInterval time.Duration +} + +// NewReverifyWorker creates a new ReverifyWorker. +func NewReverifyWorker(log *zap.Logger, queue ReverifyQueue, reverifier *Reverifier, reporter Reporter, config Config) *ReverifyWorker { + return &ReverifyWorker{ + log: log, + queue: queue, + reverifier: reverifier, + reporter: reporter, + Loop: sync2.NewCycle(config.QueueInterval), + concurrency: config.ReverifyWorkerConcurrency, + retryInterval: config.ReverificationRetryInterval, + } +} + +// Run runs a ReverifyWorker. +func (worker *ReverifyWorker) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + return worker.Loop.Run(ctx, func(ctx context.Context) (err error) { + err = worker.process(ctx) + if err != nil { + worker.log.Error("failure processing reverify queue", zap.Error(Error.Wrap(err))) + } + return nil + }) +} + +func (worker *ReverifyWorker) process(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + limiter := sync2.NewLimiter(worker.concurrency) + defer limiter.Wait() + + for { + // We start the timeout clock _before_ pulling the next job from + // the queue. This gives us the best chance of having this worker + // terminate and get cleaned up before another reverification + // worker tries to take over the job. + // + // (If another worker does take over the job before this worker + // has been cleaned up, it is ok; the downside should only be + // duplication of work and monkit stats.) + ctx, cancel := context.WithTimeout(ctx, worker.retryInterval) + + reverifyJob, err := worker.queue.GetNextJob(ctx, worker.retryInterval) + if err != nil { + cancel() + if ErrEmptyQueue.Has(err) { + return nil + } + return err + } + + started := limiter.Go(ctx, func() { + defer cancel() + + logger := worker.log.With( + zap.Stringer("Segment StreamID", reverifyJob.Locator.StreamID), + zap.Uint64("Segment Position", reverifyJob.Locator.Position.Encode()), + zap.Stringer("Node ID", reverifyJob.Locator.NodeID), + zap.Int("Piece Number", reverifyJob.Locator.PieceNum)) + worker.work(ctx, logger, reverifyJob) + }) + if !started { + cancel() + return ctx.Err() + } + } +} + +func (worker *ReverifyWorker) work(ctx context.Context, logger *zap.Logger, job *ReverificationJob) { + defer mon.Task()(&ctx)(nil) + + logger.Debug("beginning piecewise audit") + outcome, reputation := worker.reverifier.ReverifyPiece(ctx, logger, &job.Locator) + logger.Debug("piecewise audit complete", zap.Int("outcome", int(outcome))) + + err := worker.reporter.RecordReverificationResult(ctx, job, outcome, reputation) + if err != nil { + logger.Error("finished with audit, but failed to remove entry from queue", zap.Error(err)) + } +} + +// Close halts the worker. +func (worker *ReverifyWorker) Close() error { + worker.Loop.Close() + return nil +} diff --git a/satellite/audit/worker.go b/satellite/audit/worker.go index d83c1a7aa..5f7a41b59 100644 --- a/satellite/audit/worker.go +++ b/satellite/audit/worker.go @@ -38,24 +38,26 @@ type Config struct { // Worker contains information for populating audit queue and processing audits. type Worker struct { - log *zap.Logger - queue VerifyQueue - verifier *Verifier - reporter Reporter - Loop *sync2.Cycle - concurrency int + log *zap.Logger + queue VerifyQueue + verifier *Verifier + reverifyQueue ReverifyQueue + reporter Reporter + Loop *sync2.Cycle + concurrency int } // NewWorker instantiates Worker. -func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reporter Reporter, config Config) *Worker { +func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reverifyQueue ReverifyQueue, reporter Reporter, config Config) *Worker { return &Worker{ log: log, - queue: queue, - verifier: verifier, - reporter: reporter, - Loop: sync2.NewCycle(config.QueueInterval), - concurrency: config.WorkerConcurrency, + queue: queue, + verifier: verifier, + reverifyQueue: reverifyQueue, + reporter: reporter, + Loop: sync2.NewCycle(config.QueueInterval), + concurrency: config.WorkerConcurrency, } } diff --git a/satellite/core.go b/satellite/core.go index 4cdc0a067..8bc15188e 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -116,13 +116,14 @@ type Core struct { } Audit struct { - VerifyQueue audit.VerifyQueue - ReverifyQueue audit.ReverifyQueue - Worker *audit.Worker - Chore *audit.Chore - Verifier *audit.Verifier - Reverifier *audit.Reverifier - Reporter audit.Reporter + VerifyQueue audit.VerifyQueue + ReverifyQueue audit.ReverifyQueue + Worker *audit.Worker + ReverifyWorker *audit.ReverifyWorker + Chore *audit.Chore + Verifier *audit.Verifier + Reverifier *audit.Reverifier + Reporter audit.Reporter } ExpiredDeletion struct { @@ -408,6 +409,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config := config.Audit peer.Audit.VerifyQueue = db.VerifyQueue() + peer.Audit.ReverifyQueue = db.ReverifyQueue() peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"), peer.Metainfo.Metabase, @@ -437,6 +439,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Audit.Worker = audit.NewWorker(peer.Log.Named("audit:worker"), peer.Audit.VerifyQueue, peer.Audit.Verifier, + peer.Audit.ReverifyQueue, peer.Audit.Reporter, config, ) @@ -448,9 +451,18 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Debug.Server.Panel.Add( debug.Cycle("Audit Worker", peer.Audit.Worker.Loop)) - if err != nil { - return nil, errs.Combine(err, peer.Close()) - } + peer.Audit.ReverifyWorker = audit.NewReverifyWorker(peer.Log.Named("audit:reverify-worker"), + peer.Audit.ReverifyQueue, + peer.Audit.Reverifier, + peer.Audit.Reporter, + config) + peer.Services.Add(lifecycle.Item{ + Name: "audit:reverify-worker", + Run: peer.Audit.ReverifyWorker.Run, + Close: peer.Audit.ReverifyWorker.Close, + }) + peer.Debug.Server.Panel.Add( + debug.Cycle("Audit Reverify Worker", peer.Audit.ReverifyWorker.Loop)) peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"), peer.Audit.VerifyQueue,