{cmd/satellite, storj/satellite}: create command to run repair process in isolation (#3341)

* set up satellite repair run command

* add separated repair process to storj-sim

* add repairer peer to satellite in testplanet

* move api run cmd into api.go

* add satellite run repair to entrypoint
This commit is contained in:
Cameron 2019-10-29 10:55:57 -04:00 committed by GitHub
parent 14e661a1b0
commit b2ff13f1fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 377 additions and 68 deletions

78
cmd/satellite/api.go Normal file
View File

@ -0,0 +1,78 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/internal/version"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/satellitedb"
)
// cmdAPIRun is the cobra handler for "satellite run api".
//
// It loads the identity and opens the master database, the metainfo store,
// the revocation database and the live accounting cache from runCfg, builds
// the satellite API peer from them, checks the version and then runs the peer
// until peer.Run returns. Every store opened here is closed by a deferred
// closer whose error is combined into the named result err.
func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
	ctx, _ := process.Ctx(cmd)
	log := zap.L()

	identity, err := runCfg.Identity.Load()
	if err != nil {
		zap.S().Fatal(err)
	}

	db, err := satellitedb.New(log.Named("db"), runCfg.Database)
	if err != nil {
		return errs.New("Error starting master database on satellite api: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, db.Close())
	}()

	pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL)
	if err != nil {
		return errs.New("Error creating metainfo database on satellite api: %+v", err)
	}
	defer func() {
		// Close the metainfo store itself; the master database already has
		// its own deferred close above and must not be closed a second time.
		err = errs.Combine(err, pointerDB.Close())
	}()

	revocationDB, err := revocation.NewDBFromCfg(runCfg.Config.Server.Config)
	if err != nil {
		return errs.New("Error creating revocation database on satellite api: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, revocationDB.Close())
	}()

	accountingCache, err := live.NewCache(log.Named("live-accounting"), runCfg.LiveAccounting)
	if err != nil {
		return errs.New("Error creating live accounting cache on satellite api: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, accountingCache.Close())
	}()

	peer, err := satellite.NewAPI(log, identity, db, pointerDB, revocationDB, accountingCache, &runCfg.Config, version.Build)
	if err != nil {
		return err
	}

	err = peer.Version.CheckVersion(ctx)
	if err != nil {
		return err
	}

	// Telemetry is best-effort: a failure is logged and the peer still runs.
	if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
		zap.S().Warn("Failed to initialize telemetry batcher on satellite api: ", err)
	}

	runError := peer.Run(ctx)
	closeError := peer.Close()
	return errs.Combine(runError, closeError)
}

View File

@ -29,4 +29,8 @@ if [ "${SATELLITE_API:-}" = "true" ]; then
exec ./satellite run api $RUN_PARAMS "$@"
fi
if [ "${SATELLITE_REPAIR:-}" = "true" ]; then
exec ./satellite run repair $RUN_PARAMS "$@"
fi
exec ./satellite run $RUN_PARAMS "$@"

View File

