// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package storagenode import ( "context" "errors" "net" "net/http" "time" "github.com/spacemonkeygo/monkit/v3" "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/sync/errgroup" "storj.io/common/identity" "storj.io/common/pb" "storj.io/common/peertls/extensions" "storj.io/common/peertls/tlsopts" "storj.io/common/rpc" "storj.io/common/signing" "storj.io/common/storj" "storj.io/storj/pkg/debug" "storj.io/storj/pkg/server" "storj.io/storj/private/lifecycle" "storj.io/storj/private/version" "storj.io/storj/private/version/checker" "storj.io/storj/satellite/overlay" "storj.io/storj/storage" "storj.io/storj/storagenode/bandwidth" "storj.io/storj/storagenode/collector" "storj.io/storj/storagenode/console" "storj.io/storj/storagenode/console/consoleassets" "storj.io/storj/storagenode/console/consoleserver" "storj.io/storj/storagenode/contact" "storj.io/storj/storagenode/gracefulexit" "storj.io/storj/storagenode/inspector" "storj.io/storj/storagenode/monitor" "storj.io/storj/storagenode/nodestats" "storj.io/storj/storagenode/notifications" "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/piecestore" "storj.io/storj/storagenode/preflight" "storj.io/storj/storagenode/reputation" "storj.io/storj/storagenode/retain" "storj.io/storj/storagenode/satellites" "storj.io/storj/storagenode/storageusage" "storj.io/storj/storagenode/trust" ) var ( mon = monkit.Package() ) // DB is the master database for Storage Node // // architecture: Master Database type DB interface { // CreateTables initializes the database CreateTables(ctx context.Context) error // Close closes the database Close() error Pieces() storage.Blobs Orders() orders.DB V0PieceInfo() pieces.V0PieceInfoDB PieceExpirationDB() pieces.PieceExpirationDB PieceSpaceUsedDB() pieces.PieceSpaceUsedDB Bandwidth() bandwidth.DB UsedSerials() piecestore.UsedSerials Reputation() reputation.DB StorageUsage() storageusage.DB Satellites() satellites.DB Notifications() notifications.DB } // Config is all the configuration parameters for a Storage Node type Config struct { Identity identity.Config Server server.Config Debug debug.Config Preflight preflight.Config Contact contact.Config Operator OperatorConfig // TODO: flatten storage config and only keep the new one Storage piecestore.OldConfig Storage2 piecestore.Config Collector collector.Config Retain retain.Config Nodestats nodestats.Config Console consoleserver.Config Version checker.Config Bandwidth bandwidth.Config GracefulExit gracefulexit.Config } // Verify verifies whether configuration is consistent and acceptable. func (config *Config) Verify(log *zap.Logger) error { err := config.Operator.Verify(log) if err != nil { return err } if config.Contact.ExternalAddress != "" { err := isAddressValid(config.Contact.ExternalAddress) if err != nil { return errs.New("invalid contact.external-address: %v", err) } } if config.Server.Address != "" { err := isAddressValid(config.Server.Address) if err != nil { return errs.New("invalid server.address: %v", err) } } return nil } func isAddressValid(addrstring string) error { addr, port, err := net.SplitHostPort(addrstring) if err != nil || port == "" { return errs.New("split host-port %q failed: %+v", addrstring, err) } if addr == "" { return nil } resolvedhosts, err := net.LookupHost(addr) if err != nil || len(resolvedhosts) == 0 { return errs.New("lookup %q failed: %+v", addr, err) } return nil } // Peer is the representation of a Storage Node. // // architecture: Peer type Peer struct { // core dependencies Log *zap.Logger Identity *identity.FullIdentity DB DB Servers *lifecycle.Group Services *lifecycle.Group Dialer rpc.Dialer Server *server.Server Version *checker.Service Debug struct { Listener net.Listener Server *debug.Server } // services and endpoints // TODO: similar grouping to satellite.Core Preflight struct { LocalTime *preflight.LocalTime } Contact struct { Service *contact.Service Chore *contact.Chore Endpoint *contact.Endpoint PingStats *contact.PingStats } Storage2 struct { // TODO: lift things outside of it to organize better Trust *trust.Pool Store *pieces.Store TrashChore *pieces.TrashChore BlobsCache *pieces.BlobsUsageCache CacheService *pieces.CacheService RetainService *retain.Service Endpoint *piecestore.Endpoint Inspector *inspector.Endpoint Monitor *monitor.Service Orders *orders.Service } Collector *collector.Service NodeStats struct { Service *nodestats.Service Cache *nodestats.Cache } // Web server with web UI Console struct { Listener net.Listener Service *console.Service Endpoint *consoleserver.Server } GracefulExit struct { Endpoint *gracefulexit.Endpoint Chore *gracefulexit.Chore } Notifications struct { Service *notifications.Service } Bandwidth *bandwidth.Service } // New creates a new Storage Node. func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB extensions.RevocationDB, config Config, versionInfo version.Info) (*Peer, error) { peer := &Peer{ Log: log, Identity: full, DB: db, Servers: lifecycle.NewGroup(log.Named("servers")), Services: lifecycle.NewGroup(log.Named("services")), } { // setup debug var err error if config.Debug.Address != "" { peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address) if err != nil { withoutStack := errors.New(err.Error()) peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack)) err = nil } } debugConfig := config.Debug debugConfig.ControlTitle = "Storage Node" peer.Debug.Server = debug.NewServer(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig) peer.Servers.Add(lifecycle.Item{ Name: "debug", Run: peer.Debug.Server.Run, Close: peer.Debug.Server.Close, }) } var err error { if !versionInfo.IsZero() { peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v", versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release) } peer.Version = checker.NewService(log.Named("version"), config.Version, versionInfo, "Storagenode") peer.Services.Add(lifecycle.Item{ Name: "version", Run: peer.Version.Run, }) } { // setup listener and server sc := config.Server tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB) if err != nil { return nil, errs.Combine(err, peer.Close()) } peer.Dialer = rpc.NewDefaultDialer(tlsOptions) peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress, nil) if err != nil { return nil, errs.Combine(err, peer.Close()) } peer.Servers.Add(lifecycle.Item{ Name: "server", Run: func(ctx context.Context) error { peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID) peer.Log.Sugar().Infof("Public server started on %s", peer.Addr()) peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr()) return peer.Server.Run(ctx) }, Close: peer.Server.Close, }) } { // setup trust pool peer.Storage2.Trust, err = trust.NewPool(log.Named("trust"), trust.Dialer(peer.Dialer), config.Storage2.Trust) if err != nil { return nil, errs.Combine(err, peer.Close()) } peer.Services.Add(lifecycle.Item{ Name: "trust", Run: peer.Storage2.Trust.Run, }) } { peer.Preflight.LocalTime = preflight.NewLocalTime(peer.Log.Named("preflight:localtime"), config.Preflight, peer.Storage2.Trust, peer.Dialer) } { // setup notification service. peer.Notifications.Service = notifications.NewService(peer.Log, peer.DB.Notifications()) } { // setup contact service c := config.Contact if c.ExternalAddress == "" { c.ExternalAddress = peer.Addr() } pbVersion, err := versionInfo.Proto() if err != nil { return nil, errs.Combine(err, peer.Close()) } self := &overlay.NodeDossier{ Node: pb.Node{ Id: peer.ID(), Address: &pb.NodeAddress{ Transport: pb.NodeTransport_TCP_TLS_GRPC, Address: c.ExternalAddress, }, }, Type: pb.NodeType_STORAGE, Operator: pb.NodeOperator{ Email: config.Operator.Email, Wallet: config.Operator.Wallet, }, Version: *pbVersion, } peer.Contact.PingStats = new(contact.PingStats) peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self) peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Storage2.Trust, peer.Dialer, peer.Contact.Service) peer.Services.Add(lifecycle.Item{ Name: "contact:chore", Run: peer.Contact.Chore.Run, Close: peer.Contact.Chore.Close, }) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.PingStats) pb.RegisterContactServer(peer.Server.GRPC(), peer.Contact.Endpoint) pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint) } { // setup storage peer.Storage2.BlobsCache = pieces.NewBlobsUsageCache(peer.Log.Named("blobscache"), peer.DB.Pieces()) peer.Storage2.Store = pieces.NewStore(peer.Log.Named("pieces"), peer.Storage2.BlobsCache, peer.DB.V0PieceInfo(), peer.DB.PieceExpirationDB(), peer.DB.PieceSpaceUsedDB(), ) peer.Storage2.TrashChore = pieces.NewTrashChore( log.Named("pieces:trash"), 24*time.Hour, // choreInterval: how often to run the chore 7*24*time.Hour, // trashExpiryInterval: when items in the trash should be deleted peer.Storage2.Trust, peer.Storage2.Store, ) peer.Services.Add(lifecycle.Item{ Name: "pieces:trash", Run: peer.Storage2.TrashChore.Run, Close: peer.Storage2.TrashChore.Close, }) peer.Storage2.CacheService = pieces.NewService( log.Named("piecestore:cache"), peer.Storage2.BlobsCache, peer.Storage2.Store, config.Storage2.CacheSyncInterval, ) peer.Services.Add(lifecycle.Item{ Name: "piecestore:cache", Run: peer.Storage2.CacheService.Run, Close: peer.Storage2.CacheService.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Piecestore Cache", peer.Storage2.CacheService.Loop)) peer.Storage2.Monitor = monitor.NewService( log.Named("piecestore:monitor"), peer.Storage2.Store, peer.Contact.Service, peer.DB.Bandwidth(), config.Storage.AllocatedDiskSpace.Int64(), config.Storage.AllocatedBandwidth.Int64(), //TODO use config.Storage.Monitor.Interval, but for some reason is not set config.Storage.KBucketRefreshInterval, config.Storage2.Monitor, ) peer.Services.Add(lifecycle.Item{ Name: "piecestore:monitor", Run: peer.Storage2.Monitor.Run, Close: peer.Storage2.Monitor.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Piecestore Monitor", peer.Storage2.Monitor.Loop)) peer.Storage2.RetainService = retain.NewService( peer.Log.Named("retain"), peer.Storage2.Store, config.Retain, ) peer.Services.Add(lifecycle.Item{ Name: "retain", Run: peer.Storage2.RetainService.Run, Close: peer.Storage2.RetainService.Close, }) peer.Storage2.Endpoint, err = piecestore.NewEndpoint( peer.Log.Named("piecestore"), signing.SignerFromFullIdentity(peer.Identity), peer.Storage2.Trust, peer.Storage2.Monitor, peer.Storage2.RetainService, peer.Contact.PingStats, peer.Storage2.Store, peer.DB.Orders(), peer.DB.Bandwidth(), peer.DB.UsedSerials(), peer.Contact.Chore.Trigger, config.Storage2, ) if err != nil { return nil, errs.Combine(err, peer.Close()) } // TODO remove this once workgroup is removed from piecestore endpoint peer.Services.Add(lifecycle.Item{ Name: "piecestore", Close: peer.Storage2.Endpoint.Close, }) pb.RegisterPiecestoreServer(peer.Server.GRPC(), peer.Storage2.Endpoint) pb.DRPCRegisterPiecestore(peer.Server.DRPC(), peer.Storage2.Endpoint.DRPC()) // TODO workaround for custom timeout for order sending request (read/write) sc := config.Server tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB) if err != nil { return nil, errs.Combine(err, peer.Close()) } dialer := rpc.NewDefaultDialer(tlsOptions) dialer.DialTimeout = config.Storage2.Orders.SenderDialTimeout peer.Storage2.Orders = orders.NewService( log.Named("orders"), dialer, peer.DB.Orders(), peer.Storage2.Trust, config.Storage2.Orders, ) peer.Services.Add(lifecycle.Item{ Name: "orders", Run: peer.Storage2.Orders.Run, Close: peer.Storage2.Orders.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Orders Sender", peer.Storage2.Orders.Sender)) peer.Debug.Server.Panel.Add( debug.Cycle("Orders Cleanup", peer.Storage2.Orders.Cleanup)) } { // setup node stats service peer.NodeStats.Service = nodestats.NewService( peer.Log.Named("nodestats:service"), peer.Dialer, peer.Storage2.Trust, ) peer.NodeStats.Cache = nodestats.NewCache( peer.Log.Named("nodestats:cache"), config.Nodestats, nodestats.CacheStorage{ Reputation: peer.DB.Reputation(), StorageUsage: peer.DB.StorageUsage(), }, peer.NodeStats.Service, peer.Storage2.Trust, ) peer.Services.Add(lifecycle.Item{ Name: "nodestats:cache", Run: peer.NodeStats.Cache.Run, Close: peer.NodeStats.Cache.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Node Stats Cache Reputation", peer.NodeStats.Cache.Reputation)) peer.Debug.Server.Panel.Add( debug.Cycle("Node Stats Cache Storage", peer.NodeStats.Cache.Storage)) } { // setup storage node operator dashboard peer.Console.Service, err = console.NewService( peer.Log.Named("console:service"), peer.DB.Bandwidth(), peer.Storage2.Store, peer.Version, config.Storage.AllocatedBandwidth, config.Storage.AllocatedDiskSpace, config.Operator.Wallet, versionInfo, peer.Storage2.Trust, peer.DB.Reputation(), peer.DB.StorageUsage(), peer.Contact.PingStats, peer.Contact.Service, ) if err != nil { return nil, errs.Combine(err, peer.Close()) } peer.Console.Listener, err = net.Listen("tcp", config.Console.Address) if err != nil { return nil, errs.Combine(err, peer.Close()) } assets := consoleassets.FileSystem if config.Console.StaticDir != "" { // a specific directory has been configured. use it assets = http.Dir(config.Console.StaticDir) } peer.Console.Endpoint = consoleserver.NewServer( peer.Log.Named("console:endpoint"), assets, peer.Notifications.Service, peer.Console.Service, peer.Console.Listener, ) peer.Services.Add(lifecycle.Item{ Name: "console:endpoint", Run: peer.Console.Endpoint.Run, Close: peer.Console.Endpoint.Close, }) } { // setup storage inspector peer.Storage2.Inspector = inspector.NewEndpoint( peer.Log.Named("pieces:inspector"), peer.Storage2.Store, peer.Contact.Service, peer.Contact.PingStats, peer.DB.Bandwidth(), config.Storage, peer.Console.Listener.Addr(), config.Contact.ExternalAddress, ) pb.RegisterPieceStoreInspectorServer(peer.Server.PrivateGRPC(), peer.Storage2.Inspector) pb.DRPCRegisterPieceStoreInspector(peer.Server.PrivateDRPC(), peer.Storage2.Inspector) } { // setup graceful exit service peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint( peer.Log.Named("gracefulexit:endpoint"), peer.Storage2.Trust, peer.DB.Satellites(), peer.Storage2.BlobsCache, ) pb.RegisterNodeGracefulExitServer(peer.Server.PrivateGRPC(), peer.GracefulExit.Endpoint) pb.DRPCRegisterNodeGracefulExit(peer.Server.PrivateDRPC(), peer.GracefulExit.Endpoint) peer.GracefulExit.Chore = gracefulexit.NewChore( peer.Log.Named("gracefulexit:chore"), config.GracefulExit, peer.Storage2.Store, peer.Storage2.Trust, peer.Dialer, peer.DB.Satellites(), ) peer.Services.Add(lifecycle.Item{ Name: "gracefulexit:chore", Run: peer.GracefulExit.Chore.Run, Close: peer.GracefulExit.Chore.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop)) } peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.UsedSerials(), config.Collector) peer.Services.Add(lifecycle.Item{ Name: "collector", Run: peer.Collector.Run, Close: peer.Collector.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Collector", peer.Collector.Loop)) peer.Bandwidth = bandwidth.NewService(peer.Log.Named("bandwidth"), peer.DB.Bandwidth(), config.Bandwidth) peer.Services.Add(lifecycle.Item{ Name: "bandwidth", Run: peer.Bandwidth.Run, Close: peer.Bandwidth.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Bandwidth", peer.Bandwidth.Loop)) return peer, nil } // Run runs storage node until it's either closed or it errors. func (peer *Peer) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) // Refresh the trust pool first. It will be updated periodically via // Run() below. if err := peer.Storage2.Trust.Refresh(ctx); err != nil { return err } if err := peer.Preflight.LocalTime.Check(ctx); err != nil { peer.Log.Fatal("failed preflight check", zap.Error(err)) return err } group, ctx := errgroup.WithContext(ctx) peer.Servers.Run(ctx, group) peer.Services.Run(ctx, group) return group.Wait() } // Close closes all the resources. func (peer *Peer) Close() error { return errs.Combine( peer.Servers.Close(), peer.Services.Close(), ) } // ID returns the peer ID. func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID } // Local returns the peer local node info. func (peer *Peer) Local() overlay.NodeDossier { return peer.Contact.Service.Local() } // Addr returns the public address. func (peer *Peer) Addr() string { return peer.Server.Addr().String() } // URL returns the storj.NodeURL. func (peer *Peer) URL() storj.NodeURL { return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()} } // PrivateAddr returns the private address. func (peer *Peer) PrivateAddr() string { return peer.Server.PrivateAddr().String() }