storj/satellite/core.go
paul cannon d8733ddd40 satellite/satellitedb: stop using _gob columns
This sets the corresponding _numeric columns to be NOT NULL (it has been
verified manually that there are no more NULL _numeric values on any
known satellites, and it should be impossible with current code to get
new NULL values in the _numeric columns.

We can't drop the _gob columns immediately, as there will still be code
running that expects them, but once this version is deployed we can
finally drop them and be totally done with this crazy 5-step migration.

Change-Id: I518302528d972090d56b3eedc815656610ac8e73
2022-03-30 04:13:13 +00:00

553 lines
16 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellite
import (
"context"
"errors"
"net"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/identity"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/private/version"
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/nodetally"
"storj.io/storj/satellite/accounting/projectbwcleanup"
"storj.io/storj/satellite/accounting/rollup"
"storj.io/storj/satellite/accounting/rolluparchive"
"storj.io/storj/satellite/accounting/tally"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/metabase/zombiedeletion"
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/overlay/straynodes"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/reputation"
)
// Core is the satellite core process that runs chores.
//
// architecture: Peer
type Core struct {
// core dependencies
Log *zap.Logger
Identity *identity.FullIdentity
DB DB
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Version struct {
Chore *version_checker.Chore
Service *version_checker.Service
}
Debug struct {
Listener net.Listener
Server *debug.Server
}
// services and endpoints
Overlay struct {
DB overlay.DB
Service *overlay.Service
DQStrayNodes *straynodes.Chore
}
Metainfo struct {
Metabase *metabase.DB
SegmentLoop *segmentloop.Service
}
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
Reputation struct {
Service *reputation.Service
}
Repair struct {
Checker *checker.Checker
}
Audit struct {
Queues *audit.Queues
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter *audit.Reporter
}
ExpiredDeletion struct {
Chore *expireddeletion.Chore
}
ZombieDeletion struct {
Chore *zombiedeletion.Chore
}
Accounting struct {
Tally *tally.Service
NodeTally *nodetally.Service
Rollup *rollup.Service
RollupArchiveChore *rolluparchive.Chore
ProjectBWCleanupChore *projectbwcleanup.Chore
}
LiveAccounting struct {
Cache accounting.Cache
}
Payments struct {
Accounts payments.Accounts
Chore *stripecoinpayments.Chore
}
GracefulExit struct {
Chore *gracefulexit.Chore
}
Metrics struct {
Chore *metrics.Chore
}
Buckets struct {
Service *buckets.Service
}
}
// New creates a new satellite.
func New(log *zap.Logger, full *identity.FullIdentity, db DB,
metabaseDB *metabase.DB, revocationDB extensions.RevocationDB,
liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Core, error) {
peer := &Core{
Log: log,
Identity: full,
DB: db,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
{ // setup buckets service
peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB)
}
{ // setup debug
var err error
if config.Debug.Address != "" {
peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
if err != nil {
withoutStack := errors.New(err.Error())
peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
}
}
debugConfig := config.Debug
debugConfig.ControlTitle = "Core"
peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel)
peer.Servers.Add(lifecycle.Item{
Name: "debug",
Run: peer.Debug.Server.Run,
Close: peer.Debug.Server.Close,
})
}
var err error
{ // setup version control
peer.Log.Info("Version info",
zap.Stringer("Version", versionInfo.Version.Version),
zap.String("Commit Hash", versionInfo.CommitHash),
zap.Stringer("Build Timestamp", versionInfo.Timestamp),
zap.Bool("Release Build", versionInfo.Release),
)
peer.Version.Service = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
peer.Version.Chore = version_checker.NewChore(peer.Version.Service, config.Version.CheckInterval)
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Chore.Run,
})
}
{ // setup listener and server
sc := config.Server
tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
}
{ // setup overlay
peer.Overlay.DB = peer.DB.OverlayCache()
peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Close: peer.Overlay.Service.Close,
})
if config.StrayNodes.EnableDQ {
peer.Overlay.DQStrayNodes = straynodes.NewChore(peer.Log.Named("overlay:dq-stray-nodes"), peer.Overlay.DB, config.StrayNodes)
peer.Services.Add(lifecycle.Item{
Name: "overlay:dq-stray-nodes",
Run: peer.Overlay.DQStrayNodes.Run,
Close: peer.Overlay.DQStrayNodes.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Overlay DQ Stray Nodes", peer.Overlay.DQStrayNodes.Loop))
}
}
{ // setup live accounting
peer.LiveAccounting.Cache = liveAccounting
}
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
var err error
peer.Orders.Service, err = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay.Service,
peer.Orders.DB,
peer.Buckets.Service,
config.Orders,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
{ // setup metainfo
peer.Metainfo.Metabase = metabaseDB
peer.Metainfo.SegmentLoop = segmentloop.New(
peer.Log.Named("metainfo:segmentloop"),
config.Metainfo.SegmentLoop,
peer.Metainfo.Metabase,
)
peer.Services.Add(lifecycle.Item{
Name: "metainfo:segmentloop",
Run: peer.Metainfo.SegmentLoop.Run,
Close: peer.Metainfo.SegmentLoop.Close,
})
}
{ // setup datarepair
// TODO: simplify argument list somehow
peer.Repair.Checker = checker.NewChecker(
peer.Log.Named("repair:checker"),
peer.DB.RepairQueue(),
peer.Metainfo.Metabase,
peer.Metainfo.SegmentLoop,
peer.Overlay.Service,
config.Checker)
peer.Services.Add(lifecycle.Item{
Name: "repair:checker",
Run: peer.Repair.Checker.Run,
Close: peer.Repair.Checker.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Repair Checker", peer.Repair.Checker.Loop))
}
{ // setup reputation
peer.Reputation.Service = reputation.NewService(log.Named("reputation:service"),
peer.Overlay.DB,
peer.DB.Reputation(),
config.Reputation,
)
peer.Services.Add(lifecycle.Item{
Name: "reputation",
Close: peer.Reputation.Service.Close,
})
}
{ // setup audit
// force tcp for now because audit is very sensitive to how errors
// are returned, and adding quic can cause problems
dialer := peer.Dialer
//lint:ignore SA1019 deprecated is fine here.
//nolint:staticcheck // deprecated is fine here.
dialer.Connector = rpc.NewDefaultTCPConnector(nil)
config := config.Audit
peer.Audit.Queues = audit.NewQueues()
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
peer.Metainfo.Metabase,
dialer,
peer.Overlay.Service,
peer.DB.Containment(),
peer.Orders.Service,
peer.Identity,
config.MinBytesPerSecond,
config.MinDownloadTimeout,
)
peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"),
peer.Reputation.Service,
peer.DB.Containment(),
config.MaxRetriesStatDB,
int32(config.MaxReverifyCount),
)
peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit:worker"),
peer.Audit.Queues,
peer.Audit.Verifier,
peer.Audit.Reporter,
config,
)
peer.Services.Add(lifecycle.Item{
Name: "audit:worker",
Run: peer.Audit.Worker.Run,
Close: peer.Audit.Worker.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Worker", peer.Audit.Worker.Loop))
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
peer.Audit.Queues,
peer.Metainfo.SegmentLoop,
config,
)
peer.Services.Add(lifecycle.Item{
Name: "audit:chore",
Run: peer.Audit.Chore.Run,
Close: peer.Audit.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
}
{ // setup expired segment cleanup
peer.ExpiredDeletion.Chore = expireddeletion.NewChore(
peer.Log.Named("core-expired-deletion"),
config.ExpiredDeletion,
peer.Metainfo.Metabase,
)
peer.Services.Add(lifecycle.Item{
Name: "expireddeletion:chore",
Run: peer.ExpiredDeletion.Chore.Run,
Close: peer.ExpiredDeletion.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Expired Segments Chore", peer.ExpiredDeletion.Chore.Loop))
}
{ // setup zombie objects cleanup
peer.ZombieDeletion.Chore = zombiedeletion.NewChore(
peer.Log.Named("core-zombie-deletion"),
config.ZombieDeletion,
peer.Metainfo.Metabase,
)
peer.Services.Add(lifecycle.Item{
Name: "zombiedeletion:chore",
Run: peer.ZombieDeletion.Chore.Run,
Close: peer.ZombieDeletion.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Zombie Objects Chore", peer.ZombieDeletion.Chore.Loop))
}
{ // setup accounting
peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Metabase, config.Tally)
peer.Services.Add(lifecycle.Item{
Name: "accounting:tally",
Run: peer.Accounting.Tally.Run,
Close: peer.Accounting.Tally.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Accounting Tally", peer.Accounting.Tally.Loop))
// storage nodes tally
peer.Accounting.NodeTally = nodetally.New(peer.Log.Named("accounting:nodetally"), peer.DB.StoragenodeAccounting(), peer.Metainfo.SegmentLoop, config.Tally.Interval)
peer.Services.Add(lifecycle.Item{
Name: "accounting:nodetally",
Run: peer.Accounting.NodeTally.Run,
Close: peer.Accounting.NodeTally.Close,
})
// Lets add 1 more day so we catch any off by one errors when deleting tallies
orderExpirationPlusDay := config.Orders.Expiration + config.Rollup.Interval
peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies, orderExpirationPlusDay)
peer.Services.Add(lifecycle.Item{
Name: "accounting:rollup",
Run: peer.Accounting.Rollup.Run,
Close: peer.Accounting.Rollup.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Accounting Rollup", peer.Accounting.Rollup.Loop))
peer.Accounting.ProjectBWCleanupChore = projectbwcleanup.NewChore(peer.Log.Named("accounting:chore"), peer.DB.ProjectAccounting(), config.ProjectBWCleanup)
peer.Services.Add(lifecycle.Item{
Name: "accounting:project-bw-rollup",
Run: peer.Accounting.ProjectBWCleanupChore.Run,
Close: peer.Accounting.ProjectBWCleanupChore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Accounting Project Bandwidth Rollup", peer.Accounting.ProjectBWCleanupChore.Loop))
if config.RollupArchive.Enabled {
peer.Accounting.RollupArchiveChore = rolluparchive.New(peer.Log.Named("accounting:rollup-archive"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), config.RollupArchive)
peer.Services.Add(lifecycle.Item{
Name: "accounting:rollup-archive",
Run: peer.Accounting.RollupArchiveChore.Run,
Close: peer.Accounting.RollupArchiveChore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Accounting Rollup Archive", peer.Accounting.RollupArchiveChore.Loop))
} else {
peer.Log.Named("rolluparchive").Info("disabled")
}
}
// TODO: remove in future, should be in API
{ // setup payments
pc := config.Payments
var stripeClient stripecoinpayments.StripeClient
switch pc.Provider {
default:
stripeClient = stripecoinpayments.NewStripeMock(
peer.ID(),
peer.DB.StripeCoinPayments().Customers(),
peer.DB.Console().Users(),
)
case "stripecoinpayments":
stripeClient = stripecoinpayments.NewStripeClient(log, pc.StripeCoinPayments)
}
service, err := stripecoinpayments.NewService(
peer.Log.Named("payments.stripe:service"),
stripeClient,
pc.StripeCoinPayments,
peer.DB.StripeCoinPayments(),
peer.DB.Console().Projects(),
peer.DB.ProjectAccounting(),
pc.StorageTBPrice,
pc.EgressTBPrice,
pc.SegmentPrice,
pc.BonusRate)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Payments.Accounts = service.Accounts()
peer.Payments.Chore = stripecoinpayments.NewChore(
peer.Log.Named("payments.stripe:clearing"),
service,
pc.StripeCoinPayments.TransactionUpdateInterval,
pc.StripeCoinPayments.AccountBalanceUpdateInterval,
)
peer.Services.Add(lifecycle.Item{
Name: "payments.stripe:service",
Run: peer.Payments.Chore.Run,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Payments Stripe Transactions", peer.Payments.Chore.TransactionCycle),
debug.Cycle("Payments Stripe Account Balance", peer.Payments.Chore.AccountBalanceCycle),
)
}
{ // setup graceful exit
if config.GracefulExit.Enabled {
peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("gracefulexit"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.SegmentLoop, config.GracefulExit)
peer.Services.Add(lifecycle.Item{
Name: "gracefulexit",
Run: peer.GracefulExit.Chore.Run,
Close: peer.GracefulExit.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
} else {
peer.Log.Named("gracefulexit").Info("disabled")
}
}
{ // setup metrics service
peer.Metrics.Chore = metrics.NewChore(
peer.Log.Named("metrics"),
config.Metrics,
peer.Metainfo.SegmentLoop,
)
peer.Services.Add(lifecycle.Item{
Name: "metrics",
Run: peer.Metrics.Chore.Run,
Close: peer.Metrics.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Metrics", peer.Metrics.Chore.Loop))
}
return peer, nil
}
// Run runs satellite until it's either closed or it errors.
func (peer *Core) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
group, ctx := errgroup.WithContext(ctx)
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
return group.Wait()
}
// Close closes all the resources.
func (peer *Core) Close() error {
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.
func (peer *Core) ID() storj.NodeID { return peer.Identity.ID }