satellite: change the Peer name to Core (#3472)

* change satellite.Peer name to Core

* change to Core in testplanet

* missed a few places

* keep shared stuff in peer.go to stay consistent with storj/docs
This commit is contained in:
Jess G 2019-11-04 11:01:02 -08:00 committed by GitHub
parent 281b8b6967
commit 5abb91afcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 453 additions and 429 deletions

View File

@ -222,7 +222,7 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
log := zap.L()
db, err := satellitedb.New(log.Named("db migration"), runCfg.Database)
if err != nil {
return errs.New("Error createing new master database connection for satellitedb migration: %+v", err)
return errs.New("Error creating new master database connection for satellitedb migration: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())

View File

@ -270,13 +270,13 @@ func newNetwork(flags *Flags) (*Processes, error) {
var satellites []*Process
for i := 0; i < flags.SatelliteCount; i++ {
process := processes.New(Info{
apiProcess := processes.New(Info{
Name: fmt.Sprintf("satellite/%d", i),
Executable: "satellite",
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
Address: net.JoinHostPort(host, port(satellitePeer, i, publicGRPC)),
})
satellites = append(satellites, process)
satellites = append(satellites, apiProcess)
consoleAuthToken := "secure_token"
@ -285,12 +285,12 @@ func newNetwork(flags *Flags) (*Processes, error) {
if redisAddress == "" {
redisAddress = redisServers[i].Address
redisPortBase = 0
process.WaitForStart(redisServers[i])
apiProcess.WaitForStart(redisServers[i])
}
process.Arguments = withCommon(process.Directory, Arguments{
apiProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
"setup": {
"--identity-dir", process.Directory,
"--identity-dir", apiProcess.Directory,
"--console.address", net.JoinHostPort(host, port(satellitePeer, i, publicHTTP)),
"--console.static-dir", filepath.Join(storjRoot, "web/satellite/"),
// TODO: remove console.auth-token after vanguard release
@ -298,7 +298,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
"--marketing.base-url", "",
"--marketing.address", net.JoinHostPort(host, port(satellitePeer, i, privateHTTP)),
"--marketing.static-dir", filepath.Join(storjRoot, "web/marketing/"),
"--server.address", process.Address,
"--server.address", apiProcess.Address,
"--server.private-address", net.JoinHostPort(host, port(satellitePeer, i, privateGRPC)),
"--live-accounting.storage-backend", "redis://" + redisAddress + "?db=" + strconv.Itoa(redisPortBase),
@ -317,12 +317,12 @@ func newNetwork(flags *Flags) (*Processes, error) {
})
if flags.Postgres != "" {
process.Arguments["setup"] = append(process.Arguments["setup"],
apiProcess.Arguments["setup"] = append(apiProcess.Arguments["setup"],
"--database", pgutil.ConnstrWithSchema(flags.Postgres, fmt.Sprintf("satellite/%d", i)),
"--metainfo.database-url", pgutil.ConnstrWithSchema(flags.Postgres, fmt.Sprintf("satellite/%d/meta", i)),
)
}
process.ExecBefore["run"] = func(process *Process) error {
apiProcess.ExecBefore["run"] = func(process *Process) error {
return readConfigString(&process.Address, process.Directory, "server.address")
}
@ -331,7 +331,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
Executable: "satellite",
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
})
migrationProcess.Arguments = withCommon(process.Directory, Arguments{
migrationProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
"run": {
"migration",
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugMigrationHTTP)),
@ -344,7 +344,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
Address: "",
})
coreProcess.Arguments = withCommon(process.Directory, Arguments{
coreProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
"run": {
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugPeerHTTP)),
},
@ -356,7 +356,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
Executable: "satellite",
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
})
repairProcess.Arguments = withCommon(process.Directory, Arguments{
repairProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
"run": {
"repair",
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugRepairerHTTP)),
@ -364,7 +364,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
})
repairProcess.WaitForExited(migrationProcess)
process.WaitForExited(migrationProcess)
apiProcess.WaitForExited(migrationProcess)
}
// Create gateways for each satellite

View File

@ -213,7 +213,7 @@ Data Science could use this approach to more nicely calculate statistics however
1. Implement [_estimating offline time_ part](#estimating-offline-time)<sup>1</sup>.
<sup>1</sup> These subtasks can be done in parallel.
1. Wire the new chore to the `satellite.Peer`.
1. Wire the new chore to the `satellite.Core`.
1. Remove the implementation of the current uptime disqualification.
- `satellite/satellitedb.Overlaycache.UpdateUptime`: Remove update disqualified field due to lower uptime reputation.
- `satellite/satellitedb.Overlaycache.populateUpdateNodeStats`: Remove update disqualified field due to lower uptime reputation.

View File

@ -55,7 +55,7 @@ import (
// SatelliteSystem contains all the processes needed to run a full Satellite setup
type SatelliteSystem struct {
Peer *satellite.Peer
Core *satellite.Core
API *satellite.API
Repairer *satellite.Repairer
@ -176,7 +176,7 @@ func (system *SatelliteSystem) URL() storj.NodeURL {
// Close closes all the subsystems in the Satellite system
func (system *SatelliteSystem) Close() error {
return errs.Combine(system.API.Close(), system.Peer.Close(), system.Repairer.Close())
return errs.Combine(system.API.Close(), system.Core.Close(), system.Repairer.Close())
}
// Run runs all the subsystems in the Satellite system
@ -184,7 +184,7 @@ func (system *SatelliteSystem) Run(ctx context.Context) (err error) {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(system.Peer.Run(ctx))
return errs2.IgnoreCanceled(system.Core.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.API.Run(ctx))
@ -435,9 +435,9 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many prrocesses.
func createNewSystem(log *zap.Logger, peer *satellite.Peer, api *satellite.API, repairerPeer *satellite.Repairer) *SatelliteSystem {
func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer) *SatelliteSystem {
system := &SatelliteSystem{
Peer: peer,
Core: peer,
API: api,
Repairer: repairerPeer,
}

432
satellite/core.go Normal file
View File

@ -0,0 +1,432 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellite
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/version"
version_checker "storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/extensions"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/rollup"
"storj.io/storj/satellite/accounting/tally"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/mockpayments"
"storj.io/storj/satellite/payments/paymentsconfig"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/repairer"
)
// Core is the satellite core process that runs chores
//
// architecture: Peer
type Core struct {
// core dependencies
Log *zap.Logger
Identity *identity.FullIdentity
DB DB
Dialer rpc.Dialer
Version *version_checker.Service
// services and endpoints
Overlay struct {
DB overlay.DB
Service *overlay.Service
}
Metainfo struct {
Database metainfo.PointerDB // TODO: move into pointerDB
Service *metainfo.Service
Loop *metainfo.Loop
}
Orders struct {
Service *orders.Service
}
Repair struct {
Checker *checker.Checker
Repairer *repairer.Service
}
Audit struct {
Queue *audit.Queue
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter *audit.Reporter
}
GarbageCollection struct {
Service *gc.Service
}
DBCleanup struct {
Chore *dbcleanup.Chore
}
Accounting struct {
Tally *tally.Service
Rollup *rollup.Service
ProjectUsage *accounting.ProjectUsage
}
LiveAccounting struct {
Cache accounting.Cache
}
Payments struct {
Accounts payments.Accounts
Clearing payments.Clearing
}
GracefulExit struct {
Chore *gracefulexit.Chore
}
Metrics struct {
Chore *metrics.Chore
}
}
// New creates a new satellite
func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, versionInfo version.Info, config *Config) (*Core, error) {
peer := &Core{
Log: log,
Identity: full,
DB: db,
}
var err error
{ // setup version control
if !versionInfo.IsZero() {
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
}
{ // setup listener and server
log.Debug("Starting listener and server")
sc := config.Server
tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
}
{ // setup overlay
log.Debug("Starting overlay")
peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
}
{ // setup live accounting
log.Debug("Setting up live accounting")
peer.LiveAccounting.Cache = liveAccounting
}
{ // setup accounting project usage
log.Debug("Setting up accounting project usage")
peer.Accounting.ProjectUsage = accounting.NewProjectUsage(
peer.DB.ProjectAccounting(),
peer.LiveAccounting.Cache,
config.Rollup.MaxAlphaUsage,
)
}
{ // setup orders
log.Debug("Setting up orders")
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay.Service,
peer.DB.Orders(),
config.Orders.Expiration,
&pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: config.Contact.ExternalAddress,
},
config.Repairer.MaxExcessRateOptimalThreshold,
)
}
{ // setup metainfo
log.Debug("Setting up metainfo")
peer.Metainfo.Database = pointerDB // for logging: storelogger.New(peer.Log.Named("pdb"), db)
peer.Metainfo.Service = metainfo.NewService(peer.Log.Named("metainfo:service"),
peer.Metainfo.Database,
peer.DB.Buckets(),
)
peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
}
{ // setup datarepair
log.Debug("Setting up datarepair")
// TODO: simplify argument list somehow
peer.Repair.Checker = checker.NewChecker(
peer.Log.Named("checker"),
peer.DB.RepairQueue(),
peer.DB.Irreparable(),
peer.Metainfo.Service,
peer.Metainfo.Loop,
peer.Overlay.Service,
config.Checker)
segmentRepairer := repairer.NewSegmentRepairer(
log.Named("repairer"),
peer.Metainfo.Service,
peer.Orders.Service,
peer.Overlay.Service,
peer.Dialer,
config.Repairer.Timeout,
config.Repairer.MaxExcessRateOptimalThreshold,
config.Checker.RepairOverride,
config.Repairer.DownloadTimeout,
signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
)
peer.Repair.Repairer = repairer.NewService(
peer.Log.Named("repairer"),
peer.DB.RepairQueue(),
&config.Repairer,
segmentRepairer,
)
}
{ // setup audit
log.Debug("Setting up audits")
config := config.Audit
peer.Audit.Queue = &audit.Queue{}
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
peer.Metainfo.Service,
peer.Dialer,
peer.Overlay.Service,
peer.DB.Containment(),
peer.Orders.Service,
peer.Identity,
config.MinBytesPerSecond,
config.MinDownloadTimeout,
)
peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"),
peer.Overlay.Service,
peer.DB.Containment(),
config.MaxRetriesStatDB,
int32(config.MaxReverifyCount),
)
peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit worker"),
peer.Audit.Queue,
peer.Audit.Verifier,
peer.Audit.Reporter,
config,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit chore"),
peer.Audit.Queue,
peer.Metainfo.Loop,
config,
)
}
{ // setup garbage collection
log.Debug("Setting up garbage collection")
peer.GarbageCollection.Service = gc.NewService(
peer.Log.Named("garbage collection"),
config.GarbageCollection,
peer.Dialer,
peer.Overlay.DB,
peer.Metainfo.Loop,
)
}
{ // setup db cleanup
log.Debug("Setting up db cleanup")
peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
}
{ // setup accounting
log.Debug("Setting up accounting")
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
}
// TODO: remove in future, should be in API
{ // setup payments
config := paymentsconfig.Config{}
switch config.Provider {
default:
peer.Payments.Accounts = mockpayments.Accounts()
case "stripecoinpayments":
service := stripecoinpayments.NewService(
peer.Log.Named("stripecoinpayments service"),
config.StripeCoinPayments,
peer.DB.Customers(),
peer.DB.CoinpaymentsTransactions())
peer.Payments.Accounts = service.Accounts()
peer.Payments.Clearing = stripecoinpayments.NewChore(
peer.Log.Named("stripecoinpayments clearing loop"),
service,
config.StripeCoinPayments.TransactionUpdateInterval,
config.StripeCoinPayments.AccountBalanceUpdateInterval)
}
}
{ // setup graceful exit
if config.GracefulExit.Enabled {
log.Debug("Setting up graceful exit")
peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("graceful exit chore"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit)
}
}
{ // setup metrics service
peer.Metrics.Chore = metrics.NewChore(
peer.Log.Named("metrics"),
config.Metrics,
peer.Metainfo.Loop,
)
}
return peer, nil
}
// Run runs satellite until it's either closed or it errors.
func (peer *Core) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Metainfo.Loop.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Accounting.Rollup.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Audit.Worker.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Audit.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GarbageCollection.Service.Run(ctx))
})
if peer.GracefulExit.Chore != nil {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
})
}
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Metrics.Chore.Run(ctx))
})
return group.Wait()
}
// Close closes all the resources.
func (peer *Core) Close() error {
var errlist errs.Group
// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
// close servers, to avoid new connections to closing subsystems
if peer.Metrics.Chore != nil {
errlist.Add(peer.Metrics.Chore.Close())
}
if peer.GracefulExit.Chore != nil {
errlist.Add(peer.GracefulExit.Chore.Close())
}
// close services in reverse initialization order
if peer.Audit.Chore != nil {
errlist.Add(peer.Audit.Chore.Close())
}
if peer.Audit.Worker != nil {
errlist.Add(peer.Audit.Worker.Close())
}
if peer.Accounting.Rollup != nil {
errlist.Add(peer.Accounting.Rollup.Close())
}
if peer.Accounting.Tally != nil {
errlist.Add(peer.Accounting.Tally.Close())
}
if peer.DBCleanup.Chore != nil {
errlist.Add(peer.DBCleanup.Chore.Close())
}
if peer.Repair.Repairer != nil {
errlist.Add(peer.Repair.Repairer.Close())
}
if peer.Repair.Checker != nil {
errlist.Add(peer.Repair.Checker.Close())
}
if peer.Overlay.Service != nil {
errlist.Add(peer.Overlay.Service.Close())
}
if peer.Metainfo.Loop != nil {
errlist.Add(peer.Metainfo.Loop.Close())
}
return errlist.Err()
}
// ID returns the peer ID.
func (peer *Core) ID() storj.NodeID { return peer.Identity.ID }

