78c6d5bb32
This commit introduces the reported_serials table. Its purpose is to allow blind writes into it as nodes report in, so that we have minimal contention. In order to continue to accurately account for used bandwidth, though, we cannot immediately add the settled amount: if we did, we would have to give up on blind writes.

The table's primary key is structured precisely so that we can quickly find expired orders and so that we maximally benefit from rocksdb's key prefix compression. We do this by rounding the expires-at time forward to the next day, effectively giving us storagenode petnames for free. And since there are no secondary indexes or foreign key constraints, this design should use significantly less space than the current used_serials table while also reducing contention.

After the orders are inserted into the table, a chore periodically consumes all of the expired orders in it and inserts them into the existing rollups tables. This behaves as if the nodes reported when the order expired rather than as soon as possible, which raises confidence in the correctness of the refactor. And since we process large batches of orders (typically a day's worth), the code can maximally batch inserts into the rollup tables, keeping them as friendly as possible to cockroach.

Change-Id: I25d609ca2679b8331979184f16c6d46d4f74c1a6
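As a rough sketch of the design described above (not the actual storj schema or implementation; the ReportedSerial type, its field layout, the roundToNextDay helper, and the batch size are invented for illustration), the following Go snippet shows the two key moves: rounding an order's expires-at time forward to the next day so that a whole day of serials shares one key prefix, and draining expired rows into the rollup tables in bounded batches.

package main

import (
	"fmt"
	"time"
)

// ReportedSerial is a hypothetical stand-in for a row in reported_serials.
// Keys sorted by (ExpiresAt, StorageNodeID, SerialNumber) keep a day's worth
// of rows for one node adjacent, which is what makes prefix compression pay off.
type ReportedSerial struct {
	ExpiresAt     time.Time // rounded forward to the next day boundary
	StorageNodeID [32]byte
	SerialNumber  [16]byte
	Settled       uint64
}

// roundToNextDay rounds t forward to the next UTC day boundary, so every
// order expiring within the same day lands in the same key prefix bucket.
func roundToNextDay(t time.Time) time.Time {
	t = t.UTC()
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
}

// consumeExpired sketches the chore: it hands expired rows to insertBatch in
// fixed-size chunks so each statement against the rollup tables stays large
// but bounded.
func consumeExpired(expired []ReportedSerial, batchSize int, insertBatch func([]ReportedSerial) error) error {
	for len(expired) > 0 {
		n := batchSize
		if n > len(expired) {
			n = len(expired)
		}
		if err := insertBatch(expired[:n]); err != nil {
			return err
		}
		expired = expired[n:]
	}
	return nil
}

func main() {
	expires := time.Date(2020, 1, 15, 13, 37, 0, 0, time.UTC)
	fmt.Println(roundToNextDay(expires)) // 2020-01-16 00:00:00 +0000 UTC

	rows := make([]ReportedSerial, 2500)
	err := consumeExpired(rows, 1000, func(batch []ReportedSerial) error {
		fmt.Println("inserting batch of", len(batch)) // 1000, 1000, 500
		return nil
	})
	if err != nil {
		fmt.Println("insert failed:", err)
	}
}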
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package satellite

import (
	"context"

	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"storj.io/common/errs2"
	"storj.io/common/identity"
	"storj.io/common/pb"
	"storj.io/common/peertls/extensions"
	"storj.io/common/peertls/tlsopts"
	"storj.io/common/rpc"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/storj/private/version"
	version_checker "storj.io/storj/private/version/checker"
	"storj.io/storj/satellite/accounting"
	"storj.io/storj/satellite/accounting/reportedrollup"
	"storj.io/storj/satellite/accounting/rollup"
	"storj.io/storj/satellite/accounting/tally"
	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/contact"
	"storj.io/storj/satellite/dbcleanup"
	"storj.io/storj/satellite/downtime"
	"storj.io/storj/satellite/gc"
	"storj.io/storj/satellite/gracefulexit"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metrics"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/satellite/overlay"
	"storj.io/storj/satellite/payments"
	"storj.io/storj/satellite/payments/mockpayments"
	"storj.io/storj/satellite/payments/stripecoinpayments"
	"storj.io/storj/satellite/repair/checker"
	"storj.io/storj/satellite/repair/repairer"
)

// Core is the satellite core process that runs chores
//
// architecture: Peer
type Core struct {
	// core dependencies
	Log      *zap.Logger
	Identity *identity.FullIdentity
	DB       DB

	Dialer rpc.Dialer

	Version *version_checker.Service

	// services and endpoints
	Contact struct {
		Service *contact.Service
	}

	Overlay struct {
		DB      overlay.DB
		Service *overlay.Service
	}

	Metainfo struct {
		Database metainfo.PointerDB // TODO: move into pointerDB
		Service  *metainfo.Service
		Loop     *metainfo.Loop
	}

	Orders struct {
		DB      orders.DB
		Service *orders.Service
		Chore   *orders.Chore
	}

	Repair struct {
		Checker  *checker.Checker
		Repairer *repairer.Service
	}

	Audit struct {
		Queue    *audit.Queue
		Worker   *audit.Worker
		Chore    *audit.Chore
		Verifier *audit.Verifier
		Reporter *audit.Reporter
	}

	GarbageCollection struct {
		Service *gc.Service
	}

	DBCleanup struct {
		Chore *dbcleanup.Chore
	}

	Accounting struct {
		Tally               *tally.Service
		Rollup              *rollup.Service
		ProjectUsage        *accounting.Service
		ReportedRollupChore *reportedrollup.Chore
	}

	LiveAccounting struct {
		Cache accounting.Cache
	}

	Payments struct {
		Accounts payments.Accounts
		Chore    *stripecoinpayments.Chore
	}

	GracefulExit struct {
		Chore *gracefulexit.Chore
	}

	Metrics struct {
		Chore *metrics.Chore
	}

	DowntimeTracking struct {
		DetectionChore  *downtime.DetectionChore
		EstimationChore *downtime.EstimationChore
		Service         *downtime.Service
	}
}

// New creates a new satellite
func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, versionInfo version.Info, config *Config) (*Core, error) {
	peer := &Core{
		Log:      log,
		Identity: full,
		DB:       db,
	}

	var err error

	{ // setup version control
		if !versionInfo.IsZero() {
			peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
				versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
		}
		peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
	}

	{ // setup listener and server
		log.Debug("Starting listener and server")
		sc := config.Server

		tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
	}

	{ // setup overlay
		// note: overlay must be set up before the contact service, which
		// depends on peer.Overlay.Service being non-nil.
		peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
		peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
	}

	{ // setup contact service
		pbVersion, err := versionInfo.Proto()
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		self := &overlay.NodeDossier{
			Node: pb.Node{
				Id: peer.ID(),
				Address: &pb.NodeAddress{
					Address: config.Contact.ExternalAddress,
				},
			},
			Type:    pb.NodeType_SATELLITE,
			Version: *pbVersion,
		}
		peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer)
	}

	{ // setup live accounting
		peer.LiveAccounting.Cache = liveAccounting
	}

	{ // setup accounting project usage
		peer.Accounting.ProjectUsage = accounting.NewService(
			peer.DB.ProjectAccounting(),
			peer.LiveAccounting.Cache,
			config.Rollup.MaxAlphaUsage,
		)
	}

	{ // setup orders
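		// The write cache accumulates bandwidth rollup updates in memory so
		// they can be written out in batches (see config.Orders.FlushBatchSize);
		// the chore below works against the same cache.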
		ordersWriteCache := orders.NewRollupsWriteCache(log, peer.DB.Orders(), config.Orders.FlushBatchSize)
		peer.Orders.DB = ordersWriteCache
		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
		peer.Orders.Service = orders.NewService(
			peer.Log.Named("orders:service"),
			signing.SignerFromFullIdentity(peer.Identity),
			peer.Overlay.Service,
			peer.Orders.DB,
			config.Orders.Expiration,
			&pb.NodeAddress{
				Transport: pb.NodeTransport_TCP_TLS_GRPC,
				Address:   config.Contact.ExternalAddress,
			},
			config.Repairer.MaxExcessRateOptimalThreshold,
		)
	}

	{ // setup metainfo
		peer.Metainfo.Database = pointerDB // for logging: storelogger.New(peer.Log.Named("pdb"), db)
		peer.Metainfo.Service = metainfo.NewService(peer.Log.Named("metainfo:service"),
			peer.Metainfo.Database,
			peer.DB.Buckets(),
		)
		peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
	}

	{ // setup datarepair
		// TODO: simplify argument list somehow
		peer.Repair.Checker = checker.NewChecker(
			peer.Log.Named("checker"),
			peer.DB.RepairQueue(),
			peer.DB.Irreparable(),
			peer.Metainfo.Service,
			peer.Metainfo.Loop,
			peer.Overlay.Service,
			config.Checker)

		segmentRepairer := repairer.NewSegmentRepairer(
			log.Named("repairer"),
			peer.Metainfo.Service,
			peer.Orders.Service,
			peer.Overlay.Service,
			peer.Dialer,
			config.Repairer.Timeout,
			config.Repairer.MaxExcessRateOptimalThreshold,
			config.Checker.RepairOverride,
			config.Repairer.DownloadTimeout,
			signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
		)

		peer.Repair.Repairer = repairer.NewService(
			peer.Log.Named("repairer"),
			peer.DB.RepairQueue(),
			&config.Repairer,
			segmentRepairer,
		)
	}

	{ // setup audit
		config := config.Audit

		peer.Audit.Queue = &audit.Queue{}

		peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
			peer.Metainfo.Service,
			peer.Dialer,
			peer.Overlay.Service,
			peer.DB.Containment(),
			peer.Orders.Service,
			peer.Identity,
			config.MinBytesPerSecond,
			config.MinDownloadTimeout,
		)

		peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"),
			peer.Overlay.Service,
			peer.DB.Containment(),
			config.MaxRetriesStatDB,
			int32(config.MaxReverifyCount),
		)

		peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit worker"),
			peer.Audit.Queue,
			peer.Audit.Verifier,
			peer.Audit.Reporter,
			config,
		)

		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit chore"),
			peer.Audit.Queue,
			peer.Metainfo.Loop,
			config,
		)
	}

	{ // setup garbage collection
		peer.GarbageCollection.Service = gc.NewService(
			peer.Log.Named("garbage collection"),
			config.GarbageCollection,
			peer.Dialer,
			peer.Overlay.DB,
			peer.Metainfo.Loop,
		)
	}

	{ // setup db cleanup
		peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
	}

	{ // setup accounting
		peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
		peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
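		// The reported rollup chore is the consumer described in the commit
		// message: it periodically moves expired serials out of reported_serials
		// and into the existing rollup tables in large batches.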
		peer.Accounting.ReportedRollupChore = reportedrollup.NewChore(peer.Log.Named("reportedrollup"), peer.DB.Orders(), config.ReportedRollup)
	}

	// TODO: remove in future, should be in API
	{ // setup payments
		pc := config.Payments

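		// Fall back to mock payments unless the stripecoinpayments provider is
		// explicitly configured.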
		switch pc.Provider {
		default:
			peer.Payments.Accounts = mockpayments.Accounts()
		case "stripecoinpayments":
			service := stripecoinpayments.NewService(
				peer.Log.Named("payments.stripe:service"),
				pc.StripeCoinPayments,
				peer.DB.StripeCoinPayments(),
				peer.DB.Console().Projects(),
				peer.DB.ProjectAccounting(),
				pc.PerObjectPrice,
				pc.EgressPrice,
				pc.TbhPrice)

			peer.Payments.Accounts = service.Accounts()

			peer.Payments.Chore = stripecoinpayments.NewChore(
				peer.Log.Named("payments.stripe:clearing"),
				service,
				pc.StripeCoinPayments.TransactionUpdateInterval,
				pc.StripeCoinPayments.AccountBalanceUpdateInterval,
			)
		}
	}

	{ // setup graceful exit
		if config.GracefulExit.Enabled {
			peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("gracefulexit"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit)
		} else {
			peer.Log.Named("gracefulexit").Info("disabled")
		}
	}

	{ // setup metrics service
		peer.Metrics.Chore = metrics.NewChore(
			peer.Log.Named("metrics"),
			config.Metrics,
			peer.Metainfo.Loop,
		)
	}

	{ // setup downtime tracking
		peer.DowntimeTracking.Service = downtime.NewService(peer.Log.Named("downtime"), peer.Overlay.Service, peer.Contact.Service)

		peer.DowntimeTracking.DetectionChore = downtime.NewDetectionChore(
			peer.Log.Named("downtime:detection"),
			config.Downtime,
			peer.Overlay.Service,
			peer.DowntimeTracking.Service,
			peer.DB.DowntimeTracking(),
		)

		peer.DowntimeTracking.EstimationChore = downtime.NewEstimationChore(
			peer.Log.Named("downtime:estimation"),
			config.Downtime,
			peer.Overlay.Service,
			peer.DowntimeTracking.Service,
			peer.DB.DowntimeTracking(),
		)
	}

	return peer, nil
}

// Run runs satellite until it's either closed or it errors.
func (peer *Core) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

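	// Each chore and service runs in its own goroutine under a shared errgroup;
	// errs2.IgnoreCanceled treats context cancellation as a clean shutdown
	// rather than an error, so Wait only reports real failures.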
	group, ctx := errgroup.WithContext(ctx)

	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Metainfo.Loop.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Version.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Accounting.Rollup.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Accounting.ReportedRollupChore.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Audit.Worker.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Audit.Chore.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.GarbageCollection.Service.Run(ctx))
	})
	if peer.GracefulExit.Chore != nil {
		group.Go(func() error {
			return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
		})
	}
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Metrics.Chore.Run(ctx))
	})
	if peer.Payments.Chore != nil {
		group.Go(func() error {
			return errs2.IgnoreCanceled(peer.Payments.Chore.Run(ctx))
		})
	}
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.DowntimeTracking.DetectionChore.Run(ctx))
	})

	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.DowntimeTracking.EstimationChore.Run(ctx))
	})
	group.Go(func() error {
		return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
	})

	return group.Wait()
}

// Close closes all the resources.
func (peer *Core) Close() error {
	var errlist errs.Group

	// TODO: ensure that Close can be called on nil-s, so that this code won't need the checks.

	// close servers, to avoid new connections to closing subsystems
	if peer.DowntimeTracking.EstimationChore != nil {
		errlist.Add(peer.DowntimeTracking.EstimationChore.Close())
	}

	if peer.DowntimeTracking.DetectionChore != nil {
		errlist.Add(peer.DowntimeTracking.DetectionChore.Close())
	}

	if peer.Metrics.Chore != nil {
		errlist.Add(peer.Metrics.Chore.Close())
	}

	if peer.GracefulExit.Chore != nil {
		errlist.Add(peer.GracefulExit.Chore.Close())
	}

	// close services in reverse initialization order

	if peer.Audit.Chore != nil {
		errlist.Add(peer.Audit.Chore.Close())
	}
	if peer.Audit.Worker != nil {
		errlist.Add(peer.Audit.Worker.Close())
	}

	if peer.Accounting.ReportedRollupChore != nil {
		errlist.Add(peer.Accounting.ReportedRollupChore.Close())
	}
	if peer.Accounting.Rollup != nil {
		errlist.Add(peer.Accounting.Rollup.Close())
	}
	if peer.Accounting.Tally != nil {
		errlist.Add(peer.Accounting.Tally.Close())
	}

	if peer.DBCleanup.Chore != nil {
		errlist.Add(peer.DBCleanup.Chore.Close())
	}
	if peer.Repair.Repairer != nil {
		errlist.Add(peer.Repair.Repairer.Close())
	}
	if peer.Repair.Checker != nil {
		errlist.Add(peer.Repair.Checker.Close())
	}

	if peer.Overlay.Service != nil {
		errlist.Add(peer.Overlay.Service.Close())
	}
	if peer.Contact.Service != nil {
		errlist.Add(peer.Contact.Service.Close())
	}
	if peer.Metainfo.Loop != nil {
		errlist.Add(peer.Metainfo.Loop.Close())
	}
	if peer.Orders.Chore != nil {
		errlist.Add(peer.Orders.Chore.Close())
	}

	return errlist.Err()
}

// ID returns the peer ID.
func (peer *Core) ID() storj.NodeID { return peer.Identity.ID }