From 6e6d0ad9b8924d0c63478ef2a119b43baebc051a Mon Sep 17 00:00:00 2001 From: Jess G Date: Wed, 2 Oct 2019 16:02:47 -0700 Subject: [PATCH] split satellite: create separate API process (#3152) --- satellite/api.go | 527 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 527 insertions(+) create mode 100644 satellite/api.go diff --git a/satellite/api.go b/satellite/api.go new file mode 100644 index 000000000..a63a2f62b --- /dev/null +++ b/satellite/api.go @@ -0,0 +1,527 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package satellite + +import ( + "context" + "net" + "net/mail" + "net/smtp" + + "github.com/zeebo/errs" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "storj.io/storj/internal/errs2" + "storj.io/storj/internal/post" + "storj.io/storj/internal/post/oauth2" + "storj.io/storj/internal/version" + "storj.io/storj/pkg/auth/grpcauth" + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/peertls/extensions" + "storj.io/storj/pkg/peertls/tlsopts" + "storj.io/storj/pkg/rpc" + "storj.io/storj/pkg/server" + "storj.io/storj/pkg/signing" + "storj.io/storj/pkg/storj" + "storj.io/storj/satellite/accounting" + "storj.io/storj/satellite/accounting/live" + "storj.io/storj/satellite/accounting/rollup" + "storj.io/storj/satellite/console" + "storj.io/storj/satellite/console/consoleauth" + "storj.io/storj/satellite/console/consoleweb" + "storj.io/storj/satellite/contact" + "storj.io/storj/satellite/inspector" + "storj.io/storj/satellite/mailservice" + "storj.io/storj/satellite/mailservice/simulate" + "storj.io/storj/satellite/marketingweb" + "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/nodestats" + "storj.io/storj/satellite/orders" + "storj.io/storj/satellite/overlay" + "storj.io/storj/satellite/repair/irreparable" + "storj.io/storj/satellite/repair/repairer" + "storj.io/storj/satellite/vouchers" +) + +// APIConfig is the global config for the satellite API process +type APIConfig struct { + Identity identity.Config + Server server.Config + Version version.Config + + Contact contact.Config + Overlay overlay.Config + + Metainfo metainfo.Config + Orders orders.Config + + Repairer repairer.Config + + Rollup rollup.Config + LiveAccounting live.Config + + Mail mailservice.Config + Console consoleweb.Config + Marketing marketingweb.Config +} + +// API is the satellite API process +// +// architecture: Peer +type API struct { + Log *zap.Logger + Identity *identity.FullIdentity + DB DB + + Dialer rpc.Dialer + Server *server.Server + Version *version.Service + + Contact struct { + Service *contact.Service + Endpoint *contact.Endpoint + KEndpoint *contact.KademliaEndpoint + } + + Overlay struct { + DB overlay.DB + Service *overlay.Service + Inspector *overlay.Inspector + } + + Vouchers struct { + Endpoint *vouchers.Endpoint + } + + Orders struct { + Endpoint *orders.Endpoint + Service *orders.Service + } + + Metainfo struct { + Database metainfo.PointerDB + Service *metainfo.Service + Endpoint2 *metainfo.Endpoint + } + + Inspector struct { + Endpoint *inspector.Endpoint + } + + Repair struct { + Inspector *irreparable.Inspector + } + + Accounting struct { + ProjectUsage *accounting.ProjectUsage + } + + LiveAccounting struct { + Service live.Service + } + + Mail struct { + Service *mailservice.Service + } + + Console struct { + Listener net.Listener + Service *console.Service + Endpoint *consoleweb.Server + } + + Marketing struct { + Listener net.Listener + Endpoint *marketingweb.Server + } + + NodeStats struct { + Endpoint *nodestats.Endpoint + } +} + +// NewAPI creates a new satellite API process +func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB extensions.RevocationDB, config *APIConfig, versionInfo version.Info) (*API, error) { + peer := &API{ + Log: log, + Identity: full, + DB: db, + } + + var err error + + { + test := version.Info{} + if test != versionInfo { + peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v", + versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release) + } + peer.Version = version.NewService(log.Named("version"), config.Version, versionInfo, "Satellite") + } + + { // setup listener and server + log.Debug("Satellite API Process starting listener and server") + sc := config.Server + + tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + + peer.Dialer = rpc.NewDefaultDialer(tlsOptions) + + unaryInterceptor := grpcauth.NewAPIKeyInterceptor() + if sc.DebugLogTraffic { + unaryInterceptor = server.CombineInterceptors(unaryInterceptor, server.UnaryMessageLoggingInterceptor(log)) + } + peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress, unaryInterceptor) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + } + + { // setup overlay + log.Debug("Satellite API Process starting overlay") + peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache()) + peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay) + peer.Overlay.Inspector = overlay.NewInspector(peer.Overlay.Service) + pb.RegisterOverlayInspectorServer(peer.Server.PrivateGRPC(), peer.Overlay.Inspector) + pb.DRPCRegisterOverlayInspector(peer.Server.PrivateDRPC(), peer.Overlay.Inspector) + } + + { // setup contact service + log.Debug("Satellite API Process setting up contact service") + c := config.Contact + if c.ExternalAddress == "" { + c.ExternalAddress = peer.Addr() + } + + pbVersion, err := versionInfo.Proto() + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + + self := &overlay.NodeDossier{ + Node: pb.Node{ + Id: peer.ID(), + Address: &pb.NodeAddress{ + Address: c.ExternalAddress, + }, + }, + Type: pb.NodeType_SATELLITE, + Version: *pbVersion, + } + peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer) + peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) + peer.Contact.KEndpoint = contact.NewKademliaEndpoint(peer.Log.Named("contact:nodes_service_endpoint")) + pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint) + pb.RegisterNodesServer(peer.Server.GRPC(), peer.Contact.KEndpoint) + pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint) + pb.DRPCRegisterNodes(peer.Server.DRPC(), peer.Contact.KEndpoint) + } + + { // setup vouchers + log.Debug("Satellite API Process setting up vouchers") + pb.RegisterVouchersServer(peer.Server.GRPC(), peer.Vouchers.Endpoint) + pb.DRPCRegisterVouchers(peer.Server.DRPC(), peer.Vouchers.Endpoint) + } + + { // setup live accounting + log.Debug("Satellite API Process setting up live accounting") + config := config.LiveAccounting + liveAccountingService, err := live.New(peer.Log.Named("live-accounting"), config) + if err != nil { + return nil, err + } + peer.LiveAccounting.Service = liveAccountingService + } + + { // setup accounting project usage + log.Debug("Satellite API Process setting up accounting project usage") + peer.Accounting.ProjectUsage = accounting.NewProjectUsage( + peer.DB.ProjectAccounting(), + peer.LiveAccounting.Service, + config.Rollup.MaxAlphaUsage, + ) + } + + { // setup orders + log.Debug("Satellite API Process setting up orders endpoint") + satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()) + peer.Orders.Endpoint = orders.NewEndpoint( + peer.Log.Named("orders:endpoint"), + satelliteSignee, + peer.DB.Orders(), + config.Orders.SettlementBatchSize, + ) + peer.Orders.Service = orders.NewService( + peer.Log.Named("orders:service"), + signing.SignerFromFullIdentity(peer.Identity), + peer.Overlay.Service, + peer.DB.Orders(), + config.Orders.Expiration, + &pb.NodeAddress{ + Transport: pb.NodeTransport_TCP_TLS_GRPC, + Address: config.Contact.ExternalAddress, + }, + config.Repairer.MaxExcessRateOptimalThreshold, + ) + pb.RegisterOrdersServer(peer.Server.GRPC(), peer.Orders.Endpoint) + pb.DRPCRegisterOrders(peer.Server.DRPC(), peer.Orders.Endpoint.DRPC()) + } + + { // setup metainfo + log.Debug("Satellite API Process setting up metainfo") + db, err := metainfo.NewStore(peer.Log.Named("metainfo:store"), config.Metainfo.DatabaseURL) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + peer.Metainfo.Database = db + peer.Metainfo.Service = metainfo.NewService(peer.Log.Named("metainfo:service"), + peer.Metainfo.Database, + peer.DB.Buckets(), + ) + peer.Metainfo.Endpoint2 = metainfo.NewEndpoint( + peer.Log.Named("metainfo:endpoint"), + peer.Metainfo.Service, + peer.Orders.Service, + peer.Overlay.Service, + peer.DB.Attribution(), + peer.DB.PeerIdentities(), + peer.DB.Console().APIKeys(), + peer.Accounting.ProjectUsage, + config.Metainfo.RS, + signing.SignerFromFullIdentity(peer.Identity), + config.Metainfo.MaxCommitInterval, + ) + pb.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2) + pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint2) + } + + { // setup datarepair + log.Debug("Satellite API Process setting up datarepair inspector") + peer.Repair.Inspector = irreparable.NewInspector(peer.DB.Irreparable()) + pb.RegisterIrreparableInspectorServer(peer.Server.PrivateGRPC(), peer.Repair.Inspector) + pb.DRPCRegisterIrreparableInspector(peer.Server.PrivateDRPC(), peer.Repair.Inspector) + } + + { // setup inspector + log.Debug("Satellite API Process setting up inspector") + peer.Inspector.Endpoint = inspector.NewEndpoint( + peer.Log.Named("inspector"), + peer.Overlay.Service, + peer.Metainfo.Service, + ) + pb.RegisterHealthInspectorServer(peer.Server.PrivateGRPC(), peer.Inspector.Endpoint) + pb.DRPCRegisterHealthInspector(peer.Server.PrivateDRPC(), peer.Inspector.Endpoint) + } + + { // setup mailservice + log.Debug("Satellite API Process setting up mail service") + // TODO(yar): test multiple satellites using same OAUTH credentials + mailConfig := config.Mail + + // validate from mail address + from, err := mail.ParseAddress(mailConfig.From) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + + // validate smtp server address + host, _, err := net.SplitHostPort(mailConfig.SMTPServerAddress) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + + var sender mailservice.Sender + switch mailConfig.AuthType { + case "oauth2": + creds := oauth2.Credentials{ + ClientID: mailConfig.ClientID, + ClientSecret: mailConfig.ClientSecret, + TokenURI: mailConfig.TokenURI, + } + token, err := oauth2.RefreshToken(context.TODO(), creds, mailConfig.RefreshToken) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + + sender = &post.SMTPSender{ + From: *from, + Auth: &oauth2.Auth{ + UserEmail: from.Address, + Storage: oauth2.NewTokenStore(creds, *token), + }, + ServerAddress: mailConfig.SMTPServerAddress, + } + case "plain": + sender = &post.SMTPSender{ + From: *from, + Auth: smtp.PlainAuth("", mailConfig.Login, mailConfig.Password, host), + ServerAddress: mailConfig.SMTPServerAddress, + } + case "login": + sender = &post.SMTPSender{ + From: *from, + Auth: post.LoginAuth{ + Username: mailConfig.Login, + Password: mailConfig.Password, + }, + ServerAddress: mailConfig.SMTPServerAddress, + } + default: + sender = &simulate.LinkClicker{} + } + + peer.Mail.Service, err = mailservice.New( + peer.Log.Named("mail:service"), + sender, + mailConfig.TemplatePath, + ) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + } + + { // setup console + log.Debug("Satellite API Process setting up console") + consoleConfig := config.Console + peer.Console.Listener, err = net.Listen("tcp", consoleConfig.Address) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + if consoleConfig.AuthTokenSecret == "" { + return nil, errs.New("Auth token secret required") + } + peer.Console.Service, err = console.NewService( + peer.Log.Named("console:service"), + &consoleauth.Hmac{Secret: []byte(consoleConfig.AuthTokenSecret)}, + peer.DB.Console(), + peer.DB.Rewards(), + consoleConfig.PasswordCost, + ) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + peer.Console.Endpoint = consoleweb.NewServer( + peer.Log.Named("console:endpoint"), + consoleConfig, + peer.Console.Service, + peer.Mail.Service, + peer.Console.Listener, + ) + } + + { // setup marketing portal + log.Debug("Satellite API Process setting up marketing server") + marketingConfig := config.Marketing + peer.Marketing.Listener, err = net.Listen("tcp", marketingConfig.Address) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + peer.Marketing.Endpoint, err = marketingweb.NewServer( + peer.Log.Named("marketing:endpoint"), + marketingConfig, + peer.DB.Rewards(), + peer.Marketing.Listener, + ) + if err != nil { + return nil, errs.Combine(err, peer.Close()) + } + } + + { // setup node stats endpoint + log.Debug("Satellite API Process setting up node stats endpoint") + peer.NodeStats.Endpoint = nodestats.NewEndpoint( + peer.Log.Named("nodestats:endpoint"), + peer.Overlay.DB, + peer.DB.StoragenodeAccounting(), + ) + pb.RegisterNodeStatsServer(peer.Server.GRPC(), peer.NodeStats.Endpoint) + pb.DRPCRegisterNodeStats(peer.Server.DRPC(), peer.NodeStats.Endpoint) + } + + return peer, nil +} + +// Run runs satellite until it's either closed or it errors. +func (peer *API) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + group, ctx := errgroup.WithContext(ctx) + + group.Go(func() error { + return errs2.IgnoreCanceled(peer.Version.Run(ctx)) + }) + group.Go(func() error { + // Don't change the format of this comment, it is used to figure out the node id. + peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID) + peer.Log.Sugar().Infof("Public server started on %s", peer.Addr()) + peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr()) + return errs2.IgnoreCanceled(peer.Server.Run(ctx)) + }) + group.Go(func() error { + return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx)) + }) + group.Go(func() error { + return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx)) + }) + + return group.Wait() +} + +// Close closes all the resources. +func (peer *API) Close() error { + var errlist errs.Group + + // close servers, to avoid new connections to closing subsystems + if peer.Server != nil { + errlist.Add(peer.Server.Close()) + } + if peer.Marketing.Endpoint != nil { + errlist.Add(peer.Marketing.Endpoint.Close()) + } else if peer.Marketing.Listener != nil { + errlist.Add(peer.Marketing.Listener.Close()) + } + if peer.Console.Endpoint != nil { + errlist.Add(peer.Console.Endpoint.Close()) + } else if peer.Console.Listener != nil { + errlist.Add(peer.Console.Listener.Close()) + } + if peer.Mail.Service != nil { + errlist.Add(peer.Mail.Service.Close()) + } + if peer.Metainfo.Endpoint2 != nil { + errlist.Add(peer.Metainfo.Endpoint2.Close()) + } + if peer.Metainfo.Database != nil { + errlist.Add(peer.Metainfo.Database.Close()) + } + if peer.Contact.Service != nil { + errlist.Add(peer.Contact.Service.Close()) + } + if peer.Overlay.Service != nil { + errlist.Add(peer.Overlay.Service.Close()) + } + return errlist.Err() +} + +// ID returns the peer ID. +func (peer *API) ID() storj.NodeID { return peer.Identity.ID } + +// Local returns the peer local node info. +func (peer *API) Local() overlay.NodeDossier { return peer.Contact.Service.Local() } + +// Addr returns the public address. +func (peer *API) Addr() string { return peer.Server.Addr().String() } + +// URL returns the storj.NodeURL. +func (peer *API) URL() storj.NodeURL { + return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()} +} + +// PrivateAddr returns the private address. +func (peer *API) PrivateAddr() string { return peer.Server.PrivateAddr().String() }