satellite: cleanup orders dependencies

Only API peer needs access to order DB (and rollups cache) because it's
only place where we are creating orders for PUT and GET operations. For
other peers like auditor and repairer we can set noop implementation to
reduce number of dependencies needed for them.

Change-Id: Ic32d1879f0b97ffc4516f401898e31e95ae892e4
This commit is contained in:
Michal Niewrzal 2023-03-07 10:29:25 +01:00 committed by Storj Robot
parent ffaf15a3b0
commit 0c177ef91f
8 changed files with 15 additions and 104 deletions

View File

@ -8,14 +8,12 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/errs2"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -60,11 +58,6 @@ func cmdAuditorRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, revocationDB.Close())
}()
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.NewAuditor(
log,
identity,
@ -76,7 +69,6 @@ func cmdAuditorRun(cmd *cobra.Command, args []string) (err error) {
db.NodeEvents(),
db.Reputation(),
db.Containment(),
rollupsWriteCache,
version.Build,
&runCfg.Config,
process.AtomicLevel(cmd),

View File

@ -15,7 +15,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/pb"
"storj.io/common/uuid"
"storj.io/private/process"
@ -23,7 +22,6 @@ import (
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -76,11 +74,6 @@ func cmdFetchPieces(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, revocationDB.Close())
}()
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.NewRepairer(
log,
identity,
@ -92,7 +85,6 @@ func cmdFetchPieces(cmd *cobra.Command, args []string) (err error) {
db.NodeEvents(),
db.Reputation(),
db.Containment(),
rollupsWriteCache,
version.Build,
&runCfg.Config,
process.AtomicLevel(cmd),

View File

@ -21,7 +21,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/fpath"
"storj.io/common/lrucache"
"storj.io/common/pb"
@ -41,7 +40,6 @@ import (
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/satellitedb"
@ -473,12 +471,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, liveAccounting.Close())
}()
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.New(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
peer, err := satellite.New(log, identity, db, metabaseDB, revocationDB, liveAccounting, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
if err != nil {
return err
}

View File

@ -8,14 +8,12 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/errs2"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -56,11 +54,6 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, revocationDB.Close())
}()
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.NewRepairer(
log,
identity,
@ -72,7 +65,6 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
db.NodeEvents(),
db.Reputation(),
db.Containment(),
rollupsWriteCache,
version.Build,
&runCfg.Config,
process.AtomicLevel(cmd),

View File

@ -522,13 +522,10 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
}
planet.databases = append(planet.databases, liveAccounting)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
config.Payments.Provider = "mock"
config.Payments.MockProvider = stripecoinpayments.NewStripeMock(db.StripeCoinPayments().Customers(), db.Console().Users())
peer, err := satellite.New(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config, nil)
peer, err := satellite.New(log, identity, db, metabaseDB, revocationDB, liveAccounting, versionInfo, &config, nil)
if err != nil {
return nil, errs.Wrap(err)
}
@ -639,7 +636,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Orders.DB = api.Orders.DB
system.Orders.Endpoint = api.Orders.Endpoint
system.Orders.Service = peer.Orders.Service
system.Orders.Service = api.Orders.Service
system.Orders.Chore = api.Orders.Chore
system.Repair.Checker = peer.Repair.Checker
@ -725,10 +722,7 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden
}
planet.databases = append(planet.databases, revocationDB)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil)
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), versionInfo, &config, nil)
}
func (planet *Planet) newAuditor(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Auditor, err error) {
@ -743,10 +737,7 @@ func (planet *Planet) newAuditor(ctx context.Context, index int, identity *ident
}
planet.databases = append(planet.databases, revocationDB)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewAuditor(log, identity, metabaseDB, revocationDB, db.VerifyQueue(), db.ReverifyQueue(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil)
return satellite.NewAuditor(log, identity, metabaseDB, revocationDB, db.VerifyQueue(), db.ReverifyQueue(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), versionInfo, &config, nil)
}
type rollupsWriteCacheCloser struct {

View File

@ -59,9 +59,7 @@ type Auditor struct {
Overlay *overlay.Service
Reputation *reputation.Service
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
Audit struct {
@ -85,7 +83,6 @@ func NewAuditor(log *zap.Logger, full *identity.FullIdentity,
nodeEvents nodeevents.DB,
reputationdb reputation.DB,
containmentDB audit.Containment,
rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel,
) (*Auditor, error) {
peer := &Auditor{
@ -177,22 +174,15 @@ func NewAuditor(log *zap.Logger, full *identity.FullIdentity,
}
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Orders Chore", peer.Orders.Chore.Loop))
var err error
peer.Orders.Service, err = orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay,
peer.Orders.DB,
// orders service needs DB only for handling
// PUT and GET actions which are not used by
// auditor so we can set noop implementation.
orders.NewNoopDB(),
config.Orders,
)
if err != nil {

View File

@ -18,7 +18,6 @@ import (
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/private/version"
@ -43,7 +42,6 @@ import (
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/overlay/offlinenodes"
"storj.io/storj/satellite/overlay/straynodes"
@ -104,12 +102,6 @@ type Core struct {
SegmentLoop *segmentloop.Service
}
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
Reputation struct {
Service *reputation.Service
}
@ -166,8 +158,8 @@ type Core struct {
// New creates a new satellite.
func New(log *zap.Logger, full *identity.FullIdentity, db DB,
metabaseDB *metabase.DB, revocationDB extensions.RevocationDB,
liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Core, error) {
liveAccounting accounting.Cache, versionInfo version.Info, config *Config,
atomicLogLevel *zap.AtomicLevel) (*Core, error) {
peer := &Core{
Log: log,
Identity: full,
@ -323,27 +315,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.LiveAccounting.Cache = liveAccounting
}
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
var err error
peer.Orders.Service, err = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay.Service,
peer.Orders.DB,
config.Orders,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
{ // setup metainfo
peer.Metainfo.Metabase = metabaseDB

View File

@ -60,9 +60,7 @@ type Repairer struct {
Overlay *overlay.Service
Reputation *reputation.Service
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
Audit struct {
@ -84,7 +82,6 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
nodeEvents nodeevents.DB,
reputationdb reputation.DB,
containmentDB audit.Containment,
rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel,
) (*Repairer, error) {
peer := &Repairer{
@ -176,22 +173,15 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
}
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Orders Chore", peer.Orders.Chore.Loop))
var err error
peer.Orders.Service, err = orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay,
peer.Orders.DB,
// orders service needs DB only for handling
// PUT and GET actions which are not used by
// repairer so we can set noop implementation.
orders.NewNoopDB(),
config.Orders,
)
if err != nil {