storj/private/testplanet/satellite.go
JT Olio b502310fe5 cmd/inspector: remove
this is a very old tool built in the very early days
of v3, when we didn't know how the network would be
used. this tool anticipated being able to query remote
nodes for internal state. we don't do that. i don't
think anyone uses this.

Change-Id: Ie1ded3ecbedb09313f2d6fc721039e0f15e4ee85
2023-05-19 09:11:29 +00:00

764 lines
25 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet
import (
"context"
"net"
"os"
"path/filepath"
"runtime/pprof"
"strconv"
"github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/cfgstruct"
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/private/server"
"storj.io/storj/private/testredis"
versionchecker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/accounting/projectbwcleanup"
"storj.io/storj/satellite/accounting/rollup"
"storj.io/storj/satellite/accounting/rolluparchive"
"storj.io/storj/satellite/accounting/tally"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/console/userinfo"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/gc/sender"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/zombiedeletion"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/nodestats"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/overlay/offlinenodes"
"storj.io/storj/satellite/overlay/straynodes"
"storj.io/storj/satellite/payments/stripe"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/reputation"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
// Satellite contains all the processes needed to run a full Satellite setup.
type Satellite struct {
Name string
Config satellite.Config
Core *satellite.Core
API *satellite.API
Repairer *satellite.Repairer
Auditor *satellite.Auditor
Admin *satellite.Admin
GC *satellite.GarbageCollection
GCBF *satellite.GarbageCollectionBF
RangedLoop *satellite.RangedLoop
Log *zap.Logger
Identity *identity.FullIdentity
DB satellite.DB
Dialer rpc.Dialer
Server *server.Server
Version *versionchecker.Service
Contact struct {
Service *contact.Service
Endpoint *contact.Endpoint
}
Overlay struct {
DB overlay.DB
Service *overlay.Service
OfflineNodeEmails *offlinenodes.Chore
DQStrayNodes *straynodes.Chore
}
NodeEvents struct {
DB nodeevents.DB
Notifier nodeevents.Notifier
Chore *nodeevents.Chore
}
Metainfo struct {
// TODO remove when uplink will be adjusted to use Metabase.DB
Metabase *metabase.DB
Endpoint *metainfo.Endpoint
}
Userinfo struct {
Endpoint *userinfo.Endpoint
}
Metabase struct {
DB *metabase.DB
}
Orders struct {
DB orders.DB
Endpoint *orders.Endpoint
Service *orders.Service
Chore *orders.Chore
}
Repair struct {
Repairer *repairer.Service
}
Audit struct {
VerifyQueue audit.VerifyQueue
ReverifyQueue audit.ReverifyQueue
Worker *audit.Worker
ReverifyWorker *audit.ReverifyWorker
Verifier *audit.Verifier
Reverifier *audit.Reverifier
Reporter audit.Reporter
ContainmentSyncChore *audit.ContainmentSyncChore
}
Reputation struct {
Service *reputation.Service
}
GarbageCollection struct {
Sender *sender.Service
}
ExpiredDeletion struct {
Chore *expireddeletion.Chore
}
ZombieDeletion struct {
Chore *zombiedeletion.Chore
}
Accounting struct {
Tally *tally.Service
Rollup *rollup.Service
ProjectUsage *accounting.Service
ProjectBWCleanup *projectbwcleanup.Chore
RollupArchive *rolluparchive.Chore
}
LiveAccounting struct {
Cache accounting.Cache
}
ProjectLimits struct {
Cache *accounting.ProjectLimitCache
}
Mail struct {
Service *mailservice.Service
}
Console struct {
Listener net.Listener
Service *console.Service
Endpoint *consoleweb.Server
}
NodeStats struct {
Endpoint *nodestats.Endpoint
}
GracefulExit struct {
Endpoint *gracefulexit.Endpoint
}
}
// Label returns name for debugger.
func (system *Satellite) Label() string { return system.Name }
// ID returns the ID of the Satellite system.
func (system *Satellite) ID() storj.NodeID { return system.API.Identity.ID }
// Addr returns the public address from the Satellite system API.
func (system *Satellite) Addr() string { return system.API.Server.Addr().String() }
// URL returns the node url from the Satellite system API.
func (system *Satellite) URL() string { return system.NodeURL().String() }
// ConsoleURL returns the console URL.
func (system *Satellite) ConsoleURL() string {
return "http://" + system.API.Console.Listener.Addr().String()
}
// NodeURL returns the storj.NodeURL from the Satellite system API.
func (system *Satellite) NodeURL() storj.NodeURL {
return storj.NodeURL{ID: system.API.ID(), Address: system.API.Addr()}
}
// AddUser adds user to a satellite. Password from newUser will be always overridden by FullName to have
// known password which can be used automatically.
func (system *Satellite) AddUser(ctx context.Context, newUser console.CreateUser, maxNumberOfProjects int) (_ *console.User, err error) {
defer mon.Task()(&ctx)(&err)
regToken, err := system.API.Console.Service.CreateRegToken(ctx, maxNumberOfProjects)
if err != nil {
return nil, errs.Wrap(err)
}
newUser.Password = newUser.FullName
user, err := system.API.Console.Service.CreateUser(ctx, newUser, regToken.Secret)
if err != nil {
return nil, errs.Wrap(err)
}
activationToken, err := system.API.Console.Service.GenerateActivationToken(ctx, user.ID, user.Email)
if err != nil {
return nil, errs.Wrap(err)
}
_, err = system.API.Console.Service.ActivateAccount(ctx, activationToken)
if err != nil {
return nil, errs.Wrap(err)
}
userCtx, err := system.UserContext(ctx, user.ID)
if err != nil {
return nil, errs.Wrap(err)
}
_, err = system.API.Console.Service.Payments().SetupAccount(userCtx)
if err != nil {
return nil, errs.Wrap(err)
}
return user, nil
}
// AddProject adds project to a satellite and makes specified user an owner.
func (system *Satellite) AddProject(ctx context.Context, ownerID uuid.UUID, name string) (_ *console.Project, err error) {
defer mon.Task()(&ctx)(&err)
ctx, err = system.UserContext(ctx, ownerID)
if err != nil {
return nil, errs.Wrap(err)
}
project, err := system.API.Console.Service.CreateProject(ctx, console.ProjectInfo{
Name: name,
})
if err != nil {
return nil, errs.Wrap(err)
}
return project, nil
}
// UserContext creates context with user.
func (system *Satellite) UserContext(ctx context.Context, userID uuid.UUID) (_ context.Context, err error) {
defer mon.Task()(&ctx)(&err)
user, err := system.API.Console.Service.GetUser(ctx, userID)
if err != nil {
return nil, errs.Wrap(err)
}
return console.WithUser(ctx, user), nil
}
// Close closes all the subsystems in the Satellite system.
func (system *Satellite) Close() error {
return errs.Combine(
system.API.Close(),
system.Core.Close(),
system.Repairer.Close(),
system.Auditor.Close(),
system.Admin.Close(),
system.GC.Close(),
system.GCBF.Close(),
)
}
// Run runs all the subsystems in the Satellite system.
func (system *Satellite) Run(ctx context.Context) (err error) {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(system.Core.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.API.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.Repairer.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.Auditor.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.Admin.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.GC.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.GCBF.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.RangedLoop.Run(ctx))
})
return group.Wait()
}
// PrivateAddr returns the private address from the Satellite system API.
func (system *Satellite) PrivateAddr() string { return system.API.Server.PrivateAddr().String() }
// newSatellites initializes satellites.
func (planet *Planet) newSatellites(ctx context.Context, count int, databases satellitedbtest.SatelliteDatabases) (_ []*Satellite, err error) {
defer mon.Task()(&ctx)(&err)
var satellites []*Satellite
for i := 0; i < count; i++ {
index := i
prefix := "satellite" + strconv.Itoa(index)
log := planet.log.Named(prefix)
var system *Satellite
var err error
pprof.Do(ctx, pprof.Labels("peer", prefix), func(ctx context.Context) {
system, err = planet.newSatellite(ctx, prefix, index, log, databases, planet.config.applicationName)
})
if err != nil {
return nil, errs.Wrap(err)
}
log.Debug("id=" + system.ID().String() + " addr=" + system.Addr())
satellites = append(satellites, system)
planet.peers = append(planet.peers, newClosablePeer(system))
}
return satellites, nil
}
func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int, log *zap.Logger, databases satellitedbtest.SatelliteDatabases, applicationName string) (_ *Satellite, err error) {
defer mon.Task()(&ctx)(&err)
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, errs.Wrap(err)
}
identity, err := planet.NewIdentity()
if err != nil {
return nil, errs.Wrap(err)
}
db, err := satellitedbtest.CreateMasterDB(ctx, log.Named("db"), planet.config.Name, "S", index, databases.MasterDB, applicationName)
if err != nil {
return nil, errs.Wrap(err)
}
if planet.config.Reconfigure.SatelliteDB != nil {
var newdb satellite.DB
newdb, err = planet.config.Reconfigure.SatelliteDB(log.Named("db"), index, db)
if err != nil {
return nil, errs.Combine(err, db.Close())
}
db = newdb
}
planet.databases = append(planet.databases, db)
redis, err := testredis.Mini(ctx)
if err != nil {
return nil, errs.Wrap(err)
}
encryptionKeys, err := orders.NewEncryptionKeys(orders.EncryptionKey{
ID: orders.EncryptionKeyID{1},
Key: storj.Key{1},
})
if err != nil {
return nil, errs.Wrap(err)
}
var config satellite.Config
cfgstruct.Bind(pflag.NewFlagSet("", pflag.PanicOnError), &config,
cfgstruct.UseTestDefaults(),
cfgstruct.ConfDir(storageDir),
cfgstruct.IdentityDir(storageDir),
cfgstruct.ConfigVar("TESTINTERVAL", defaultInterval.String()))
// TODO: these are almost certainly mistakenly set to the zero value
// in tests due to a prior mismatch between testplanet config and
// cfgstruct devDefaults. we need to make sure it's safe to remove
// these lines and then remove them.
config.Debug.Control = false
config.Reputation.AuditHistory.OfflineDQEnabled = false
config.Server.Config.Extensions.Revocation = false
config.Orders.OrdersSemaphoreSize = 0
config.Checker.NodeFailureRate = 0
config.Audit.MaxRetriesStatDB = 0
config.GarbageCollection.RetainSendTimeout = 0
config.ExpiredDeletion.ListLimit = 0
config.Tally.SaveRollupBatchSize = 0
config.Tally.ReadRollupBatchSize = 0
config.Rollup.DeleteTallies = false
config.Payments.BonusRate = 0
config.Payments.NodeEgressBandwidthPrice = 0
config.Payments.NodeRepairBandwidthPrice = 0
config.Payments.NodeAuditBandwidthPrice = 0
config.Payments.NodeDiskSpacePrice = 0
config.Identity.CertPath = ""
config.Identity.KeyPath = ""
config.Metainfo.DatabaseURL = ""
config.Console.ContactInfoURL = ""
config.Console.FrameAncestors = ""
config.Console.LetUsKnowURL = ""
config.Console.SEO = ""
config.Console.SatelliteOperator = ""
config.Console.TermsAndConditionsURL = ""
config.Console.GeneralRequestURL = ""
config.Console.ProjectLimitsIncreaseRequestURL = ""
config.Console.GatewayCredentialsRequestURL = ""
config.Console.DocumentationURL = ""
config.Console.LinksharingURL = ""
config.Console.PathwayOverviewEnabled = false
config.Compensation.Rates.AtRestGBHours = compensation.Rate{}
config.Compensation.Rates.GetTB = compensation.Rate{}
config.Compensation.Rates.GetRepairTB = compensation.Rate{}
config.Compensation.Rates.GetAuditTB = compensation.Rate{}
config.Compensation.WithheldPercents = nil
config.Compensation.DisposePercent = 0
config.ProjectLimit.CacheCapacity = 0
config.ProjectLimit.CacheExpiration = 0
// Actual testplanet-specific configuration
config.Server.Address = planet.NewListenAddress()
config.Server.PrivateAddress = planet.NewListenAddress()
config.Admin.Address = planet.NewListenAddress()
config.Console.Address = planet.NewListenAddress()
config.Server.Config.PeerCAWhitelistPath = planet.whitelistPath
config.Server.Config.UsePeerCAWhitelist = true
config.Version = planet.NewVersionConfig()
config.Metainfo.RS.Min = atLeastOne(planet.config.StorageNodeCount * 1 / 5)
config.Metainfo.RS.Repair = atLeastOne(planet.config.StorageNodeCount * 2 / 5)
config.Metainfo.RS.Success = atLeastOne(planet.config.StorageNodeCount * 3 / 5)
config.Metainfo.RS.Total = atLeastOne(planet.config.StorageNodeCount * 4 / 5)
config.Orders.EncryptionKeys = *encryptionKeys
config.LiveAccounting.StorageBackend = "redis://" + redis.Addr() + "?db=0"
config.Mail.TemplatePath = filepath.Join(developmentRoot, "web/satellite/static/emails")
config.Console.StaticDir = filepath.Join(developmentRoot, "web/satellite")
config.Payments.Storjscan.DisableLoop = true
if planet.config.Reconfigure.Satellite != nil {
planet.config.Reconfigure.Satellite(log, index, &config)
}
metabaseDB, err := satellitedbtest.CreateMetabaseDB(context.TODO(), log.Named("metabase"), planet.config.Name, "M", index, databases.MetabaseDB, metabase.Config{
ApplicationName: "satellite-testplanet",
MinPartSize: config.Metainfo.MinPartSize,
MaxNumberOfParts: config.Metainfo.MaxNumberOfParts,
ServerSideCopy: config.Metainfo.ServerSideCopy,
})
if err != nil {
return nil, errs.Wrap(err)
}
if planet.config.Reconfigure.SatelliteMetabaseDB != nil {
var newMetabaseDB *metabase.DB
newMetabaseDB, err = planet.config.Reconfigure.SatelliteMetabaseDB(log.Named("metabase"), index, metabaseDB)
if err != nil {
return nil, errs.Combine(err, metabaseDB.Close())
}
metabaseDB = newMetabaseDB
}
planet.databases = append(planet.databases, metabaseDB)
versionInfo := planet.NewVersionInfo()
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
liveAccounting, err := live.OpenCache(ctx, log.Named("live-accounting"), config.LiveAccounting)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccounting)
config.Payments.Provider = "mock"
config.Payments.MockProvider = stripe.NewStripeMock(db.StripeCoinPayments().Customers(), db.Console().Users())
peer, err := satellite.New(log, identity, db, metabaseDB, revocationDB, liveAccounting, versionInfo, &config, nil)
if err != nil {
return nil, errs.Wrap(err)
}
if planet.config.LastNetFunc != nil {
peer.Overlay.Service.LastNetFunc = planet.config.LastNetFunc
}
err = db.Testing().TestMigrateToLatest(ctx)
if err != nil {
return nil, errs.Wrap(err)
}
err = metabaseDB.TestMigrateToLatest(ctx)
if err != nil {
return nil, errs.Wrap(err)
}
api, err := planet.newAPI(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
}
adminPeer, err := planet.newAdmin(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
}
repairerPeer, err := planet.newRepairer(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
}
auditorPeer, err := planet.newAuditor(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
}
gcPeer, err := planet.newGarbageCollection(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
}
gcBFPeer, err := planet.newGarbageCollectionBF(ctx, index, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
}
rangedLoopPeer, err := planet.newRangedLoop(ctx, index, db, metabaseDB, config)
if err != nil {
return nil, errs.Wrap(err)
}
if config.EmailReminders.Enable {
peer.Mail.EmailReminders.TestSetLinkAddress("http://" + api.Console.Listener.Addr().String() + "/")
}
return createNewSystem(prefix, log, config, peer, api, repairerPeer, auditorPeer, adminPeer, gcPeer, gcBFPeer, rangedLoopPeer), nil
}
// createNewSystem makes a new Satellite System and exposes the same interface from
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many processes.
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, auditorPeer *satellite.Auditor, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF, rangedLoopPeer *satellite.RangedLoop) *Satellite {
system := &Satellite{
Name: name,
Config: config,
Core: peer,
API: api,
Repairer: repairerPeer,
Auditor: auditorPeer,
Admin: adminPeer,
GC: gcPeer,
GCBF: gcBFPeer,
RangedLoop: rangedLoopPeer,
}
system.Log = log
system.Identity = peer.Identity
system.DB = api.DB
system.Dialer = api.Dialer
system.Contact.Service = api.Contact.Service
system.Contact.Endpoint = api.Contact.Endpoint
system.Overlay.DB = api.Overlay.DB
system.Overlay.Service = api.Overlay.Service
system.Overlay.OfflineNodeEmails = peer.Overlay.OfflineNodeEmails
system.Overlay.DQStrayNodes = peer.Overlay.DQStrayNodes
system.NodeEvents.DB = peer.NodeEvents.DB
system.NodeEvents.Notifier = peer.NodeEvents.Notifier
system.NodeEvents.Chore = peer.NodeEvents.Chore
system.Reputation.Service = peer.Reputation.Service
// system.Metainfo.Metabase = api.Metainfo.Metabase
system.Metainfo.Endpoint = api.Metainfo.Endpoint
// system.Metainfo.SegmentLoop = peer.Metainfo.SegmentLoop
system.Userinfo.Endpoint = api.Userinfo.Endpoint
system.Metabase.DB = api.Metainfo.Metabase
system.Orders.DB = api.Orders.DB
system.Orders.Endpoint = api.Orders.Endpoint
system.Orders.Service = api.Orders.Service
system.Orders.Chore = api.Orders.Chore
system.Repair.Repairer = repairerPeer.Repairer
system.Audit.VerifyQueue = auditorPeer.Audit.VerifyQueue
system.Audit.ReverifyQueue = auditorPeer.Audit.ReverifyQueue
system.Audit.Worker = auditorPeer.Audit.Worker
system.Audit.ReverifyWorker = auditorPeer.Audit.ReverifyWorker
system.Audit.Verifier = auditorPeer.Audit.Verifier
system.Audit.Reverifier = auditorPeer.Audit.Reverifier
system.Audit.Reporter = auditorPeer.Audit.Reporter
system.Audit.ContainmentSyncChore = peer.Audit.ContainmentSyncChore
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender
system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore
system.Accounting.Tally = peer.Accounting.Tally
system.Accounting.Rollup = peer.Accounting.Rollup
system.Accounting.ProjectUsage = api.Accounting.ProjectUsage
system.Accounting.ProjectBWCleanup = peer.Accounting.ProjectBWCleanupChore
system.Accounting.RollupArchive = peer.Accounting.RollupArchiveChore
system.LiveAccounting = peer.LiveAccounting
system.ProjectLimits.Cache = api.ProjectLimits.Cache
system.GracefulExit.Endpoint = api.GracefulExit.Endpoint
return system
}
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.API, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-api" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
liveAccounting, err := live.OpenCache(ctx, log.Named("live-accounting"), config.LiveAccounting)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccounting)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewAPI(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
}
func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Admin, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-admin" + strconv.Itoa(index)
log := planet.log.Named(prefix)
return satellite.NewAdmin(log, identity, db, metabaseDB, versionInfo, &config, nil)
}
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Repairer, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-repairer" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), versionInfo, &config, nil)
}
func (planet *Planet) newAuditor(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Auditor, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-auditor" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewAuditor(log, identity, metabaseDB, revocationDB, db.VerifyQueue(), db.ReverifyQueue(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), versionInfo, &config, nil)
}
type rollupsWriteCacheCloser struct {
*orders.RollupsWriteCache
}
func (cache rollupsWriteCacheCloser) Close() error {
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.GarbageCollection, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-gc" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewGarbageCollection(log, identity, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}
func (planet *Planet) newGarbageCollectionBF(ctx context.Context, index int, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.GarbageCollectionBF, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-gc-bf" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewGarbageCollectionBF(log, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}
func (planet *Planet) newRangedLoop(ctx context.Context, index int, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config) (_ *satellite.RangedLoop, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-ranged-loop" + strconv.Itoa(index)
log := planet.log.Named(prefix)
return satellite.NewRangedLoop(log, db, metabaseDB, &config, nil)
}
// atLeastOne returns 1 if value < 1, or value otherwise.
func atLeastOne(value int) int {
if value < 1 {
return 1
}
return value
}