View File

@ -4,24 +4,11 @@
package satellite
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/version"
version_checker "storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/extensions"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/server"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/accounting/rollup"
@ -40,9 +27,6 @@ import (
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/mockpayments"
"storj.io/storj/satellite/payments/paymentsconfig"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/irreparable"
@ -135,395 +119,3 @@ type Config struct {
Metrics metrics.Config
}
// Peer is the satellite
//
// architecture: Peer
type Peer struct {
// core dependencies
Log *zap.Logger
Identity *identity.FullIdentity
DB DB
Dialer rpc.Dialer
Version *version_checker.Service
// services and endpoints
Overlay struct {
DB overlay.DB
Service *overlay.Service
}
Metainfo struct {
Database metainfo.PointerDB // TODO: move into pointerDB
Service *metainfo.Service
Loop *metainfo.Loop
}
Orders struct {
Service *orders.Service
}
Repair struct {
Checker *checker.Checker
Repairer *repairer.Service
}
Audit struct {
Queue *audit.Queue
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter *audit.Reporter
}
GarbageCollection struct {
Service *gc.Service
}
DBCleanup struct {
Chore *dbcleanup.Chore
}
Accounting struct {
Tally *tally.Service
Rollup *rollup.Service
ProjectUsage *accounting.ProjectUsage
}
LiveAccounting struct {
Cache accounting.Cache
}
Payments struct {
Accounts payments.Accounts
Clearing payments.Clearing
}
GracefulExit struct {
Chore *gracefulexit.Chore
}
Metrics struct {
Chore *metrics.Chore
}
}
// New creates a new satellite
func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, versionInfo version.Info, config *Config) (*Peer, error) {
peer := &Peer{
Log: log,
Identity: full,
DB: db,
}
var err error
{ // setup version control
if !versionInfo.IsZero() {
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
}
{ // setup listener and server
log.Debug("Starting listener and server")
sc := config.Server
tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
}
{ // setup overlay
log.Debug("Starting overlay")
peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
}
{ // setup live accounting
log.Debug("Setting up live accounting")
peer.LiveAccounting.Cache = liveAccounting
}
{ // setup accounting project usage
log.Debug("Setting up accounting project usage")
peer.Accounting.ProjectUsage = accounting.NewProjectUsage(
peer.DB.ProjectAccounting(),
peer.LiveAccounting.Cache,
config.Rollup.MaxAlphaUsage,
)
}
{ // setup orders
log.Debug("Setting up orders")
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay.Service,
peer.DB.Orders(),
config.Orders.Expiration,
&pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: config.Contact.ExternalAddress,
},
config.Repairer.MaxExcessRateOptimalThreshold,
)
}
{ // setup metainfo
log.Debug("Setting up metainfo")
peer.Metainfo.Database = pointerDB // for logging: storelogger.New(peer.Log.Named("pdb"), db)
peer.Metainfo.Service = metainfo.NewService(peer.Log.Named("metainfo:service"),
peer.Metainfo.Database,
peer.DB.Buckets(),
)
peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
}
{ // setup datarepair
log.Debug("Setting up datarepair")
// TODO: simplify argument list somehow
peer.Repair.Checker = checker.NewChecker(
peer.Log.Named("checker"),
peer.DB.RepairQueue(),
peer.DB.Irreparable(),
peer.Metainfo.Service,
peer.Metainfo.Loop,
peer.Overlay.Service,
config.Checker)
segmentRepairer := repairer.NewSegmentRepairer(
log.Named("repairer"),
peer.Metainfo.Service,
peer.Orders.Service,
peer.Overlay.Service,
peer.Dialer,
config.Repairer.Timeout,
config.Repairer.MaxExcessRateOptimalThreshold,
config.Checker.RepairOverride,
config.Repairer.DownloadTimeout,
signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
)
peer.Repair.Repairer = repairer.NewService(
peer.Log.Named("repairer"),
peer.DB.RepairQueue(),
&config.Repairer,
segmentRepairer,
)
}
{ // setup audit
log.Debug("Setting up audits")
config := config.Audit
peer.Audit.Queue = &audit.Queue{}
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
peer.Metainfo.Service,
peer.Dialer,
peer.Overlay.Service,
peer.DB.Containment(),
peer.Orders.Service,
peer.Identity,
config.MinBytesPerSecond,
config.MinDownloadTimeout,
)
peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"),
peer.Overlay.Service,
peer.DB.Containment(),
config.MaxRetriesStatDB,
int32(config.MaxReverifyCount),
)
peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit worker"),
peer.Audit.Queue,
peer.Audit.Verifier,
peer.Audit.Reporter,
config,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit chore"),
peer.Audit.Queue,
peer.Metainfo.Loop,
config,
)
}
{ // setup garbage collection
log.Debug("Setting up garbage collection")
peer.GarbageCollection.Service = gc.NewService(
peer.Log.Named("garbage collection"),
config.GarbageCollection,
peer.Dialer,
peer.Overlay.DB,
peer.Metainfo.Loop,
)
}
{ // setup db cleanup
log.Debug("Setting up db cleanup")
peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
}
{ // setup accounting
log.Debug("Setting up accounting")
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
}
// TODO: remove in future, should be in API
{ // setup payments
config := paymentsconfig.Config{}
switch config.Provider {
default:
peer.Payments.Accounts = mockpayments.Accounts()
case "stripecoinpayments":
service := stripecoinpayments.NewService(
peer.Log.Named("stripecoinpayments service"),
config.StripeCoinPayments,
peer.DB.Customers(),
peer.DB.CoinpaymentsTransactions())
peer.Payments.Accounts = service.Accounts()
peer.Payments.Clearing = stripecoinpayments.NewChore(
peer.Log.Named("stripecoinpayments clearing loop"),
service,
config.StripeCoinPayments.TransactionUpdateInterval,
config.StripeCoinPayments.AccountBalanceUpdateInterval)
}
}
{ // setup graceful exit
if config.GracefulExit.Enabled {
log.Debug("Setting up graceful exit")
peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("graceful exit chore"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit)
}
}
{ // setup metrics service
peer.Metrics.Chore = metrics.NewChore(
peer.Log.Named("metrics"),
config.Metrics,
peer.Metainfo.Loop,
)
}
return peer, nil
}
// Run runs satellite until it's either closed or it errors.
func (peer *Peer) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Metainfo.Loop.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Accounting.Rollup.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Audit.Worker.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Audit.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GarbageCollection.Service.Run(ctx))
})
if peer.GracefulExit.Chore != nil {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
})
}
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Metrics.Chore.Run(ctx))
})
return group.Wait()
}
// Close closes all the resources.
func (peer *Peer) Close() error {
var errlist errs.Group
// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
// close servers, to avoid new connections to closing subsystems
if peer.Metrics.Chore != nil {
errlist.Add(peer.Metrics.Chore.Close())
}
if peer.GracefulExit.Chore != nil {
errlist.Add(peer.GracefulExit.Chore.Close())
}
// close services in reverse initialization order
if peer.Audit.Chore != nil {
errlist.Add(peer.Audit.Chore.Close())
}
if peer.Audit.Worker != nil {
errlist.Add(peer.Audit.Worker.Close())
}
if peer.Accounting.Rollup != nil {
errlist.Add(peer.Accounting.Rollup.Close())
}
if peer.Accounting.Tally != nil {
errlist.Add(peer.Accounting.Tally.Close())
}
if peer.DBCleanup.Chore != nil {
errlist.Add(peer.DBCleanup.Chore.Close())
}
if peer.Repair.Repairer != nil {
errlist.Add(peer.Repair.Repairer.Close())
}
if peer.Repair.Checker != nil {
errlist.Add(peer.Repair.Checker.Close())
}
if peer.Overlay.Service != nil {
errlist.Add(peer.Overlay.Service.Close())
}
if peer.Metainfo.Loop != nil {
errlist.Add(peer.Metainfo.Loop.Close())
}
return errlist.Err()
}
// ID returns the peer ID.
func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID }

View File

@ -50,7 +50,7 @@ PATH=$RELEASE_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network tes
# this replaces anywhere that has "/release/" in the config file, which currently just renames the static dir paths
sed -i -e 's#/release/#/branch/#g' `storj-sim network env SATELLITE_0_DIR`/config.yaml
# replace any 140XX port with 100XX port to fix, satellite.API part removal from satellite.Peer
# replace any 140XX port with 100XX port to fix, satellite.API part removal from satellite.Core
sed -i -e "s#$STORJ_NETWORK_HOST4:100#$STORJ_NETWORK_HOST4:140#g" `storj-sim network env SATELLITE_0_DIR`/config.yaml
REDIS_CONFIG=$(storj-sim network env REDIS_0_DIR)/redis.conf
if [ ! -f "$REDIS_CONFIG" ] ; then

View File

@ -120,7 +120,7 @@ type Peer struct {
Version *checker.Service
// services and endpoints
// TODO: similar grouping to satellite.Peer
// TODO: similar grouping to satellite.Core
Contact struct {
Service *contact.Service