// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package satellite import ( "context" "net" "net/mail" "net/smtp" "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/sync/errgroup" "storj.io/storj/internal/errs2" "storj.io/storj/internal/post" "storj.io/storj/internal/post/oauth2" "storj.io/storj/internal/version" "storj.io/storj/internal/version/checker" "storj.io/storj/pkg/auth/grpcauth" "storj.io/storj/pkg/identity" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls/extensions" "storj.io/storj/pkg/peertls/tlsopts" "storj.io/storj/pkg/rpc" "storj.io/storj/pkg/server" "storj.io/storj/pkg/signing" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleauth" "storj.io/storj/satellite/console/consoleweb" "storj.io/storj/satellite/contact" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/inspector" "storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/mailservice/simulate" "storj.io/storj/satellite/marketingweb" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/nodestats" "storj.io/storj/satellite/orders" "storj.io/storj/satellite/overlay" "storj.io/storj/satellite/payments" "storj.io/storj/satellite/payments/mockpayments" "storj.io/storj/satellite/payments/paymentsconfig" "storj.io/storj/satellite/payments/stripecoinpayments" "storj.io/storj/satellite/repair/irreparable" "storj.io/storj/satellite/rewards" "storj.io/storj/satellite/vouchers" ) // API is the satellite API process // // architecture: Peer type API struct { Log *zap.Logger Identity *identity.FullIdentity DB DB Dialer rpc.Dialer Server *server.Server Version *checker.Service Contact struct { Service *contact.Service Endpoint *contact.Endpoint } Overlay struct { DB overlay.DB Service *overlay.Service Inspector *overlay.Inspector } Vouchers struct { Endpoint *vouchers.Endpoint } Orders struct { Endpoint *orders.Endpoint Service *orders.Service } Metainfo struct { Database metainfo.PointerDB Service *metainfo.Service Endpoint2 *metainfo.Endpoint } Inspector struct { Endpoint *inspector.Endpoint } Repair struct { Inspector *irreparable.Inspector } Accounting struct { ProjectUsage *accounting.ProjectUsage } LiveAccounting struct { Cache accounting.Cache } Mail struct { Service *mailservice.Service } Payments struct { Accounts payments.Accounts Inspector *stripecoinpayments.Endpoint } Console struct { Listener net.Listener Service *console.Service Endpoint *consoleweb.Server } Marketing struct { PartnersService *rewards.PartnersService Listener net.Listener Endpoint *marketingweb.Server } NodeStats struct { Endpoint *nodestats.Endpoint } GracefulExit struct { Endpoint *gracefulexit.Endpoint } } // NewAPI creates a new satellite API process func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, config *Config, versionInfo version.Info) (*API, error) { peer := &API{ Log: log, Identity: full, DB: db, } var err error { if !versionInfo.IsZero() { peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v", versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release) } peer.Version = checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite") } { // setup listener and server log.Debug("Satellite API Process starting listener and server") sc := config.Server tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB) if err != nil { return nil, errs.Combine(err, peer.Close()) } peer.Dialer = rpc.NewDefaultDialer(tlsOptions) unaryInterceptor := grpcauth.NewAPIKeyInterceptor() if sc.DebugLogTraffic { unaryInterceptor = server.CombineInterceptors(unaryInterceptor, server.UnaryMessageLoggingInterceptor(log)) } peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress, unaryInterceptor) if err != nil { return nil, errs.Combine(err, peer.Close()) } } { // setup overlay log.Debug("Satellite API Process starting overlay") peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache()) peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay) peer.Overlay.Inspector = overlay.NewInspector(peer.Overlay.Service) pb.RegisterOverlayInspectorServer(peer.Server.PrivateGRPC(), peer.Overlay.Inspector) pb.DRPCRegisterOverlayInspector(peer.Server.PrivateDRPC(), peer.Overlay.Inspector) } { // setup contact service log.Debug("Satellite API Process setting up contact service") c := config.Contact if c.ExternalAddress == "" { c.ExternalAddress = peer.Addr() } pbVersion, err := versionInfo.Proto() if err != nil { return nil, errs.Combine(err, peer.Close()) } self := &overlay.NodeDossier{ Node: pb.Node{ Id: peer.ID(), Address: &pb.NodeAddress{ Address: c.ExternalAddress, }, }, Type: pb.NodeType_SATELLITE, Version: *pbVersion, } peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint) pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint) } { // setup vouchers log.Debug("Satellite API Process setting up vouchers") pb.RegisterVouchersServer(peer.Server.GRPC(), peer.Vouchers.Endpoint) pb.DRPCRegisterVouchers(peer.Server.DRPC(), peer.Vouchers.Endpoint) } { // setup live accounting log.Debug("Satellite API Process setting up live accounting") peer.LiveAccounting.Cache = liveAccounting } { // setup accounting project usage log.Debug("Satellite API Process setting up accounting project usage") peer.Accounting.ProjectUsage = accounting.NewProjectUsage( peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, config.Rollup.MaxAlphaUsage, ) } { // setup orders log.Debug("Satellite API Process setting up orders endpoint") satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()) peer.Orders.Endpoint = orders.NewEndpoint( peer.Log.Named("orders:endpoint"), satelliteSignee, peer.DB.Orders(), config.Orders.SettlementBatchSize, ) peer.Orders.Service = orders.NewService( peer.Log.Named("orders:service"), signing.SignerFromFullIdentity(peer.Identity), peer.Overlay.Service, peer.DB.Orders(), config.Orders.Expiration, &pb.NodeAddress{ Transport: pb.NodeTransport_TCP_TLS_GRPC, Address: config.Contact.ExternalAddress, }, config.Repairer.MaxExcessRateOptimalThreshold, ) pb.RegisterOrdersServer(peer.Server.GRPC(), peer.Orders.Endpoint) pb.DRPCRegisterOrders(peer.Server.DRPC(), peer.Orders.Endpoint.DRPC()) } { // setup metainfo log.Debug("Satellite API Process setting up metainfo") peer.Metainfo.Database = pointerDB peer.Metainfo.Service = metainfo.NewService(peer.Log.Named("metainfo:service"), peer.Metainfo.Database, peer.DB.Buckets(), ) peer.Metainfo.Endpoint2 = metainfo.NewEndpoint( peer.Log.Named("metainfo:endpoint"), peer.Metainfo.Service, peer.Orders.Service, peer.Overlay.Service, peer.DB.Attribution(), peer.DB.PeerIdentities(), peer.DB.Console().APIKeys(), peer.Accounting.ProjectUsage, config.Metainfo.RS, signing.SignerFromFullIdentity(peer.Identity), config.Metainfo.MaxCommitInterval, ) pb.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2) pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint2) } { // setup datarepair log.Debug("Satellite API Process setting up datarepair inspector") peer.Repair.Inspector = irreparable.NewInspector(peer.DB.Irreparable()) pb.RegisterIrreparableInspectorServer(peer.Server.PrivateGRPC(), peer.Repair.Inspector) pb.DRPCRegisterIrreparableInspector(peer.Server.PrivateDRPC(), peer.Repair.Inspector) } { // setup inspector log.Debug("Satellite API Process setting up inspector") peer.Inspector.Endpoint = inspector.NewEndpoint( peer.Log.Named("inspector"), peer.Overlay.Service, peer.Metainfo.Service, ) pb.RegisterHealthInspectorServer(peer.Server.PrivateGRPC(), peer.Inspector.Endpoint) pb.DRPCRegisterHealthInspector(peer.Server.PrivateDRPC(), peer.Inspector.Endpoint) } { // setup mailservice log.Debug("Satellite API Process setting up mail service") // TODO(yar): test multiple satellites using same OAUTH credentials mailConfig := config.Mail // validate from mail address from, err := mail.ParseAddress(mailConfig.From) if err != nil { return nil, errs.Combine(err, peer.Close()) } // validate smtp server address host, _, err := net.SplitHostPort(mailConfig.SMTPServerAddress) if err != nil { return nil, errs.Combine(err, peer.Close()) } var sender mailservice.Sender switch mailConfig.AuthType { case "oauth2": creds := oauth2.Credentials{ ClientID: mailConfig.ClientID, ClientSecret: mailConfig.ClientSecret, TokenURI: mailConfig.TokenURI, } token, err := oauth2.RefreshToken(context.TODO(), creds, mailConfig.RefreshToken) if err != nil { return nil, errs.Combine(err, peer.Close()) } sender = &post.SMTPSender{ From: *from, Auth: &oauth2.Auth{ UserEmail: from.Address, Storage: oauth2.NewTokenStore(creds, *token), }, ServerAddress: mailConfig.SMTPServerAddress, } case "plain": sender = &post.SMTPSender{ From: *from, Auth: smtp.PlainAuth("", mailConfig.Login, mailConfig.Password, host), ServerAddress: mailConfig.SMTPServerAddress, } case "login": sender = &post.SMTPSender{ From: *from, Auth: post.LoginAuth{ Username: mailConfig.Login, Password: mailConfig.Password, }, ServerAddress: mailConfig.SMTPServerAddress, } default: sender = &simulate.LinkClicker{} } peer.Mail.Service, err = mailservice.New( peer.Log.Named("mail:service"), sender, mailConfig.TemplatePath, ) if err != nil { return nil, errs.Combine(err, peer.Close()) } } { // setup payments config := paymentsconfig.Config{} switch config.Provider { default: peer.Payments.Accounts = mockpayments.Accounts() case "stripecoinpayments": service := stripecoinpayments.NewService( peer.Log.Named("stripecoinpayments service"), config.StripeCoinPayments, peer.DB.StripeCoinPayments(), peer.DB.Console().Projects()) peer.Payments.Accounts = service.Accounts() peer.Payments.Inspector = stripecoinpayments.NewEndpoint(service) pb.RegisterPaymentsServer(peer.Server.PrivateGRPC(), peer.Payments.Inspector) pb.DRPCRegisterPayments(peer.Server.PrivateDRPC(), peer.Payments.Inspector) } } { // setup marketing portal log.Debug("Satellite API Process setting up marketing server") peer.Marketing.PartnersService = rewards.NewPartnersService( peer.Log.Named("partners"), rewards.DefaultPartnersDB, []string{ "https://us-central-1.tardigrade.io/", "https://asia-east-1.tardigrade.io/", "https://europe-west-1.tardigrade.io/", }, ) peer.Marketing.Listener, err = net.Listen("tcp", config.Marketing.Address) if err != nil { return nil, errs.Combine(err, peer.Close()) } peer.Marketing.Endpoint, err = marketingweb.NewServer( peer.Log.Named("marketing:endpoint"), config.Marketing, peer.DB.Rewards(), peer.Marketing.PartnersService, peer.Marketing.Listener, ) if err != nil { return nil, errs.Combine(err, peer.Close()) } } { // setup console log.Debug("Satellite API Process setting up console") consoleConfig := config.Console peer.Console.Listener, err = net.Listen("tcp", consoleConfig.Address) if err != nil { return nil, errs.Combine(err, peer.Close()) } if consoleConfig.AuthTokenSecret == "" { return nil, errs.New("Auth token secret required") } peer.Console.Service, err = console.NewService( peer.Log.Named("console:service"), &consoleauth.Hmac{Secret: []byte(consoleConfig.AuthTokenSecret)}, peer.DB.Console(), peer.DB.Rewards(), peer.Marketing.PartnersService, peer.Payments.Accounts, consoleConfig.PasswordCost, ) if err != nil { return nil, errs.Combine(err, peer.Close()) } peer.Console.Endpoint = consoleweb.NewServer( peer.Log.Named("console:endpoint"), consoleConfig, peer.Console.Service, peer.Mail.Service, peer.Console.Listener, ) } { // setup node stats endpoint log.Debug("Satellite API Process setting up node stats endpoint") peer.NodeStats.Endpoint = nodestats.NewEndpoint( peer.Log.Named("nodestats:endpoint"), peer.Overlay.DB, peer.DB.StoragenodeAccounting(), ) pb.RegisterNodeStatsServer(peer.Server.GRPC(), peer.NodeStats.Endpoint) pb.DRPCRegisterNodeStats(peer.Server.DRPC(), peer.NodeStats.Endpoint) } { // setup graceful exit if config.GracefulExit.Enabled { log.Debug("Satellite API Process setting up graceful exit endpoint") peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint( peer.Log.Named("gracefulexit:endpoint"), signing.SignerFromFullIdentity(peer.Identity), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Overlay.Service, peer.Metainfo.Service, peer.Orders.Service, peer.DB.PeerIdentities(), config.GracefulExit) pb.RegisterSatelliteGracefulExitServer(peer.Server.GRPC(), peer.GracefulExit.Endpoint) pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint.DRPC()) } } return peer, nil } // Run runs satellite until it's either closed or it errors. func (peer *API) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) group, ctx := errgroup.WithContext(ctx) group.Go(func() error { return errs2.IgnoreCanceled(peer.Version.Run(ctx)) }) group.Go(func() error { // Don't change the format of this comment, it is used to figure out the node id. peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID) peer.Log.Sugar().Infof("Public server started on %s", peer.Addr()) peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr()) return errs2.IgnoreCanceled(peer.Server.Run(ctx)) }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx)) }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx)) }) return group.Wait() } // Close closes all the resources. func (peer *API) Close() error { var errlist errs.Group // close servers, to avoid new connections to closing subsystems if peer.Server != nil { errlist.Add(peer.Server.Close()) } if peer.Marketing.Endpoint != nil { errlist.Add(peer.Marketing.Endpoint.Close()) } else if peer.Marketing.Listener != nil { errlist.Add(peer.Marketing.Listener.Close()) } if peer.Console.Endpoint != nil { errlist.Add(peer.Console.Endpoint.Close()) } else if peer.Console.Listener != nil { errlist.Add(peer.Console.Listener.Close()) } if peer.Mail.Service != nil { errlist.Add(peer.Mail.Service.Close()) } if peer.Metainfo.Endpoint2 != nil { errlist.Add(peer.Metainfo.Endpoint2.Close()) } if peer.Contact.Service != nil { errlist.Add(peer.Contact.Service.Close()) } if peer.Overlay.Service != nil { errlist.Add(peer.Overlay.Service.Close()) } return errlist.Err() } // ID returns the peer ID. func (peer *API) ID() storj.NodeID { return peer.Identity.ID } // Local returns the peer local node info. func (peer *API) Local() overlay.NodeDossier { return peer.Contact.Service.Local() } // Addr returns the public address. func (peer *API) Addr() string { return peer.Server.Addr().String() } // URL returns the storj.NodeURL. func (peer *API) URL() storj.NodeURL { return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()} } // PrivateAddr returns the private address. func (peer *API) PrivateAddr() string { return peer.Server.PrivateAddr().String() }