satellite/audit: split out auditor process

This change creates a new independent process, the 'auditor', comparable
to the repairer, gc, and api processes. This will allow auditors to be
scaled independently of the core.

Refs: https://github.com/storj/storj/issues/5251
Change-Id: I8a29eeb0a6e35753dfa0eab5c1246048065d1e91
Author: paul cannon, 2022-10-12 15:33:31 -05:00
Parent: fc905a15f7
Commit: 7b851b42f7
11 changed files with 501 additions and 78 deletions
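
The new process reuses the construct/run/close lifecycle that the repairer, gc, and api peers already follow. As orientation before the diff, here is a condensed sketch of that lifecycle; the helper function is hypothetical, while the types and calls are the ones introduced below.

package sketch

import (
	"context"

	"github.com/zeebo/errs"

	"storj.io/common/errs2"
	"storj.io/storj/satellite"
)

// runAuditorPeer is a hypothetical helper illustrating the lifecycle shared by
// all satellite peers: Run blocks until an error or context cancellation,
// Close always runs, and a cancellation-driven shutdown is not reported as a
// failure.
func runAuditorPeer(ctx context.Context, peer *satellite.Auditor) error {
	runErr := peer.Run(ctx)
	closeErr := peer.Close()
	return errs2.IgnoreCanceled(errs.Combine(runErr, closeErr))
}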

cmd/satellite/auditor.go (new file, 112 lines added)

@@ -0,0 +1,112 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/errs2"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
func cmdAuditorRun(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L()
runCfg.Debug.Address = *process.DebugAddrFlag
identity, err := runCfg.Identity.Load()
if err != nil {
log.Error("Failed to load identity.", zap.Error(err))
return errs.New("Failed to load identity: %+v", err)
}
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-auditor"})
if err != nil {
return errs.New("Error starting master database: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{
ApplicationName: "satellite-auditor",
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
ServerSideCopy: runCfg.Config.Metainfo.ServerSideCopy,
})
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}
defer func() {
err = errs.Combine(err, metabaseDB.Close())
}()
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database: %+v", err)
}
defer func() {
err = errs.Combine(err, revocationDB.Close())
}()
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.NewAuditor(
log,
identity,
metabaseDB,
revocationDB,
db.VerifyQueue(),
db.ReverifyQueue(),
db.OverlayCache(),
db.NodeEvents(),
db.Reputation(),
db.Containment(),
rollupsWriteCache,
version.Build,
&runCfg.Config,
process.AtomicLevel(cmd),
)
if err != nil {
return err
}
_, err = peer.Version.Service.CheckVersion(ctx)
if err != nil {
return err
}
if err := process.InitMetricsWithHostname(ctx, log, nil); err != nil {
log.Warn("Failed to initialize telemetry batcher on auditor", zap.Error(err))
}
err = metabaseDB.CheckVersion(ctx)
if err != nil {
log.Error("Failed metabase database version check.", zap.Error(err))
return errs.New("failed metabase version check: %+v", err)
}
err = db.CheckVersion(ctx)
if err != nil {
log.Error("Failed satellite database version check.", zap.Error(err))
return errs.New("Error checking version for satellitedb: %+v", err)
}
runError := peer.Run(ctx)
closeError := peer.Close()
return errs2.IgnoreCanceled(errs.Combine(runError, closeError))
}

View File

@@ -107,6 +107,11 @@ var (
Short: "Run the repair service",
RunE: cmdRepairerRun,
}
runAuditorCmd = &cobra.Command{
Use: "auditor",
Short: "Run the auditor service",
RunE: cmdAuditorRun,
}
runAdminCmd = &cobra.Command{
Use: "admin",
Short: "Run the satellite Admin",
@@ -344,6 +349,7 @@ func init() {
runCmd.AddCommand(runAPICmd)
runCmd.AddCommand(runAdminCmd)
runCmd.AddCommand(runRepairerCmd)
runCmd.AddCommand(runAuditorCmd)
runCmd.AddCommand(runGCCmd)
runCmd.AddCommand(runGCBloomFilterCmd)
runCmd.AddCommand(runRangedLoopCmd)
@@ -378,6 +384,7 @@ func init() {
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runAdminCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runRepairerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runAuditorCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runGCCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runGCBloomFilterCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runRangedLoopCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
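
For readers less familiar with the wiring above: every peer process is a subcommand of "run", registered in init and bound to the shared runCfg. A self-contained sketch of the same cobra pattern, without the satellite's real config plumbing:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{Use: "satellite"}
	runCmd := &cobra.Command{Use: "run", Short: "Run the satellite"}

	// One subcommand per peer process; "auditor" is the one added in this change.
	runAuditorCmd := &cobra.Command{
		Use:   "auditor",
		Short: "Run the auditor service",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("auditor peer would be constructed and run here")
			return nil
		},
	}

	runCmd.AddCommand(runAuditorCmd)
	rootCmd.AddCommand(runCmd)

	if err := rootCmd.Execute(); err != nil {
		fmt.Println(err)
	}
}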

View File

@@ -71,6 +71,7 @@ type Satellite struct {
Core *satellite.Core
API *satellite.API
Repairer *satellite.Repairer
Auditor *satellite.Auditor
Admin *satellite.Admin
GC *satellite.GarbageCollection
GCBF *satellite.GarbageCollectionBF
@@ -297,6 +298,7 @@ func (system *Satellite) Close() error {
system.API.Close(),
system.Core.Close(),
system.Repairer.Close(),
system.Auditor.Close(),
system.Admin.Close(),
system.GC.Close(),
system.GCBF.Close(),
@@ -316,6 +318,9 @@ func (system *Satellite) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(system.Repairer.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.Auditor.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.Admin.Run(ctx))
})
@@ -543,6 +548,11 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}
auditorPeer, err := planet.newAuditor(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}
gcPeer, err := planet.newGarbageCollection(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
@@ -562,20 +572,21 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
peer.Mail.EmailReminders.TestSetLinkAddress("http://" + api.Console.Listener.Addr().String() + "/")
}
return createNewSystem(prefix, log, config, peer, api, repairerPeer, adminPeer, gcPeer, gcBFPeer, rangedLoopPeer), nil
return createNewSystem(prefix, log, config, peer, api, repairerPeer, auditorPeer, adminPeer, gcPeer, gcBFPeer, rangedLoopPeer), nil
}
// createNewSystem makes a new Satellite System and exposes the same interface from
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many processes.
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF, rangedLoopPeer *satellite.RangedLoop) *Satellite {
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, auditorPeer *satellite.Auditor, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF, rangedLoopPeer *satellite.RangedLoop) *Satellite {
system := &Satellite{
Name: name,
Config: config,
Core: peer,
API: api,
Repairer: repairerPeer,
Auditor: auditorPeer,
Admin: adminPeer,
GC: gcPeer,
GCBF: gcBFPeer,
@@ -618,14 +629,14 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Repair.Checker = peer.Repair.Checker
system.Repair.Repairer = repairerPeer.Repairer
system.Audit.VerifyQueue = peer.Audit.VerifyQueue
system.Audit.ReverifyQueue = peer.Audit.ReverifyQueue
system.Audit.Worker = peer.Audit.Worker
system.Audit.ReverifyWorker = peer.Audit.ReverifyWorker
system.Audit.VerifyQueue = auditorPeer.Audit.VerifyQueue
system.Audit.ReverifyQueue = auditorPeer.Audit.ReverifyQueue
system.Audit.Worker = auditorPeer.Audit.Worker
system.Audit.ReverifyWorker = auditorPeer.Audit.ReverifyWorker
system.Audit.Chore = peer.Audit.Chore
system.Audit.Verifier = peer.Audit.Verifier
system.Audit.Reverifier = peer.Audit.Reverifier
system.Audit.Reporter = peer.Audit.Reporter
system.Audit.Verifier = auditorPeer.Audit.Verifier
system.Audit.Reverifier = auditorPeer.Audit.Reverifier
system.Audit.Reporter = auditorPeer.Audit.Reporter
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender
system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service
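
As the createNewSystem comment notes, tests keep going through the combined Satellite wrapper; behind it, the audit worker, verifier, and reporter now come from the separate auditor peer while the queueing chore stays on the core. A hypothetical test fragment (the Loop/Pause helpers are assumed from the existing test utilities, not part of this diff):

sat := planet.Satellites[0]
audits := sat.Audit
audits.Worker.Loop.Pause()         // verify worker: now provided by the auditor peer
audits.ReverifyWorker.Loop.Pause() // reverify worker: also on the auditor peer
audits.Chore.Loop.Pause()          // queueing chore: still provided by the core peer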
@@ -703,6 +714,24 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil)
}
func (planet *Planet) newAuditor(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Auditor, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-auditor" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewAuditor(log, identity, metabaseDB, revocationDB, db.VerifyQueue(), db.ReverifyQueue(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil)
}
type rollupsWriteCacheCloser struct {
*orders.RollupsWriteCache
}
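
The hunk is cut off before the wrapper's method; here is a plausible sketch of what its Close does and why the wrapper exists at all (an assumption, not shown in this diff): planet.databases wants a plain Close, while RollupsWriteCache flushes with a context.

// Assumed sketch; the real method body is below the cut and may differ.
func (cache rollupsWriteCacheCloser) Close() error {
	return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}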

View File

@@ -39,6 +39,8 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
config.Reputation.AuditLambda = 1
config.Reputation.AuditWeight = 1
config.Reputation.AuditDQ = auditDQCutOff
// disable reputation write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -82,7 +84,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
if reputation <= auditDQCutOff || reputation == prevReputation {
require.NotNilf(t, reputationInfo.Disqualified,
"Disqualified (%d) - cut-off: %f, prev. reputation: %f, current reputation: %f",
"Not disqualified, but should have been (iteration %d) - cut-off: %f, prev. reputation: %f, current reputation: %f",
iterations, auditDQCutOff, prevReputation, reputation,
)
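
Several tests in this change set config.Reputation.FlushInterval = 0. The reason is visible in the peer wiring added further down in satellite/auditor.go: the reputation DB is wrapped in a write cache only when a flush interval is configured, so a zero interval makes reputation updates hit the database immediately.

if config.Reputation.FlushInterval > 0 {
	cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), reputationdb, config.Reputation)
	peer.Services.Add(lifecycle.Item{
		Name: "reputation:writecache",
		Run:  cachingDB.Manage,
	})
	reputationdb = cachingDB
}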

View File

@@ -9,17 +9,25 @@ import (
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
)
func TestChoreAndWorkerIntegration(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// disable reputation write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit

View File

@@ -49,6 +49,12 @@ func TestReportPendingAudits(t *testing.T) {
func TestRecordAuditsAtLeastOnce(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// disable reputation write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
@@ -228,6 +234,12 @@ func TestGracefullyExitedNotUpdated(t *testing.T) {
func TestReportOfflineAudits(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// disable reputation write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
node := planet.StorageNodes[0]

View File

@@ -457,6 +457,7 @@ func TestReverifyModifiedSegment(t *testing.T) {
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
require.NoError(t, err)
require.Equal(t, audit.OutcomeNotNecessary, outcome)
// expect that the node was removed from containment since the piece it was contained for is no longer part of the segment
_, err = containment.Get(ctx, segment.Pieces[otherPiece].StorageNode)
@@ -654,6 +655,7 @@ func TestReverifySlowDownload(t *testing.T) {
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
require.NoError(t, err)
require.Equal(t, audit.OutcomeTimedOut, outcome)
// expect that the node is still in containment
_, err = containment.Get(ctx, slowNode)
@@ -738,6 +740,8 @@ func TestMaxReverifyCount(t *testing.T) {
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
config.Audit.MinBytesPerSecond = 100 * memory.KiB
config.Audit.MinDownloadTimeout = auditTimeout
// disable reputation write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
testplanet.ReconfigureRS(2, 2, 4, 4),
),

View File

@@ -135,7 +135,13 @@ func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
// Next, audit the remaining nodes that are not in containment mode.
report, err := worker.verifier.Verify(ctx, segment, skip)
if err != nil {
errlist.Add(err)
if metabase.ErrSegmentNotFound.Has(err) {
// no need to add this error; Verify() will encounter it again
// and will handle the verification job as appropriate.
err = nil
} else {
errlist.Add(err)
}
}
worker.reporter.RecordAudits(ctx, report)
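
The check above relies on error classes rather than error equality: metabase.ErrSegmentNotFound is an errs.Class, and Has reports whether an error anywhere in the chain belongs to that class. A small self-contained illustration of the pattern (the class names here are made up):

package main

import (
	"fmt"

	"github.com/zeebo/errs"
)

var (
	errExample = errs.Class("example")
	errOther   = errs.Class("other")
)

func main() {
	err := errExample.New("segment missing")
	fmt.Println(errExample.Has(err)) // true
	fmt.Println(errOther.Has(err))   // false
}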

satellite/auditor.go (new file, 306 lines added)

@@ -0,0 +1,306 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package satellite
import (
"context"
"errors"
"net"
"runtime/pprof"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/identity"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/private/version"
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/reputation"
)
// Auditor is the auditor process.
//
// architecture: Peer
type Auditor struct {
Log *zap.Logger
Identity *identity.FullIdentity
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Version struct {
Chore *version_checker.Chore
Service *version_checker.Service
}
Debug struct {
Listener net.Listener
Server *debug.Server
}
Mail *mailservice.Service
Overlay *overlay.Service
Reputation *reputation.Service
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
Audit struct {
Verifier *audit.Verifier
Reverifier *audit.Reverifier
VerifyQueue audit.VerifyQueue
ReverifyQueue audit.ReverifyQueue
Reporter audit.Reporter
Worker *audit.Worker
ReverifyWorker *audit.ReverifyWorker
}
}
// NewAuditor creates a new auditor peer.
func NewAuditor(log *zap.Logger, full *identity.FullIdentity,
metabaseDB *metabase.DB,
revocationDB extensions.RevocationDB,
verifyQueue audit.VerifyQueue,
reverifyQueue audit.ReverifyQueue,
overlayCache overlay.DB,
nodeEvents nodeevents.DB,
reputationdb reputation.DB,
containmentDB audit.Containment,
rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel,
) (*Auditor, error) {
peer := &Auditor{
Log: log,
Identity: full,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
{ // setup debug
var err error
if config.Debug.Address != "" {
peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
if err != nil {
withoutStack := errors.New(err.Error())
peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
}
}
debugConfig := config.Debug
debugConfig.ControlTitle = "Audit"
peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel)
peer.Servers.Add(lifecycle.Item{
Name: "debug",
Run: peer.Debug.Server.Run,
Close: peer.Debug.Server.Close,
})
}
{ // setup version control
peer.Log.Info("Version info",
zap.Stringer("Version", versionInfo.Version.Version),
zap.String("Commit Hash", versionInfo.CommitHash),
zap.Stringer("Build Timestamp", versionInfo.Timestamp),
zap.Bool("Release Build", versionInfo.Release),
)
peer.Version.Service = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
peer.Version.Chore = version_checker.NewChore(peer.Version.Service, config.Version.CheckInterval)
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Chore.Run,
})
}
{ // setup dialer
sc := config.Server
tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
}
{ // setup mail
var err error
peer.Mail, err = setupMailService(peer.Log, *config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "mail:service",
Close: peer.Mail.Close,
})
}
{ // setup overlay
var err error
peer.Overlay, err = overlay.NewService(log.Named("overlay"), overlayCache, nodeEvents, peer.Mail, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Run: peer.Overlay.Run,
Close: peer.Overlay.Close,
})
}
{ // setup reputation
if config.Reputation.FlushInterval > 0 {
cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), reputationdb, config.Reputation)
peer.Services.Add(lifecycle.Item{
Name: "reputation:writecache",
Run: cachingDB.Manage,
})
reputationdb = cachingDB
}
peer.Reputation = reputation.NewService(log.Named("reputation:service"),
peer.Overlay,
reputationdb,
config.Reputation,
)
peer.Services.Add(lifecycle.Item{
Name: "reputation",
Close: peer.Reputation.Close,
})
}
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Orders Chore", peer.Orders.Chore.Loop))
var err error
peer.Orders.Service, err = orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay,
peer.Orders.DB,
config.Orders,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
{ // setup audit
// force tcp for now because audit is very sensitive to how errors
// are returned, and adding quic can cause problems
dialer := peer.Dialer
//lint:ignore SA1019 deprecated is fine here.
//nolint:staticcheck // deprecated is fine here.
dialer.Connector = rpc.NewDefaultTCPConnector(nil)
peer.Audit.VerifyQueue = verifyQueue
peer.Audit.ReverifyQueue = reverifyQueue
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
metabaseDB,
dialer,
peer.Overlay,
containmentDB,
peer.Orders.Service,
peer.Identity,
config.Audit.MinBytesPerSecond,
config.Audit.MinDownloadTimeout)
peer.Audit.Reverifier = audit.NewReverifier(log.Named("audit:reverifier"),
peer.Audit.Verifier,
reverifyQueue,
config.Audit)
peer.Audit.Reporter = audit.NewReporter(
log.Named("reporter"),
peer.Reputation,
peer.Overlay,
containmentDB,
config.Audit.MaxRetriesStatDB,
int32(config.Audit.MaxReverifyCount))
peer.Audit.Worker = audit.NewWorker(log.Named("audit:verify-worker"),
verifyQueue,
peer.Audit.Verifier,
reverifyQueue,
peer.Audit.Reporter,
config.Audit)
peer.Services.Add(lifecycle.Item{
Name: "audit:verify-worker",
Run: peer.Audit.Worker.Run,
Close: peer.Audit.Worker.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Verify Worker", peer.Audit.Worker.Loop))
peer.Audit.ReverifyWorker = audit.NewReverifyWorker(peer.Log.Named("audit:reverify-worker"),
reverifyQueue,
peer.Audit.Reverifier,
peer.Audit.Reporter,
config.Audit)
peer.Services.Add(lifecycle.Item{
Name: "audit:reverify-worker",
Run: peer.Audit.ReverifyWorker.Run,
Close: peer.Audit.ReverifyWorker.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Reverify Worker", peer.Audit.ReverifyWorker.Loop))
}
return peer, nil
}
// Run runs the auditor process until it's either closed or it errors.
func (peer *Auditor) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
group, ctx := errgroup.WithContext(ctx)
pprof.Do(ctx, pprof.Labels("subsystem", "auditor"), func(ctx context.Context) {
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
pprof.Do(ctx, pprof.Labels("name", "subsystem-wait"), func(ctx context.Context) {
err = group.Wait()
})
})
return err
}
// Close closes all the resources.
func (peer *Auditor) Close() error {
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.
func (peer *Auditor) ID() storj.NodeID { return peer.Identity.ID }
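
Auditor.Run above wraps the servers and services in pprof.Do with a "subsystem" label, so profiles taken from a process that hosts several peers (for example in testplanet) can be filtered per subsystem. A self-contained illustration of how those labels behave:

package main

import (
	"context"
	"fmt"
	"runtime/pprof"
)

func main() {
	ctx := context.Background()
	pprof.Do(ctx, pprof.Labels("subsystem", "auditor"), func(ctx context.Context) {
		// Code running here, and any goroutines started here, carry the label;
		// it shows up in CPU and goroutine profiles and can be read back:
		value, ok := pprof.Label(ctx, "subsystem")
		fmt.Println(value, ok) // "auditor true"
	})
}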

View File

@@ -116,14 +116,8 @@ type Core struct {
}
Audit struct {
VerifyQueue audit.VerifyQueue
ReverifyQueue audit.ReverifyQueue
Worker *audit.Worker
ReverifyWorker *audit.ReverifyWorker
Chore *audit.Chore
Verifier *audit.Verifier
Reverifier *audit.Reverifier
Reporter audit.Reporter
VerifyQueue audit.VerifyQueue
Chore *audit.Chore
}
ExpiredDeletion struct {
@@ -399,68 +393,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}
{ // setup audit
// force tcp for now because audit is very sensitive to how errors
// are returned, and adding quic can cause problems
dialer := peer.Dialer
//lint:ignore SA1019 deprecated is fine here.
//nolint:staticcheck // deprecated is fine here.
dialer.Connector = rpc.NewDefaultTCPConnector(nil)
config := config.Audit
peer.Audit.VerifyQueue = db.VerifyQueue()
peer.Audit.ReverifyQueue = db.ReverifyQueue()
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
peer.Metainfo.Metabase,
dialer,
peer.Overlay.Service,
peer.DB.Containment(),
peer.Orders.Service,
peer.Identity,
config.MinBytesPerSecond,
config.MinDownloadTimeout,
)
peer.Audit.Reverifier = audit.NewReverifier(log.Named("audit:reverifier"),
peer.Audit.Verifier,
peer.Audit.ReverifyQueue,
config)
peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"),
peer.Reputation.Service,
peer.Overlay.Service,
peer.DB.Containment(),
config.MaxRetriesStatDB,
int32(config.MaxReverifyCount),
)
peer.Audit.Worker = audit.NewWorker(peer.Log.Named("audit:worker"),
peer.Audit.VerifyQueue,
peer.Audit.Verifier,
peer.Audit.ReverifyQueue,
peer.Audit.Reporter,
config,
)
peer.Services.Add(lifecycle.Item{
Name: "audit:worker",
Run: peer.Audit.Worker.Run,
Close: peer.Audit.Worker.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Worker", peer.Audit.Worker.Loop))
peer.Audit.ReverifyWorker = audit.NewReverifyWorker(peer.Log.Named("audit:reverify-worker"),
peer.Audit.ReverifyQueue,
peer.Audit.Reverifier,
peer.Audit.Reporter,
config)
peer.Services.Add(lifecycle.Item{
Name: "audit:reverify-worker",
Run: peer.Audit.ReverifyWorker.Run,
Close: peer.Audit.ReverifyWorker.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Reverify Worker", peer.Audit.ReverifyWorker.Loop))
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
peer.Audit.VerifyQueue,

View File

@@ -148,6 +148,8 @@ func TestAuditSuspendExceedGracePeriod(t *testing.T) {
config.Reputation.InitialAlpha = 1
config.Reputation.AuditLambda = 0.95
config.Reputation.AuditDQ = 0.6
// disable write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {