2019-09-05 16:40:52 +01:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package audit
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-09-11 23:37:01 +01:00
|
|
|
"time"
|
2019-09-05 16:40:52 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
"github.com/zeebo/errs"
|
2019-09-05 16:40:52 +01:00
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
"storj.io/storj/internal/memory"
|
2019-09-05 16:40:52 +01:00
|
|
|
"storj.io/storj/internal/sync2"
|
|
|
|
"storj.io/storj/pkg/storj"
|
|
|
|
)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
// Error is the default audit errs class; errors surfaced by this
// package are wrapped with it.
var Error = errs.Class("audit error")
|
|
|
|
|
|
|
|
// Config contains configurable values for audit chore and workers.
//
// The `help` / default tags below are consumed by the satellite's config
// loader; the comments here only add context beyond the help text.
type Config struct {
	MaxRetriesStatDB  int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond memory.Size   `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B"`
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"25s"`
	MaxReverifyCount  int           `help:"limit above which we consider an audit is failed" default:"3"`

	// ChoreInterval/QueueInterval drive the reservoir chore and the
	// worker's queue polling Loop respectively (see Worker.Loop).
	ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m"`
	QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m"`
	Slots         int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	// WorkerConcurrency bounds the limiter used by Worker.process.
	WorkerConcurrency int `help:"number of workers to run audits on paths" default:"1"`
}
|
|
|
|
|
2019-09-05 16:40:52 +01:00
|
|
|
// Worker contains information for populating audit queue and processing audits.
type Worker struct {
	log      *zap.Logger
	queue    *Queue    // source of segment paths to audit
	verifier *Verifier // performs Reverify/Verify on a path
	reporter *Reporter // records audit results (statdb updates)
	// Loop wakes up process on QueueInterval; exported so callers
	// can trigger/pause the cycle.
	Loop sync2.Cycle
	// limiter caps concurrent work() goroutines at WorkerConcurrency.
	limiter sync2.Limiter
}
|
|
|
|
|
|
|
|
// NewWorker instantiates Worker.
|
2019-09-11 23:37:01 +01:00
|
|
|
func NewWorker(log *zap.Logger, queue *Queue, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error) {
|
2019-09-05 16:40:52 +01:00
|
|
|
return &Worker{
|
|
|
|
log: log,
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
queue: queue,
|
|
|
|
verifier: verifier,
|
|
|
|
reporter: reporter,
|
|
|
|
Loop: *sync2.NewCycle(config.QueueInterval),
|
|
|
|
limiter: *sync2.NewLimiter(config.WorkerConcurrency),
|
2019-09-05 16:40:52 +01:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run runs audit service 2.0.
|
|
|
|
func (worker *Worker) Run(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
worker.log.Debug("starting")
|
|
|
|
|
|
|
|
// Wait for all audits to run.
|
|
|
|
defer worker.limiter.Wait()
|
|
|
|
|
|
|
|
return worker.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
err = worker.process(ctx)
|
|
|
|
if err != nil {
|
|
|
|
worker.log.Error("process", zap.Error(Error.Wrap(err)))
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close halts the worker.
//
// It only stops the queue-polling Loop; audits already in flight are
// waited on by Run's deferred limiter.Wait, not here.
func (worker *Worker) Close() error {
	worker.Loop.Close()
	return nil
}
|
|
|
|
|
|
|
|
// process repeatedly removes an item from the queue and runs an audit.
|
|
|
|
func (worker *Worker) process(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
worker.limiter.Wait()
|
|
|
|
for {
|
|
|
|
path, err := worker.queue.Next()
|
|
|
|
if err != nil {
|
|
|
|
if ErrEmptyQueue.Has(err) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
worker.limiter.Go(ctx, func() {
|
|
|
|
err := worker.work(ctx, path)
|
|
|
|
if err != nil {
|
2019-10-16 12:48:05 +01:00
|
|
|
worker.log.Error("audit failed", zap.Binary("Segment", []byte(path)), zap.Error(err))
|
2019-09-05 16:40:52 +01:00
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (worker *Worker) work(ctx context.Context, path storj.Path) error {
|
2019-09-11 23:37:01 +01:00
|
|
|
var errlist errs.Group
|
|
|
|
|
|
|
|
// First, attempt to reverify nodes for this segment that are in containment mode.
|
|
|
|
report, err := worker.verifier.Reverify(ctx, path)
|
|
|
|
if err != nil {
|
|
|
|
errlist.Add(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
|
2019-10-16 12:48:05 +01:00
|
|
|
_, err = worker.reporter.RecordAudits(ctx, report, path)
|
2019-09-11 23:37:01 +01:00
|
|
|
if err != nil {
|
|
|
|
errlist.Add(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Skip all reverified nodes in the next Verify step.
|
|
|
|
skip := make(map[storj.NodeID]bool)
|
2019-10-09 15:06:58 +01:00
|
|
|
for _, nodeID := range report.Successes {
|
|
|
|
skip[nodeID] = true
|
|
|
|
}
|
|
|
|
for _, nodeID := range report.Offlines {
|
|
|
|
skip[nodeID] = true
|
|
|
|
}
|
|
|
|
for _, nodeID := range report.Fails {
|
|
|
|
skip[nodeID] = true
|
|
|
|
}
|
|
|
|
for _, pending := range report.PendingAudits {
|
|
|
|
skip[pending.NodeID] = true
|
2019-09-11 23:37:01 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Next, audit the the remaining nodes that are not in containment mode.
|
|
|
|
report, err = worker.verifier.Verify(ctx, path, skip)
|
|
|
|
if err != nil {
|
|
|
|
errlist.Add(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
|
2019-10-16 12:48:05 +01:00
|
|
|
_, err = worker.reporter.RecordAudits(ctx, report, path)
|
2019-09-11 23:37:01 +01:00
|
|
|
if err != nil {
|
|
|
|
errlist.Add(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return errlist.Err()
|
2019-09-05 16:40:52 +01:00
|
|
|
}
|