From 7b851b42f70ae1f3f31998ec341663f94347b716 Mon Sep 17 00:00:00 2001 From: paul cannon Date: Wed, 12 Oct 2022 15:33:31 -0500 Subject: [PATCH] satellite/audit: split out auditor process This change creates a new independent process, the 'auditor', comparable to the repairer, gc, and api processes. This will allow auditors to be scaled independently of the core. Refs: https://github.com/storj/storj/issues/5251 Change-Id: I8a29eeb0a6e35753dfa0eab5c1246048065d1e91 --- cmd/satellite/auditor.go | 112 +++++++++ cmd/satellite/main.go | 7 + private/testplanet/satellite.go | 47 +++- satellite/audit/disqualification_test.go | 4 +- satellite/audit/integration_test.go | 8 + satellite/audit/reporter_test.go | 12 + satellite/audit/reverify_test.go | 4 + satellite/audit/worker.go | 8 +- satellite/auditor.go | 306 +++++++++++++++++++++++ satellite/core.go | 69 +---- satellite/reputation/suspension_test.go | 2 + 11 files changed, 501 insertions(+), 78 deletions(-) create mode 100644 cmd/satellite/auditor.go create mode 100644 satellite/auditor.go diff --git a/cmd/satellite/auditor.go b/cmd/satellite/auditor.go new file mode 100644 index 000000000..1c0098a44 --- /dev/null +++ b/cmd/satellite/auditor.go @@ -0,0 +1,112 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "github.com/spf13/cobra" + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/context2" + "storj.io/common/errs2" + "storj.io/private/process" + "storj.io/private/version" + "storj.io/storj/private/revocation" + "storj.io/storj/satellite" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/orders" + "storj.io/storj/satellite/satellitedb" +) + +func cmdAuditorRun(cmd *cobra.Command, args []string) (err error) { + ctx, _ := process.Ctx(cmd) + log := zap.L() + + runCfg.Debug.Address = *process.DebugAddrFlag + + identity, err := runCfg.Identity.Load() + if err != nil { + log.Error("Failed to load identity.", zap.Error(err)) + return errs.New("Failed to load identity: %+v", err) + } + + db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-auditor"}) + if err != nil { + return errs.New("Error starting master database: %+v", err) + } + defer func() { + err = errs.Combine(err, db.Close()) + }() + + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{ + ApplicationName: "satellite-auditor", + MinPartSize: runCfg.Config.Metainfo.MinPartSize, + MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts, + ServerSideCopy: runCfg.Config.Metainfo.ServerSideCopy, + }) + if err != nil { + return errs.New("Error creating metabase connection: %+v", err) + } + defer func() { + err = errs.Combine(err, metabaseDB.Close()) + }() + + revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config) + if err != nil { + return errs.New("Error creating revocation database: %+v", err) + } + defer func() { + err = errs.Combine(err, revocationDB.Close()) + }() + + rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize) + defer func() { + err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx))) + }() + + peer, err := satellite.NewAuditor( + log, + identity, + metabaseDB, + revocationDB, + db.VerifyQueue(), + db.ReverifyQueue(), + db.OverlayCache(), + db.NodeEvents(), + db.Reputation(), + db.Containment(), + rollupsWriteCache, + version.Build, + &runCfg.Config, + 
process.AtomicLevel(cmd), + ) + if err != nil { + return err + } + + _, err = peer.Version.Service.CheckVersion(ctx) + if err != nil { + return err + } + + if err := process.InitMetricsWithHostname(ctx, log, nil); err != nil { + log.Warn("Failed to initialize telemetry batcher on auditor", zap.Error(err)) + } + + err = metabaseDB.CheckVersion(ctx) + if err != nil { + log.Error("Failed metabase database version check.", zap.Error(err)) + return errs.New("failed metabase version check: %+v", err) + } + + err = db.CheckVersion(ctx) + if err != nil { + log.Error("Failed satellite database version check.", zap.Error(err)) + return errs.New("Error checking version for satellitedb: %+v", err) + } + + runError := peer.Run(ctx) + closeError := peer.Close() + return errs2.IgnoreCanceled(errs.Combine(runError, closeError)) +} diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index 9fd41bd10..a0bbeabdf 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -107,6 +107,11 @@ var ( Short: "Run the repair service", RunE: cmdRepairerRun, } + runAuditorCmd = &cobra.Command{ + Use: "auditor", + Short: "Run the auditor service", + RunE: cmdAuditorRun, + } runAdminCmd = &cobra.Command{ Use: "admin", Short: "Run the satellite Admin", @@ -344,6 +349,7 @@ func init() { runCmd.AddCommand(runAPICmd) runCmd.AddCommand(runAdminCmd) runCmd.AddCommand(runRepairerCmd) + runCmd.AddCommand(runAuditorCmd) runCmd.AddCommand(runGCCmd) runCmd.AddCommand(runGCBloomFilterCmd) runCmd.AddCommand(runRangedLoopCmd) @@ -378,6 +384,7 @@ func init() { process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) process.Bind(runAdminCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) process.Bind(runRepairerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) + process.Bind(runAuditorCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) process.Bind(runGCCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) process.Bind(runGCBloomFilterCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) process.Bind(runRangedLoopCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index 4cba13f06..7852afb7d 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -71,6 +71,7 @@ type Satellite struct { Core *satellite.Core API *satellite.API Repairer *satellite.Repairer + Auditor *satellite.Auditor Admin *satellite.Admin GC *satellite.GarbageCollection GCBF *satellite.GarbageCollectionBF @@ -297,6 +298,7 @@ func (system *Satellite) Close() error { system.API.Close(), system.Core.Close(), system.Repairer.Close(), + system.Auditor.Close(), system.Admin.Close(), system.GC.Close(), system.GCBF.Close(), @@ -316,6 +318,9 @@ func (system *Satellite) Run(ctx context.Context) (err error) { group.Go(func() error { return errs2.IgnoreCanceled(system.Repairer.Run(ctx)) }) + group.Go(func() error { + return errs2.IgnoreCanceled(system.Auditor.Run(ctx)) + }) group.Go(func() error { return errs2.IgnoreCanceled(system.Admin.Run(ctx)) }) @@ -543,6 +548,11 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int return nil, err } + auditorPeer, err := planet.newAuditor(ctx, index, identity, db, metabaseDB, config, versionInfo) + if err != nil { + return nil, err + } 
+ gcPeer, err := planet.newGarbageCollection(ctx, index, identity, db, metabaseDB, config, versionInfo) if err != nil { return nil, err @@ -562,20 +572,21 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int peer.Mail.EmailReminders.TestSetLinkAddress("http://" + api.Console.Listener.Addr().String() + "/") } - return createNewSystem(prefix, log, config, peer, api, repairerPeer, adminPeer, gcPeer, gcBFPeer, rangedLoopPeer), nil + return createNewSystem(prefix, log, config, peer, api, repairerPeer, auditorPeer, adminPeer, gcPeer, gcBFPeer, rangedLoopPeer), nil } // createNewSystem makes a new Satellite System and exposes the same interface from // before we split out the API. In the short term this will help keep all the tests passing // without much modification needed. However long term, we probably want to rework this // so it represents how the satellite will run when it is made up of many processes. -func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF, rangedLoopPeer *satellite.RangedLoop) *Satellite { +func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, auditorPeer *satellite.Auditor, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF, rangedLoopPeer *satellite.RangedLoop) *Satellite { system := &Satellite{ Name: name, Config: config, Core: peer, API: api, Repairer: repairerPeer, + Auditor: auditorPeer, Admin: adminPeer, GC: gcPeer, GCBF: gcBFPeer, @@ -618,14 +629,14 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer system.Repair.Checker = peer.Repair.Checker system.Repair.Repairer = repairerPeer.Repairer - system.Audit.VerifyQueue = peer.Audit.VerifyQueue - system.Audit.ReverifyQueue = peer.Audit.ReverifyQueue - system.Audit.Worker = peer.Audit.Worker - system.Audit.ReverifyWorker = peer.Audit.ReverifyWorker + system.Audit.VerifyQueue = auditorPeer.Audit.VerifyQueue + system.Audit.ReverifyQueue = auditorPeer.Audit.ReverifyQueue + system.Audit.Worker = auditorPeer.Audit.Worker + system.Audit.ReverifyWorker = auditorPeer.Audit.ReverifyWorker system.Audit.Chore = peer.Audit.Chore - system.Audit.Verifier = peer.Audit.Verifier - system.Audit.Reverifier = peer.Audit.Reverifier - system.Audit.Reporter = peer.Audit.Reporter + system.Audit.Verifier = auditorPeer.Audit.Verifier + system.Audit.Reverifier = auditorPeer.Audit.Reverifier + system.Audit.Reporter = auditorPeer.Audit.Reporter system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service @@ -703,6 +714,24 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil) } +func (planet *Planet) newAuditor(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Auditor, err error) { + defer mon.Task()(&ctx)(&err) + + prefix := "satellite-auditor" + strconv.Itoa(index) + log := 
planet.log.Named(prefix) + + revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config) + if err != nil { + return nil, errs.Wrap(err) + } + planet.databases = append(planet.databases, revocationDB) + + rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize) + planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache}) + + return satellite.NewAuditor(log, identity, metabaseDB, revocationDB, db.VerifyQueue(), db.ReverifyQueue(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil) +} + type rollupsWriteCacheCloser struct { *orders.RollupsWriteCache } diff --git a/satellite/audit/disqualification_test.go b/satellite/audit/disqualification_test.go index 65418ea27..9ab677d2b 100644 --- a/satellite/audit/disqualification_test.go +++ b/satellite/audit/disqualification_test.go @@ -39,6 +39,8 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) { config.Reputation.AuditLambda = 1 config.Reputation.AuditWeight = 1 config.Reputation.AuditDQ = auditDQCutOff + // disable reputation write cache so changes are immediate + config.Reputation.FlushInterval = 0 }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { @@ -82,7 +84,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) { if reputation <= auditDQCutOff || reputation == prevReputation { require.NotNilf(t, reputationInfo.Disqualified, - "Disqualified (%d) - cut-off: %f, prev. reputation: %f, current reputation: %f", + "Not disqualified, but should have been (iteration %d) - cut-off: %f, prev. reputation: %f, current reputation: %f", iterations, auditDQCutOff, prevReputation, reputation, ) diff --git a/satellite/audit/integration_test.go b/satellite/audit/integration_test.go index 3160161b1..09a1f936b 100644 --- a/satellite/audit/integration_test.go +++ b/satellite/audit/integration_test.go @@ -9,17 +9,25 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.uber.org/zap" "storj.io/common/memory" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" "storj.io/storj/satellite/audit" ) func TestChoreAndWorkerIntegration(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + // disable reputation write cache so changes are immediate + config.Reputation.FlushInterval = 0 + }, + }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] audits := satellite.Audit diff --git a/satellite/audit/reporter_test.go b/satellite/audit/reporter_test.go index 78ccf4c34..e0a60a026 100644 --- a/satellite/audit/reporter_test.go +++ b/satellite/audit/reporter_test.go @@ -49,6 +49,12 @@ func TestReportPendingAudits(t *testing.T) { func TestRecordAuditsAtLeastOnce(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + // disable reputation write cache so changes are immediate + config.Reputation.FlushInterval = 0 + }, + }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] audits := satellite.Audit @@ -228,6 +234,12 @@ func 
TestGracefullyExitedNotUpdated(t *testing.T) { func TestReportOfflineAudits(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + // disable reputation write cache so changes are immediate + config.Reputation.FlushInterval = 0 + }, + }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] node := planet.StorageNodes[0] diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index ce3953443..b4b6039c4 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -457,6 +457,7 @@ func TestReverifyModifiedSegment(t *testing.T) { err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation) require.NoError(t, err) + require.Equal(t, audit.OutcomeNotNecessary, outcome) // expect that the node was removed from containment since the piece it was contained for is no longer part of the segment _, err = containment.Get(ctx, segment.Pieces[otherPiece].StorageNode) @@ -654,6 +655,7 @@ func TestReverifySlowDownload(t *testing.T) { err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation) require.NoError(t, err) + require.Equal(t, audit.OutcomeTimedOut, outcome) // expect that the node is still in containment _, err = containment.Get(ctx, slowNode) @@ -738,6 +740,8 @@ func TestMaxReverifyCount(t *testing.T) { // These config values are chosen to force the slow node to time out without timing out on the three normal nodes config.Audit.MinBytesPerSecond = 100 * memory.KiB config.Audit.MinDownloadTimeout = auditTimeout + // disable reputation write cache so changes are immediate + config.Reputation.FlushInterval = 0 }, testplanet.ReconfigureRS(2, 2, 4, 4), ), diff --git a/satellite/audit/worker.go b/satellite/audit/worker.go index 851bdbd45..c58abe253 100644 --- a/satellite/audit/worker.go +++ b/satellite/audit/worker.go @@ -135,7 +135,13 @@ func (worker *Worker) work(ctx context.Context, segment Segment) (err error) { // Next, audit the remaining nodes that are not in containment mode. report, err := worker.verifier.Verify(ctx, segment, skip) if err != nil { - errlist.Add(err) + if metabase.ErrSegmentNotFound.Has(err) { + // no need to add this error; Verify() will encounter it again + // and will handle the verification job as appropriate. + err = nil + } else { + errlist.Add(err) + } } worker.reporter.RecordAudits(ctx, report) diff --git a/satellite/auditor.go b/satellite/auditor.go new file mode 100644 index 000000000..69ecbe551 --- /dev/null +++ b/satellite/auditor.go @@ -0,0 +1,306 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. 
+ +package satellite + +import ( + "context" + "errors" + "net" + "runtime/pprof" + + "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "storj.io/common/identity" + "storj.io/common/peertls/extensions" + "storj.io/common/peertls/tlsopts" + "storj.io/common/rpc" + "storj.io/common/signing" + "storj.io/common/storj" + "storj.io/private/debug" + "storj.io/private/version" + "storj.io/storj/private/lifecycle" + version_checker "storj.io/storj/private/version/checker" + "storj.io/storj/satellite/audit" + "storj.io/storj/satellite/mailservice" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/nodeevents" + "storj.io/storj/satellite/orders" + "storj.io/storj/satellite/overlay" + "storj.io/storj/satellite/reputation" +) + +// Auditor is the auditor process. +// +// architecture: Peer +type Auditor struct { + Log *zap.Logger + Identity *identity.FullIdentity + + Servers *lifecycle.Group + Services *lifecycle.Group + + Dialer rpc.Dialer + + Version struct { + Chore *version_checker.Chore + Service *version_checker.Service + } + + Debug struct { + Listener net.Listener + Server *debug.Server + } + + Mail *mailservice.Service + Overlay *overlay.Service + Reputation *reputation.Service + Orders struct { + DB orders.DB + Service *orders.Service + Chore *orders.Chore + } + + Audit struct { + Verifier *audit.Verifier + Reverifier *audit.Reverifier + VerifyQueue audit.VerifyQueue + ReverifyQueue audit.ReverifyQueue + Reporter audit.Reporter + Worker *audit.Worker + ReverifyWorker *audit.ReverifyWorker + } +} + +// NewAuditor creates a new auditor peer. +func NewAuditor(log *zap.Logger, full *identity.FullIdentity, + metabaseDB *metabase.DB, + revocationDB extensions.RevocationDB, + verifyQueue audit.VerifyQueue, + reverifyQueue audit.ReverifyQueue, + overlayCache overlay.DB, + nodeEvents nodeevents.DB, + reputationdb reputation.DB, + containmentDB audit.Containment, + rollupsWriteCache *orders.RollupsWriteCache, + versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel, +) (*Auditor, error) { + peer := &Auditor{ + Log: log, + Identity: full, + + Servers: lifecycle.NewGroup(log.Named("servers")), + Services: lifecycle.NewGroup(log.Named("services")), + } + + { // setup debug + var err error + if config.Debug.Address != "" { + peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address) + if err != nil { + withoutStack := errors.New(err.Error()) + peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack)) + } + } + debugConfig := config.Debug + debugConfig.ControlTitle = "Audit" + peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel) + peer.Servers.Add(lifecycle.Item{ + Name: "debug", + Run: peer.Debug.Server.Run, + Close: peer.Debug.Server.Close, + }) + } + + { // setup version control + peer.Log.Info("Version info", + zap.Stringer("Version", versionInfo.Version.Version), + zap.String("Commit Hash", versionInfo.CommitHash), + zap.Stringer("Build Timestamp", versionInfo.Timestamp), + zap.Bool("Release Build", versionInfo.Release), + ) + peer.Version.Service = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite") + peer.Version.Chore = version_checker.NewChore(peer.Version.Service, config.Version.CheckInterval) + + peer.Services.Add(lifecycle.Item{ + Name: "version", + Run: peer.Version.Chore.Run, + }) + } + + { // setup dialer + sc := config.Server + + tlsOptions, err 
:= tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + + peer.Dialer = rpc.NewDefaultDialer(tlsOptions) + } + + { // setup mail + var err error + peer.Mail, err = setupMailService(peer.Log, *config) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + + peer.Services.Add(lifecycle.Item{ + Name: "mail:service", + Close: peer.Mail.Close, + }) + } + + { // setup overlay + var err error + peer.Overlay, err = overlay.NewService(log.Named("overlay"), overlayCache, nodeEvents, peer.Mail, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + peer.Services.Add(lifecycle.Item{ + Name: "overlay", + Run: peer.Overlay.Run, + Close: peer.Overlay.Close, + }) + } + + { // setup reputation + if config.Reputation.FlushInterval > 0 { + cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), reputationdb, config.Reputation) + peer.Services.Add(lifecycle.Item{ + Name: "reputation:writecache", + Run: cachingDB.Manage, + }) + reputationdb = cachingDB + } + peer.Reputation = reputation.NewService(log.Named("reputation:service"), + peer.Overlay, + reputationdb, + config.Reputation, + ) + + peer.Services.Add(lifecycle.Item{ + Name: "reputation", + Close: peer.Reputation.Close, + }) + } + + { // setup orders + peer.Orders.DB = rollupsWriteCache + peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders) + peer.Services.Add(lifecycle.Item{ + Name: "orders:chore", + Run: peer.Orders.Chore.Run, + Close: peer.Orders.Chore.Close, + }) + peer.Debug.Server.Panel.Add( + debug.Cycle("Orders Chore", peer.Orders.Chore.Loop)) + + var err error + peer.Orders.Service, err = orders.NewService( + log.Named("orders"), + signing.SignerFromFullIdentity(peer.Identity), + peer.Overlay, + peer.Orders.DB, + config.Orders, + ) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + } + + { // setup audit + // force tcp for now because audit is very sensitive to how errors + // are returned, and adding quic can cause problems + dialer := peer.Dialer + //lint:ignore SA1019 deprecated is fine here. + //nolint:staticcheck // deprecated is fine here. 
+ dialer.Connector = rpc.NewDefaultTCPConnector(nil) + + peer.Audit.VerifyQueue = verifyQueue + peer.Audit.ReverifyQueue = reverifyQueue + + peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"), + metabaseDB, + dialer, + peer.Overlay, + containmentDB, + peer.Orders.Service, + peer.Identity, + config.Audit.MinBytesPerSecond, + config.Audit.MinDownloadTimeout) + peer.Audit.Reverifier = audit.NewReverifier(log.Named("audit:reverifier"), + peer.Audit.Verifier, + reverifyQueue, + config.Audit) + + peer.Audit.Reporter = audit.NewReporter( + log.Named("reporter"), + peer.Reputation, + peer.Overlay, + containmentDB, + config.Audit.MaxRetriesStatDB, + int32(config.Audit.MaxReverifyCount)) + + peer.Audit.Worker = audit.NewWorker(log.Named("audit:verify-worker"), + verifyQueue, + peer.Audit.Verifier, + reverifyQueue, + peer.Audit.Reporter, + config.Audit) + peer.Services.Add(lifecycle.Item{ + Name: "audit:verify-worker", + Run: peer.Audit.Worker.Run, + Close: peer.Audit.Worker.Close, + }) + peer.Debug.Server.Panel.Add( + debug.Cycle("Audit Verify Worker", peer.Audit.Worker.Loop)) + + peer.Audit.ReverifyWorker = audit.NewReverifyWorker(peer.Log.Named("audit:reverify-worker"), + reverifyQueue, + peer.Audit.Reverifier, + peer.Audit.Reporter, + config.Audit) + peer.Services.Add(lifecycle.Item{ + Name: "audit:reverify-worker", + Run: peer.Audit.ReverifyWorker.Run, + Close: peer.Audit.ReverifyWorker.Close, + }) + peer.Debug.Server.Panel.Add( + debug.Cycle("Audit Reverify Worker", peer.Audit.ReverifyWorker.Loop)) + } + + return peer, nil +} + +// Run runs the auditor process until it's either closed or it errors. +func (peer *Auditor) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + group, ctx := errgroup.WithContext(ctx) + + pprof.Do(ctx, pprof.Labels("subsystem", "auditor"), func(ctx context.Context) { + peer.Servers.Run(ctx, group) + peer.Services.Run(ctx, group) + + pprof.Do(ctx, pprof.Labels("name", "subsystem-wait"), func(ctx context.Context) { + err = group.Wait() + }) + }) + return err +} + +// Close closes all the resources. +func (peer *Auditor) Close() error { + return errs.Combine( + peer.Servers.Close(), + peer.Services.Close(), + ) +} + +// ID returns the peer ID. +func (peer *Auditor) ID() storj.NodeID { return peer.Identity.ID } diff --git a/satellite/core.go b/satellite/core.go index edecc193e..db2bcdb39 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -116,14 +116,8 @@ type Core struct { } Audit struct { - VerifyQueue audit.VerifyQueue - ReverifyQueue audit.ReverifyQueue - Worker *audit.Worker - ReverifyWorker *audit.ReverifyWorker - Chore *audit.Chore - Verifier *audit.Verifier - Reverifier *audit.Reverifier - Reporter audit.Reporter + VerifyQueue audit.VerifyQueue + Chore *audit.Chore } ExpiredDeletion struct { @@ -399,68 +393,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, } { // setup audit - // force tcp for now because audit is very sensitive to how errors - // are returned, and adding quic can cause problems - dialer := peer.Dialer - //lint:ignore SA1019 deprecated is fine here. - //nolint:staticcheck // deprecated is fine here. 
- dialer.Connector = rpc.NewDefaultTCPConnector(nil) - config := config.Audit peer.Audit.VerifyQueue = db.VerifyQueue() - peer.Audit.ReverifyQueue = db.ReverifyQueue() - - peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"), - peer.Metainfo.Metabase, - dialer, - peer.Overlay.Service, - peer.DB.Containment(), - peer.Orders.Service, - peer.Identity, - config.MinBytesPerSecond, - config.MinDownloadTimeout, - ) - peer.Audit.Reverifier = audit.NewReverifier(log.Named("audit:reverifier"), - peer.Audit.Verifier, - peer.Audit.ReverifyQueue, - config) - - peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"), - peer.Reputation.Service, - peer.Overlay.Service, - peer.DB.Containment(), - config.MaxRetriesStatDB, - int32(config.MaxReverifyCount), - ) - - peer.Audit.Worker = audit.NewWorker(peer.Log.Named("audit:worker"), - peer.Audit.VerifyQueue, - peer.Audit.Verifier, - peer.Audit.ReverifyQueue, - peer.Audit.Reporter, - config, - ) - peer.Services.Add(lifecycle.Item{ - Name: "audit:worker", - Run: peer.Audit.Worker.Run, - Close: peer.Audit.Worker.Close, - }) - peer.Debug.Server.Panel.Add( - debug.Cycle("Audit Worker", peer.Audit.Worker.Loop)) - - peer.Audit.ReverifyWorker = audit.NewReverifyWorker(peer.Log.Named("audit:reverify-worker"), - peer.Audit.ReverifyQueue, - peer.Audit.Reverifier, - peer.Audit.Reporter, - config) - peer.Services.Add(lifecycle.Item{ - Name: "audit:reverify-worker", - Run: peer.Audit.ReverifyWorker.Run, - Close: peer.Audit.ReverifyWorker.Close, - }) - peer.Debug.Server.Panel.Add( - debug.Cycle("Audit Reverify Worker", peer.Audit.ReverifyWorker.Loop)) peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"), peer.Audit.VerifyQueue, diff --git a/satellite/reputation/suspension_test.go b/satellite/reputation/suspension_test.go index 86904cd23..b6d392bc6 100644 --- a/satellite/reputation/suspension_test.go +++ b/satellite/reputation/suspension_test.go @@ -148,6 +148,8 @@ func TestAuditSuspendExceedGracePeriod(t *testing.T) { config.Reputation.InitialAlpha = 1 config.Reputation.AuditLambda = 0.95 config.Reputation.AuditDQ = 0.6 + // disable write cache so changes are immediate + config.Reputation.FlushInterval = 0 }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
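
Note (illustrative, not part of the patch): the sketch below shows the test pattern this change repeats across several _test.go files — the reputation write cache is disabled via config.Reputation.FlushInterval = 0 so that reputation changes produced by audits are visible to the test immediately, and the audit components are reached through the testplanet Satellite wrapper, which createNewSystem now wires from the new Auditor peer (queues, verifier, reverifier, workers, reporter) while the chore stays on the core. The test name and its assertion are hypothetical; the testplanet.Run and Reconfigure signatures are taken from the diff above.

package audit_test

import (
	"testing"

	"go.uber.org/zap"

	"storj.io/common/testcontext"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
)

// TestAuditorWiringSketch is a hypothetical example, not part of this change.
func TestAuditorWiringSketch(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				// disable reputation write cache so changes are immediate
				config.Reputation.FlushInterval = 0
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		audits := sat.Audit

		// Worker, queues, verifier, reverifier, and reporter are wired from the
		// Auditor peer; the chore that fills the verify queue stays on the core.
		if audits.Worker == nil || audits.Chore == nil {
			t.Fatal("audit components should be exposed through the Satellite wrapper")
		}
	})
}

In a deployment, the new process is started with the `satellite run auditor` subcommand registered in cmd/satellite/main.go, bound to the same configuration and identity directories as the other run subcommands.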