// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package audit

import (
	"context"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/sync2"
	"storj.io/storj/satellite/metabase"
)

// Error is the default audit errs class.
var Error = errs.Class("audit")

// Config contains configurable values for audit chore and workers.
type Config struct {
	MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`

	MinBytesPerSecond  memory.Size   `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B" testDefault:"1.00 KB"`
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`

	MaxReverifyCount int `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`

	Slots             int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`

	ReverifyWorkerConcurrency   int           `help:"number of workers to run reverify audits on pieces" default:"2"`
	ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"`
}
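
// Note: these fields are exposed as satellite config flags through their
// cfgstruct struct tags; the devDefault/releaseDefault/testDefault values
// give dev, release, and testplanet builds their own defaults from a single
// definition (assumed from the surrounding configuration machinery).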

// Worker contains information for populating audit queue and processing audits.
type Worker struct {
	log         *zap.Logger
	queue       VerifyQueue
	verifier    *Verifier
	reporter    Reporter
	Loop        *sync2.Cycle
	concurrency int
}

// NewWorker instantiates Worker.
func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reporter Reporter, config Config) *Worker {
	return &Worker{
		log:         log,
		queue:       queue,
		verifier:    verifier,
		reporter:    reporter,
		Loop:        sync2.NewCycle(config.QueueInterval),
		concurrency: config.WorkerConcurrency,
	}
}
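
// A minimal usage sketch (illustrative only, not the actual satellite
// wiring): the queue, verifier, reporter, config, ctx, and errgroup-style
// group used below are assumed to be provided by the caller.
//
//	worker := NewWorker(log.Named("audit:worker"), queue, verifier, reporter, config)
//	group.Go(func() error { return worker.Run(ctx) })
//	defer func() { _ = worker.Close() }()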

// Run runs audit service 2.0.
func (worker *Worker) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	return worker.Loop.Run(ctx, func(ctx context.Context) (err error) {
		defer mon.Task()(&ctx)(&err)
		err = worker.process(ctx)
		if err != nil {
			worker.log.Error("process", zap.Error(Error.Wrap(err)))
		}
		return nil
	})
}

// Close halts the worker.
func (worker *Worker) Close() error {
	worker.Loop.Close()
	return nil
}

// process repeatedly removes an item from the queue and runs an audit.
func (worker *Worker) process(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	limiter := sync2.NewLimiter(worker.concurrency)
	defer limiter.Wait()

	for {
		segment, err := worker.queue.Next(ctx)
		if err != nil {
			if ErrEmptyQueue.Has(err) {
				return nil
			}
			return err
		}

		started := limiter.Go(ctx, func() {
			err := worker.work(ctx, segment)
			if err != nil {
				worker.log.Error("error(s) during audit",
					zap.String("Segment StreamID", segment.StreamID.String()),
					zap.Uint64("Segment Position", segment.Position.Encode()),
					zap.Error(err))
			}
		})
		if !started {
			return ctx.Err()
		}
	}
}
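
// work audits a single segment. It first reverifies any nodes for the
// segment that are currently in containment mode, records that report,
// and then runs a regular verification against the remaining nodes
// (skipping the ones already handled by reverification), recording that
// report as well. Errors from the individual steps are collected and
// returned as a group.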
func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
	defer mon.Task()(&ctx)(&err)

	var errlist errs.Group

	// First, attempt to reverify nodes for this segment that are in containment mode.
	report, err := worker.verifier.Reverify(ctx, segment)
	if err != nil {
		if metabase.ErrSegmentNotFound.Has(err) {
			// no need to add this error; Verify() will encounter it again
			// and will handle the verification job as appropriate.
			err = nil
		} else {
			errlist.Add(err)
		}
	}

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	_, err = worker.reporter.RecordAudits(ctx, report)
	if err != nil {
		errlist.Add(err)
	}

	// Skip all reverified nodes in the next Verify step.
	skip := make(map[storj.NodeID]bool)
	for _, nodeID := range report.Successes {
		skip[nodeID] = true
	}
	for _, nodeID := range report.Offlines {
		skip[nodeID] = true
	}
	for _, nodeID := range report.Fails {
		skip[nodeID] = true
	}
	for _, pending := range report.PendingAudits {
		skip[pending.NodeID] = true
	}
	for _, nodeID := range report.Unknown {
		skip[nodeID] = true
	}

	// Next, audit the remaining nodes that are not in containment mode.
	report, err = worker.verifier.Verify(ctx, segment, skip)
	if err != nil {
		errlist.Add(err)
	}

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	_, err = worker.reporter.RecordAudits(ctx, report)
	if err != nil {
		errlist.Add(err)
	}

	return errlist.Err()
}