storj/satellite/repairer.go
paul cannon fc905a15f7 satellite/audit: newContainment->containment
Now that all the reverification changes have been made and the old code
is out of the way, this commit renames the new things back to the old
names. Mostly, this involves renaming "newContainment" to "containment"
or "NewContainment" to "Containment", but there are a few other renames
that have been promised and are carried out here.

Refs: https://github.com/storj/storj/issues/5230
Change-Id: I34e2b857ea338acbb8421cdac18b17f2974f233c
2022-12-16 17:59:52 +00:00

286 lines
7.3 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellite
import (
"context"
"errors"
"net"
"runtime/pprof"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/identity"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/private/version"
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/reputation"
)
// Repairer is the repairer process.
//
// architecture: Peer
type Repairer struct {
Log *zap.Logger
Identity *identity.FullIdentity
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Version struct {
Chore *version_checker.Chore
Service *version_checker.Service
}
Debug struct {
Listener net.Listener
Server *debug.Server
}
Mail *mailservice.Service
Overlay *overlay.Service
Reputation *reputation.Service
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
Audit struct {
Reporter audit.Reporter
}
EcRepairer *repairer.ECRepairer
SegmentRepairer *repairer.SegmentRepairer
Repairer *repairer.Service
}
// NewRepairer creates a new repairer peer.
func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
metabaseDB *metabase.DB,
revocationDB extensions.RevocationDB,
repairQueue queue.RepairQueue,
bucketsDB buckets.DB,
overlayCache overlay.DB,
nodeEvents nodeevents.DB,
reputationdb reputation.DB,
containmentDB audit.Containment,
rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel,
) (*Repairer, error) {
peer := &Repairer{
Log: log,
Identity: full,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
{ // setup debug
var err error
if config.Debug.Address != "" {
peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
if err != nil {
withoutStack := errors.New(err.Error())
peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
}
}
debugConfig := config.Debug
debugConfig.ControlTitle = "Repair"
peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel)
peer.Servers.Add(lifecycle.Item{
Name: "debug",
Run: peer.Debug.Server.Run,
Close: peer.Debug.Server.Close,
})
}
{
peer.Log.Info("Version info",
zap.Stringer("Version", versionInfo.Version.Version),
zap.String("Commit Hash", versionInfo.CommitHash),
zap.Stringer("Build Timestamp", versionInfo.Timestamp),
zap.Bool("Release Build", versionInfo.Release),
)
peer.Version.Service = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
peer.Version.Chore = version_checker.NewChore(peer.Version.Service, config.Version.CheckInterval)
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Chore.Run,
})
}
{ // setup dialer
sc := config.Server
tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
}
{ // setup mail
var err error
peer.Mail, err = setupMailService(peer.Log, *config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "mail:service",
Close: peer.Mail.Close,
})
}
{ // setup overlay
var err error
peer.Overlay, err = overlay.NewService(log.Named("overlay"), overlayCache, nodeEvents, peer.Mail, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Run: peer.Overlay.Run,
Close: peer.Overlay.Close,
})
}
{ // setup reputation
if config.Reputation.FlushInterval > 0 {
cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), reputationdb, config.Reputation)
peer.Services.Add(lifecycle.Item{
Name: "reputation:writecache",
Run: cachingDB.Manage,
})
reputationdb = cachingDB
}
peer.Reputation = reputation.NewService(log.Named("reputation:service"),
peer.Overlay,
reputationdb,
config.Reputation,
)
peer.Services.Add(lifecycle.Item{
Name: "reputation",
Close: peer.Reputation.Close,
})
}
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Orders Chore", peer.Orders.Chore.Loop))
var err error
peer.Orders.Service, err = orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay,
peer.Orders.DB,
config.Orders,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
{ // setup audit
peer.Audit.Reporter = audit.NewReporter(
log.Named("reporter"),
peer.Reputation,
peer.Overlay,
containmentDB,
config.Audit.MaxRetriesStatDB,
int32(config.Audit.MaxReverifyCount))
}
{ // setup repairer
peer.EcRepairer = repairer.NewECRepairer(
log.Named("ec-repair"),
peer.Dialer,
signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
config.Repairer.DownloadTimeout,
config.Repairer.InMemoryRepair)
peer.SegmentRepairer = repairer.NewSegmentRepairer(
log.Named("segment-repair"),
metabaseDB,
peer.Orders.Service,
peer.Overlay,
peer.Audit.Reporter,
peer.EcRepairer,
config.Checker.RepairOverrides,
config.Repairer,
)
peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer)
peer.Services.Add(lifecycle.Item{
Name: "repair",
Run: peer.Repairer.Run,
Close: peer.Repairer.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Repair Worker", peer.Repairer.Loop))
}
return peer, nil
}
// Run runs the repair process until it's either closed or it errors.
func (peer *Repairer) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
group, ctx := errgroup.WithContext(ctx)
pprof.Do(ctx, pprof.Labels("subsystem", "repairer"), func(ctx context.Context) {
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
pprof.Do(ctx, pprof.Labels("name", "subsystem-wait"), func(ctx context.Context) {
err = group.Wait()
})
})
return err
}
// Close closes all the resources.
func (peer *Repairer) Close() error {
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.
func (peer *Repairer) ID() storj.NodeID { return peer.Identity.ID }