satellite: remove satellite API code from peer (#3414)

* rm dup api code from sa peer, update storj-sim

* fix for backwards compat tests

* use env var instead of localhost

* changes per CR

* fix env var name

* skip peer for setup
This commit is contained in:
Jess G 2019-10-30 12:23:09 -07:00 committed by GitHub
parent 4d85b11574
commit e96d615013
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 32 additions and 372 deletions

View File

@ -72,6 +72,11 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
zap.S().Warn("Failed to initialize telemetry batcher on satellite api: ", err)
}
err = db.CreateTables()
if err != nil {
return errs.New("Error creating tables for master database on satellite: %+v", err)
}
runError := peer.Run(ctx)
closeError := peer.Close()
return errs.Combine(runError, closeError)

View File

@ -200,11 +200,6 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
zap.S().Warn("Failed to initialize telemetry batcher: ", err)
}
err = db.CreateTables()
if err != nil {
return errs.New("Error creating tables for master database on satellite: %+v", err)
}
runError := peer.Run(ctx)
closeError := peer.Close()
return errs.Combine(runError, closeError)

View File

@ -47,7 +47,6 @@ const (
// Peer class
satellitePeer = 0
satelliteAPI = 4
gatewayPeer = 1
versioncontrolPeer = 2
storagenodePeer = 3
@ -58,7 +57,9 @@ const (
publicHTTP = 2
privateHTTP = 3
debugHTTP = 9
// satellite specific constants
debugPeerHTTP = 7
debugRepairerHTTP = 8
)
@ -263,7 +264,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
"--version.server-address", fmt.Sprintf("http://%s/", versioncontrol.Address),
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugHTTP)),
},
"run": {},
"run": {"api"},
})
if flags.Postgres != "" {
@ -278,25 +279,18 @@ func newNetwork(flags *Flags) (*Processes, error) {
}
}
// Create the API process for each satellite
var satelliteAPIs []*Process
// Create the peer process for each satellite API
for i, satellite := range satellites {
process := processes.New(Info{
Name: fmt.Sprintf("satellite-api/%d", i),
Name: fmt.Sprintf("satellite-peer/%d", i),
Executable: "satellite",
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
Address: net.JoinHostPort(host, port(satelliteAPI, i, publicGRPC)),
Address: "",
})
satelliteAPIs = append(satelliteAPIs, process)
process.Arguments = withCommon(process.Directory, Arguments{
"run": {
"api",
"--console.address", net.JoinHostPort(host, port(satelliteAPI, i, publicHTTP)),
"--marketing.address", net.JoinHostPort(host, port(satelliteAPI, i, privateHTTP)),
"--server.address", process.Address,
"--server.private-address", net.JoinHostPort(host, port(satelliteAPI, i, privateGRPC)),
"--debug.addr", net.JoinHostPort(host, port(satelliteAPI, i, debugHTTP)),
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugPeerHTTP)),
},
})
@ -322,7 +316,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
}
// Create gateways for each satellite
for i, satellite := range satelliteAPIs {
for i, satellite := range satellites {
satellite := satellite
process := processes.New(Info{
Name: fmt.Sprintf("gateway/%d", i),
@ -452,7 +446,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
Address: net.JoinHostPort(host, port(storagenodePeer, i, publicGRPC)),
})
for _, satellite := range satelliteAPIs {
for _, satellite := range satellites {
process.WaitForStart(satellite)
}
@ -481,15 +475,15 @@ func newNetwork(flags *Flags) (*Processes, error) {
process.ExecBefore["setup"] = func(process *Process) error {
whitelisted := []string{}
for _, satelliteAPI := range satelliteAPIs {
for _, satellite := range satellites {
peer, err := identity.PeerConfig{
CertPath: filepath.Join(satelliteAPI.Directory, "identity.cert"),
CertPath: filepath.Join(satellite.Directory, "identity.cert"),
}.Load()
if err != nil {
return err
}
whitelisted = append(whitelisted, peer.ID.String()+"@"+satelliteAPI.Address)
whitelisted = append(whitelisted, peer.ID.String()+"@"+satellite.Address)
}
process.Arguments["setup"] = append(process.Arguments["setup"],
@ -540,8 +534,8 @@ func identitySetup(network *Processes) (*Processes, error) {
continue
}
if strings.Contains(process.Name, "satellite-api") {
// satellite-api uses the same identity as the satellite
if strings.Contains(process.Name, "satellite-peer") {
// satellite-peer uses the same identity as the satellite
continue
}
if strings.Contains(process.Name, "satellite-repair") {

View File

@ -5,9 +5,6 @@ package satellite
import (
"context"
"net"
"net/mail"
"net/smtp"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -15,11 +12,8 @@ import (
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/post"
"storj.io/storj/internal/post/oauth2"
"storj.io/storj/internal/version"
version_checker "storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/extensions"
@ -35,19 +29,15 @@ import (
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleauth"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/mailservice/simulate"
"storj.io/storj/satellite/marketingweb"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/nodestats"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/payments"
@ -58,7 +48,6 @@ import (
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/rewards"
"storj.io/storj/satellite/vouchers"
)
var mon = monkit.Package()
@ -155,42 +144,27 @@ type Peer struct {
Dialer rpc.Dialer
Server *server.Server
Version *version_checker.Service
// services and endpoints
Contact struct {
Service *contact.Service
Endpoint *contact.Endpoint
}
Overlay struct {
DB overlay.DB
Service *overlay.Service
Inspector *overlay.Inspector
DB overlay.DB
Service *overlay.Service
}
Metainfo struct {
Database metainfo.PointerDB // TODO: move into pointerDB
Service *metainfo.Service
Endpoint2 *metainfo.Endpoint
Loop *metainfo.Loop
}
Inspector struct {
Endpoint *inspector.Endpoint
Database metainfo.PointerDB // TODO: move into pointerDB
Service *metainfo.Service
Loop *metainfo.Loop
}
Orders struct {
Endpoint *orders.Endpoint
Service *orders.Service
Service *orders.Service
}
Repair struct {
Checker *checker.Checker
Repairer *repairer.Service
Inspector *irreparable.Inspector
Checker *checker.Checker
Repairer *repairer.Service
}
Audit struct {
Queue *audit.Queue
@ -218,37 +192,13 @@ type Peer struct {
Cache accounting.Cache
}
Mail struct {
Service *mailservice.Service
}
Vouchers struct {
Endpoint *vouchers.Endpoint
}
Payments struct {
Accounts payments.Accounts
Clearing payments.Clearing
}
Console struct {
Listener net.Listener
Service *console.Service
Endpoint *consoleweb.Server
}
Marketing struct {
Listener net.Listener
Endpoint *marketingweb.Server
}
NodeStats struct {
Endpoint *nodestats.Endpoint
}
GracefulExit struct {
Endpoint *gracefulexit.Endpoint
Chore *gracefulexit.Chore
Chore *gracefulexit.Chore
}
Metrics struct {
@ -284,15 +234,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
}
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
unaryInterceptor := grpcauth.NewAPIKeyInterceptor()
if sc.DebugLogTraffic {
unaryInterceptor = server.CombineInterceptors(unaryInterceptor, server.UnaryMessageLoggingInterceptor(log))
}
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress, unaryInterceptor)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
{ // setup overlay
@ -300,44 +241,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
peer.Overlay.Inspector = overlay.NewInspector(peer.Overlay.Service)
pb.RegisterOverlayInspectorServer(peer.Server.PrivateGRPC(), peer.Overlay.Inspector)
pb.DRPCRegisterOverlayInspector(peer.Server.PrivateDRPC(), peer.Overlay.Inspector)
}
{ // setup contact service
log.Debug("Setting up contact service")
c := config.Contact
if c.ExternalAddress == "" {
c.ExternalAddress = peer.Addr()
}
pbVersion, err := versionInfo.Proto()
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
self := &overlay.NodeDossier{
Node: pb.Node{
Id: peer.ID(),
Address: &pb.NodeAddress{
Address: c.ExternalAddress,
},
},
Type: pb.NodeType_SATELLITE,
Version: *pbVersion,
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint)
}
{ // setup vouchers
log.Debug("Setting up vouchers")
pb.RegisterVouchersServer(peer.Server.GRPC(), peer.Vouchers.Endpoint)
pb.DRPCRegisterVouchers(peer.Server.DRPC(), peer.Vouchers.Endpoint)
}
{ // setup live accounting
@ -356,13 +259,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
{ // setup orders
log.Debug("Setting up orders")
satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
peer.Orders.Endpoint = orders.NewEndpoint(
peer.Log.Named("orders:endpoint"),
satelliteSignee,
peer.DB.Orders(),
config.Orders.SettlementBatchSize,
)
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
@ -375,8 +271,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
},
config.Repairer.MaxExcessRateOptimalThreshold,
)
pb.RegisterOrdersServer(peer.Server.GRPC(), peer.Orders.Endpoint)
pb.DRPCRegisterOrders(peer.Server.DRPC(), peer.Orders.Endpoint.DRPC())
}
{ // setup metainfo
@ -388,23 +282,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
peer.DB.Buckets(),
)
peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
peer.Metainfo.Endpoint2 = metainfo.NewEndpoint(
peer.Log.Named("metainfo:endpoint"),
peer.Metainfo.Service,
peer.Orders.Service,
peer.Overlay.Service,
peer.DB.Attribution(),
peer.DB.PeerIdentities(),
peer.DB.Console().APIKeys(),
peer.Accounting.ProjectUsage,
config.Metainfo.RS,
signing.SignerFromFullIdentity(peer.Identity),
config.Metainfo.MaxCommitInterval,
)
pb.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2)
pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint2)
}
{ // setup datarepair
@ -437,10 +314,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
&config.Repairer,
segmentRepairer,
)
peer.Repair.Inspector = irreparable.NewInspector(peer.DB.Irreparable())
pb.RegisterIrreparableInspectorServer(peer.Server.PrivateGRPC(), peer.Repair.Inspector)
pb.DRPCRegisterIrreparableInspector(peer.Server.PrivateDRPC(), peer.Repair.Inspector)
}
{ // setup audit
@ -508,86 +381,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
}
{ // setup inspector
log.Debug("Setting up inspector")
peer.Inspector.Endpoint = inspector.NewEndpoint(
peer.Log.Named("inspector"),
peer.Overlay.Service,
peer.Metainfo.Service,
)
pb.RegisterHealthInspectorServer(peer.Server.PrivateGRPC(), peer.Inspector.Endpoint)
pb.DRPCRegisterHealthInspector(peer.Server.PrivateDRPC(), peer.Inspector.Endpoint)
}
{ // setup mailservice
log.Debug("Setting up mail service")
// TODO(yar): test multiple satellites using same OAUTH credentials
mailConfig := config.Mail
// validate from mail address
from, err := mail.ParseAddress(mailConfig.From)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
// validate smtp server address
host, _, err := net.SplitHostPort(mailConfig.SMTPServerAddress)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
var sender mailservice.Sender
switch mailConfig.AuthType {
case "oauth2":
creds := oauth2.Credentials{
ClientID: mailConfig.ClientID,
ClientSecret: mailConfig.ClientSecret,
TokenURI: mailConfig.TokenURI,
}
token, err := oauth2.RefreshToken(context.TODO(), creds, mailConfig.RefreshToken)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
sender = &post.SMTPSender{
From: *from,
Auth: &oauth2.Auth{
UserEmail: from.Address,
Storage: oauth2.NewTokenStore(creds, *token),
},
ServerAddress: mailConfig.SMTPServerAddress,
}
case "plain":
sender = &post.SMTPSender{
From: *from,
Auth: smtp.PlainAuth("", mailConfig.Login, mailConfig.Password, host),
ServerAddress: mailConfig.SMTPServerAddress,
}
case "login":
sender = &post.SMTPSender{
From: *from,
Auth: post.LoginAuth{
Username: mailConfig.Login,
Password: mailConfig.Password,
},
ServerAddress: mailConfig.SMTPServerAddress,
}
default:
sender = &simulate.LinkClicker{}
}
peer.Mail.Service, err = mailservice.New(
peer.Log.Named("mail:service"),
sender,
mailConfig.TemplatePath,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
{ // setup payments
config := paymentsconfig.Config{}
@ -605,90 +398,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
config.StripeCoinPayments.AccountBalanceUpdateInterval)
}
{ // setup console
log.Debug("Setting up console")
consoleConfig := config.Console
peer.Console.Listener, err = net.Listen("tcp", consoleConfig.Address)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
if consoleConfig.AuthTokenSecret == "" {
return nil, errs.New("Auth token secret required")
}
peer.Console.Service, err = console.NewService(
peer.Log.Named("console:service"),
&consoleauth.Hmac{Secret: []byte(consoleConfig.AuthTokenSecret)},
peer.DB.Console(),
peer.DB.Rewards(),
peer.Payments.Accounts,
consoleConfig.PasswordCost,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Console.Endpoint = consoleweb.NewServer(
peer.Log.Named("console:endpoint"),
consoleConfig,
peer.Console.Service,
peer.Mail.Service,
peer.Console.Listener,
)
}
{ // setup marketing portal
log.Debug("Setting up marketing server")
marketingConfig := config.Marketing
peer.Marketing.Listener, err = net.Listen("tcp", marketingConfig.Address)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Marketing.Endpoint, err = marketingweb.NewServer(
peer.Log.Named("marketing:endpoint"),
marketingConfig,
peer.DB.Rewards(),
peer.Marketing.Listener,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
{ // setup node stats endpoint
log.Debug("Setting up node stats endpoint")
peer.NodeStats.Endpoint = nodestats.NewEndpoint(
peer.Log.Named("nodestats:endpoint"),
peer.Overlay.DB,
peer.DB.StoragenodeAccounting())
pb.RegisterNodeStatsServer(peer.Server.GRPC(), peer.NodeStats.Endpoint)
pb.DRPCRegisterNodeStats(peer.Server.DRPC(), peer.NodeStats.Endpoint)
}
{ // setup graceful exit
log.Debug("Setting up graceful")
peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("graceful exit chore"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit)
peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
peer.Log.Named("gracefulexit:endpoint"),
signing.SignerFromFullIdentity(peer.Identity),
peer.DB.GracefulExit(),
peer.Overlay.DB,
peer.Overlay.Service,
peer.Metainfo.Service,
peer.Orders.Service,
peer.DB.PeerIdentities(),
config.GracefulExit)
pb.RegisterSatelliteGracefulExitServer(peer.Server.GRPC(), peer.GracefulExit.Endpoint)
pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint.DRPC())
}
{ // setup metrics service
@ -738,20 +450,6 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GarbageCollection.Service.Run(ctx))
})
group.Go(func() error {
// TODO: move the message into Server instead
// Don't change the format of this comment, it is used to figure out the node id.
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return errs2.IgnoreCanceled(peer.Server.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
})
@ -769,10 +467,6 @@ func (peer *Peer) Close() error {
// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
// close servers, to avoid new connections to closing subsystems
if peer.Server != nil {
errlist.Add(peer.Server.Close())
}
if peer.Metrics.Chore != nil {
errlist.Add(peer.Metrics.Chore.Close())
}
@ -781,22 +475,6 @@ func (peer *Peer) Close() error {
errlist.Add(peer.GracefulExit.Chore.Close())
}
if peer.Console.Endpoint != nil {
errlist.Add(peer.Console.Endpoint.Close())
} else if peer.Console.Listener != nil {
errlist.Add(peer.Console.Listener.Close())
}
if peer.Mail.Service != nil {
errlist.Add(peer.Mail.Service.Close())
}
if peer.Marketing.Endpoint != nil {
errlist.Add(peer.Marketing.Endpoint.Close())
} else if peer.Marketing.Listener != nil {
errlist.Add(peer.Marketing.Listener.Close())
}
// close services in reverse initialization order
if peer.Audit.Chore != nil {
@ -823,9 +501,6 @@ func (peer *Peer) Close() error {
errlist.Add(peer.Repair.Checker.Close())
}
if peer.Contact.Service != nil {
errlist.Add(peer.Contact.Service.Close())
}
if peer.Overlay.Service != nil {
errlist.Add(peer.Overlay.Service.Close())
}
@ -838,15 +513,3 @@ func (peer *Peer) Close() error {
// ID returns the peer ID.
func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID }
// Local returns the peer local node info.
func (peer *Peer) Local() overlay.NodeDossier { return peer.Contact.Service.Local() }
// Addr returns the public address.
func (peer *Peer) Addr() string { return peer.Server.Addr().String() }
// URL returns the storj.NodeURL.
func (peer *Peer) URL() storj.NodeURL { return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()} }
// PrivateAddr returns the private address.
func (peer *Peer) PrivateAddr() string { return peer.Server.PrivateAddr().String() }

View File

@ -50,6 +50,9 @@ PATH=$RELEASE_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network tes
# this replaces anywhere that has "/release/" in the config file, which currently just renames the static dir paths
sed -i -e 's#/release/#/branch/#g' `storj-sim network env SATELLITE_0_DIR`/config.yaml
# replace any 140XX port with 100XX port to fix, satellite.API part removal from satellite.Peer
sed -i -e "s#$STORJ_NETWORK_HOST4:100#$STORJ_NETWORK_HOST4:140#g" `storj-sim network env SATELLITE_0_DIR`/config.yaml
## Ensure that partially upgraded network works
# keep half of the storage nodes on the old version
@ -71,4 +74,4 @@ cp $RELEASE_DIR/bin/uplink $BRANCH_DIR/bin/uplink
PATH=$BRANCH_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-backwards.sh download
# run a delete in the network
PATH=$BRANCH_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-backwards.sh cleanup
PATH=$BRANCH_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-backwards.sh cleanup