@ -50,6 +50,11 @@ var (
Short: "Run the satellite API",
RunE: cmdAPIRun,
}
runRepairerCmd = &cobra.Command{
Use: "repair",
Short: "Run the repair service",
RunE: cmdRepairerRun,
}
setupCmd = &cobra.Command{
Use: "setup",
Short: "Create config files",
@ -119,6 +124,7 @@ func init() {
defaults := cfgstruct.DefaultsFlag(rootCmd)
rootCmd.AddCommand(runCmd)
runCmd.AddCommand(runAPICmd)
runCmd.AddCommand(runRepairerCmd)
rootCmd.AddCommand(setupCmd)
rootCmd.AddCommand(qdiagCmd)
rootCmd.AddCommand(reportsCmd)
@ -127,6 +133,7 @@ func init() {
reportsCmd.AddCommand(gracefulExitCmd)
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runRepairerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
@ -203,66 +210,6 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return errs.Combine(runError, closeError)
}
// cmdAPIRun is the cobra handler that runs the satellite API peer.
//
// It loads the identity and opens the master database, the metainfo store,
// the revocation database and the live accounting cache from runCfg, builds
// the satellite API peer from them, checks the version and then runs the peer
// until peer.Run returns. Every store opened here is closed by a deferred
// closer whose error is combined into the named result err.
func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
	ctx, _ := process.Ctx(cmd)
	log := zap.L()

	identity, err := runCfg.Identity.Load()
	if err != nil {
		zap.S().Fatal(err)
	}

	db, err := satellitedb.New(log.Named("db"), runCfg.Database)
	if err != nil {
		return errs.New("Error starting master database on satellite api: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, db.Close())
	}()

	pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL)
	if err != nil {
		// This failure concerns the metainfo store, not the revocation database.
		return errs.New("Error creating metainfo database on satellite api: %+v", err)
	}
	defer func() {
		// Close the metainfo store itself; the master database already has
		// its own deferred close above and must not be closed a second time.
		err = errs.Combine(err, pointerDB.Close())
	}()

	revocationDB, err := revocation.NewDBFromCfg(runCfg.Config.Server.Config)
	if err != nil {
		return errs.New("Error creating revocation database on satellite api: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, revocationDB.Close())
	}()

	accountingCache, err := live.NewCache(log.Named("live-accounting"), runCfg.LiveAccounting)
	if err != nil {
		return errs.New("Error creating live accounting cache on satellite api: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, accountingCache.Close())
	}()

	peer, err := satellite.NewAPI(log, identity, db, pointerDB, revocationDB, accountingCache, &runCfg.Config, version.Build)
	if err != nil {
		return err
	}

	err = peer.Version.CheckVersion(ctx)
	if err != nil {
		return err
	}

	// Telemetry is best-effort: a failure is logged and the peer still runs.
	if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
		zap.S().Warn("Failed to initialize telemetry batcher on satellite api: ", err)
	}

	runError := peer.Run(ctx)
	closeError := peer.Close()
	return errs.Combine(runError, closeError)
}
func cmdSetup(cmd *cobra.Command, args []string) (err error) {
setupDir, err := filepath.Abs(confDir)
if err != nil {

80
cmd/satellite/repairer.go Normal file
View File

@ -0,0 +1,80 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/internal/version"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/satellitedb"
)
// cmdRepairerRun is the cobra handler for "satellite run repair".
//
// It loads the identity and opens the master database, the metainfo store and
// the revocation database from runCfg, builds a repairer peer from them,
// checks the version and then runs the peer until peer.Run returns. Every
// store opened here is closed by a deferred closer whose error is combined
// into the named result err.
func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
	ctx, _ := process.Ctx(cmd)
	log := zap.L()

	identity, err := runCfg.Identity.Load()
	if err != nil {
		zap.S().Fatal(err)
	}

	db, err := satellitedb.New(log.Named("db"), runCfg.Database)
	if err != nil {
		return errs.New("Error starting master database: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, db.Close())
	}()

	pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
	if err != nil {
		return errs.New("Error creating metainfo database: %+v", err)
	}
	defer func() {
		// Close the metainfo store itself; the master database already has
		// its own deferred close above and must not be closed a second time.
		err = errs.Combine(err, pointerDB.Close())
	}()

	revocationDB, err := revocation.NewDBFromCfg(runCfg.Server.Config)
	if err != nil {
		return errs.New("Error creating revocation database: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, revocationDB.Close())
	}()

	peer, err := satellite.NewRepairer(
		log,
		identity,
		pointerDB,
		revocationDB,
		db.RepairQueue(),
		db.Buckets(),
		db.OverlayCache(),
		db.Orders(),
		version.Build,
		&runCfg.Config,
	)
	if err != nil {
		return err
	}

	err = peer.Version.CheckVersion(ctx)
	if err != nil {
		return err
	}

	// Telemetry is best-effort: a failure is logged and the peer still runs.
	if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
		zap.S().Warn("Failed to initialize telemetry batcher on repairer: ", err)
	}

	runError := peer.Run(ctx)
	closeError := peer.Close()
	return errs.Combine(runError, closeError)
}

View File

@ -51,6 +51,7 @@ const (
gatewayPeer = 1
versioncontrolPeer = 2
storagenodePeer = 3
repairPeer = 5
// Endpoint
publicGRPC = 0
@ -301,6 +302,24 @@ func newNetwork(flags *Flags) (*Processes, error) {
process.WaitForStart(satellite)
}
// Create the repairer process for each satellite
for i, satellite := range satellites {
process := processes.New(Info{
Name: fmt.Sprintf("satellite-repairer/%d", i),
Executable: "satellite",
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
})
process.Arguments = withCommon(process.Directory, Arguments{
"run": {
"repair",
"--debug.addr", net.JoinHostPort(host, port(repairPeer, i, debugHTTP)),
},
})
process.WaitForStart(satellite)
}
// Create gateways for each satellite
for i, satellite := range satelliteAPIs {
satellite := satellite
@ -524,6 +543,10 @@ func identitySetup(network *Processes) (*Processes, error) {
// satellite-api uses the same identity as the satellite
continue
}
if strings.Contains(process.Name, "satellite-repair") {
// satellite-repair uses the same identity as the satellite
continue
}
identity := processes.New(Info{
Name: "identity/" + process.Info.Name,

View File

@ -55,8 +55,9 @@ import (
// SatelliteSystem contains all the processes needed to run a full Satellite setup
type SatelliteSystem struct {
Peer *satellite.Peer
API *satellite.API
Peer *satellite.Peer
API *satellite.API
Repairer *satellite.Repairer
Log *zap.Logger
Identity *identity.FullIdentity
@ -175,7 +176,7 @@ func (system *SatelliteSystem) URL() storj.NodeURL {
// Close closes all the subsystems in the Satellite system
func (system *SatelliteSystem) Close() error {
return errs.Combine(system.API.Close(), system.Peer.Close())
return errs.Combine(system.API.Close(), system.Peer.Close(), system.Repairer.Close())
}
// Run runs all the subsystems in the Satellite system
@ -188,6 +189,9 @@ func (system *SatelliteSystem) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(system.API.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.Repairer.Run(ctx))
})
return group.Wait()
}
@ -408,9 +412,15 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
if err != nil {
return xs, err
}
repairerPeer, err := planet.newRepairer(i, identity, db, pointerDB, config, versionInfo)
if err != nil {
return xs, err
}
log.Debug("id=" + peer.ID().String() + " addr=" + api.Addr())
system := createNewSystem(log, peer, api)
system := createNewSystem(log, peer, api, repairerPeer)
xs = append(xs, system)
}
return xs, nil
@ -420,10 +430,11 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many processes.
func createNewSystem(log *zap.Logger, peer *satellite.Peer, api *satellite.API) *SatelliteSystem {
func createNewSystem(log *zap.Logger, peer *satellite.Peer, api *satellite.API, repairerPeer *satellite.Repairer) *SatelliteSystem {
system := &SatelliteSystem{
Peer: peer,
API: api,
Peer: peer,
API: api,
Repairer: repairerPeer,
}
system.Log = log
system.Identity = peer.Identity
@ -449,7 +460,7 @@ func createNewSystem(log *zap.Logger, peer *satellite.Peer, api *satellite.API)
system.Orders.Service = peer.Orders.Service
system.Repair.Checker = peer.Repair.Checker
system.Repair.Repairer = peer.Repair.Repairer
system.Repair.Repairer = repairerPeer.Repairer
system.Repair.Inspector = api.Repair.Inspector
system.Audit.Queue = peer.Audit.Queue
@ -496,3 +507,16 @@ func (planet *Planet) newAPI(count int, identity *identity.FullIdentity, db sate
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, &config, versionInfo)
}
// newRepairer builds the repairer peer for the satellite with index count.
//
// The peer gets a logger named "satellite-repairer<count>", a revocation
// database opened from config.Server.Config, and the repair queue, buckets,
// overlay cache and orders stores taken from db.
func (planet *Planet) newRepairer(count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config,
	versionInfo version.Info) (*satellite.Repairer, error) {
	repairerLog := planet.log.Named("satellite-repairer" + strconv.Itoa(count))

	revocationDB, openErr := revocation.NewDBFromCfg(config.Server.Config)
	if openErr != nil {
		return nil, errs.Wrap(openErr)
	}

	return satellite.NewRepairer(
		repairerLog,
		identity,
		pointerDB,
		revocationDB,
		db.RepairQueue(),
		db.Buckets(),
		db.OverlayCache(),
		db.Orders(),
		versionInfo,
		&config,
	)
}

153
satellite/repairer.go Normal file
View File

@ -0,0 +1,153 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellite
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/version"
version_checker "storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/extensions"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/repair/repairer"
)
// Repairer is the repairer process.
//
// It bundles the services that NewRepairer wires together so that segment
// repair can run as its own peer.
//
// architecture: Peer
type Repairer struct {
// Log is the logger handed to NewRepairer.
Log *zap.Logger
// Identity is the full identity handed to NewRepairer; ID returns its ID.
Identity *identity.FullIdentity
// Dialer is the RPC dialer built from TLS options derived from Identity.
Dialer rpc.Dialer
// Version is the version checker service; Run starts it.
Version *version_checker.Service
// Metainfo is the metainfo service built over the pointer and buckets databases.
Metainfo *metainfo.Service
// Overlay is the overlay service built over the overlay cache; Close closes it.
Overlay *overlay.Service
// Orders is the orders service built over the orders database and Overlay.
Orders *orders.Service
// SegmentRepairer is built from Metainfo, Orders, Overlay and Dialer.
SegmentRepairer *repairer.SegmentRepairer
// Repairer is the repair service built from the repair queue and
// SegmentRepairer; Run starts it and Close closes it.
Repairer *repairer.Service
}
// NewRepairer assembles a repairer peer from the given databases and config.
//
// Services are wired in this order: version checker, RPC dialer, metainfo,
// overlay, orders, and finally the segment repairer and repair service. The
// only failure path in this function is building the TLS options for the
// dialer; in that case the partially built peer is closed and the errors are
// combined.
func NewRepairer(log *zap.Logger, full *identity.FullIdentity, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
	bucketsDB metainfo.BucketsDB, overlayCache overlay.DB, ordersDB orders.DB, versionInfo version.Info, config *Config) (*Repairer, error) {
	peer := &Repairer{}
	peer.Log = log
	peer.Identity = full

	// version checker
	if !versionInfo.IsZero() {
		log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
			versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
	}
	peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")

	// dialer
	tlsOptions, err := tlsopts.NewOptions(full, config.Server.Config, revocationDB)
	if err != nil {
		return nil, errs.Combine(err, peer.Close())
	}
	peer.Dialer = rpc.NewDefaultDialer(tlsOptions)

	// metainfo
	log.Debug("Setting up metainfo")
	peer.Metainfo = metainfo.NewService(log.Named("metainfo"), pointerDB, bucketsDB)

	// overlay
	log.Debug("Setting up overlay")
	peer.Overlay = overlay.NewService(log.Named("overlay"), overlayCache, config.Overlay)

	// orders
	log.Debug("Setting up orders")
	satelliteAddress := &pb.NodeAddress{
		Transport: pb.NodeTransport_TCP_TLS_GRPC,
		Address:   config.Contact.ExternalAddress,
	}
	peer.Orders = orders.NewService(
		log.Named("orders"),
		signing.SignerFromFullIdentity(full),
		peer.Overlay,
		ordersDB,
		config.Orders.Expiration,
		satelliteAddress,
		config.Repairer.MaxExcessRateOptimalThreshold,
	)

	// segment repairer and repair service
	log.Debug("Setting up repairer")
	segmentRepairer := repairer.NewSegmentRepairer(
		log.Named("segment repairer"),
		peer.Metainfo,
		peer.Orders,
		peer.Overlay,
		peer.Dialer,
		config.Repairer.Timeout,
		config.Repairer.MaxExcessRateOptimalThreshold,
		config.Checker.RepairOverride,
		signing.SigneeFromPeerIdentity(full.PeerIdentity()),
	)
	peer.SegmentRepairer = segmentRepairer
	peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, segmentRepairer)

	return peer, nil
}
// Run starts the version checker and the repair service in an errgroup and
// waits for both to finish. Each task's error is passed through
// errs2.IgnoreCanceled before it reaches the group.
func (peer *Repairer) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	var tasks *errgroup.Group
	tasks, ctx = errgroup.WithContext(ctx)

	runVersion := func() error {
		return errs2.IgnoreCanceled(peer.Version.Run(ctx))
	}
	runRepairer := func() error {
		peer.Log.Info("Repairer started")
		return errs2.IgnoreCanceled(peer.Repairer.Run(ctx))
	}

	tasks.Go(runVersion)
	tasks.Go(runRepairer)

	err = tasks.Wait()
	return err
}
// Close closes the services that hold resources and returns their combined
// error. It is safe to call on a partially constructed peer: services that
// were never set up are skipped.
func (peer *Repairer) Close() error {
	var errlist errs.Group

	// Close services in reverse initialization order: NewRepairer sets up the
	// overlay before the repair service, so the repair service (which was
	// built on top of the overlay) is closed first.
	if peer.Repairer != nil {
		errlist.Add(peer.Repairer.Close())
	}
	if peer.Overlay != nil {
		errlist.Add(peer.Overlay.Close())
	}

	return errlist.Err()
}
// ID reports the node ID stored in the peer's identity.
func (peer *Repairer) ID() storj.NodeID {
	return peer.Identity.ID
}