diff --git a/private/lifecycle/group.go b/private/lifecycle/group.go
new file mode 100644
index 000000000..5f81fed66
--- /dev/null
+++ b/private/lifecycle/group.go
@@ -0,0 +1,76 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+// Package lifecycle allows controlling a group of items.
+package lifecycle
+
+import (
+	"context"
+
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+	"gopkg.in/spacemonkeygo/monkit.v2"
+
+	"storj.io/common/errs2"
+)
+
+var mon = monkit.Package()
+
+// Group implements a collection of items that are started
+// concurrently and closed in reverse order.
+type Group struct {
+	log   *zap.Logger
+	items []Item
+}
+
+// Item is a lifecycle item that a group runs and closes.
+type Item struct {
+	Name  string
+	Run   func(ctx context.Context) error
+	Close func() error
+}
+
+// NewGroup creates a new group.
+func NewGroup(log *zap.Logger) *Group {
+	return &Group{log: log}
+}
+
+// Add adds an item to the group.
+func (group *Group) Add(item Item) {
+	group.items = append(group.items, item)
+}
+
+// Run starts all items concurrently under errgroup g.
+func (group *Group) Run(ctx context.Context, g *errgroup.Group) {
+	defer mon.Task()(&ctx)(nil)
+
+	var started []string
+	for _, item := range group.items {
+		item := item // capture loop variable for the closure
+		// every registered item is listed as started, even when it has no Run
+		started = append(started, item.Name)
+		if item.Run == nil {
+			continue
+		}
+		g.Go(func() error {
+			return errs2.IgnoreCanceled(item.Run(ctx))
+		})
+	}
+
+	group.log.Debug("started", zap.Strings("items", started))
+}
+
+// Close closes all items in reverse order.
+func (group *Group) Close() error {
+	var errlist errs.Group
+
+	for i := len(group.items) - 1; i >= 0; i-- {
+		item := group.items[i]
+		if item.Close == nil {
+			continue
+		}
+		errlist.Add(item.Close())
+	}
+
+	return errlist.Err()
+}
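Not part of the patch — a minimal sketch of how a caller drives the new API. The `worker` item and the `main` wrapper are illustrative only; the `lifecycle` calls are the ones introduced above.

```go
package main

import (
	"context"
	"fmt"

	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"storj.io/storj/private/lifecycle"
)

func main() {
	group := lifecycle.NewGroup(zap.NewExample())

	// Items are registered in start order; Close unwinds them in reverse.
	group.Add(lifecycle.Item{
		Name: "worker", // illustrative item, not taken from the codebase
		Run: func(ctx context.Context) error {
			<-ctx.Done() // run until shutdown is requested
			return ctx.Err()
		},
		Close: func() error {
			fmt.Println("worker closed")
			return nil
		},
	})

	ctx, cancel := context.WithCancel(context.Background())
	g, gctx := errgroup.WithContext(ctx)
	group.Run(gctx, g) // spawns one goroutine per item with a non-nil Run

	cancel()          // simulate shutdown
	_ = g.Wait()      // nil: cancellation is swallowed by errs2.IgnoreCanceled
	_ = group.Close() // closes items in reverse registration order
}
```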
diff --git a/private/lifecycle/group_test.go b/private/lifecycle/group_test.go
new file mode 100644
index 000000000..2530e09d9
--- /dev/null
+++ b/private/lifecycle/group_test.go
@@ -0,0 +1,71 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package lifecycle_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zaptest"
+	"golang.org/x/sync/errgroup"
+
+	"storj.io/common/testcontext"
+	"storj.io/storj/private/lifecycle"
+)
+
+func TestGroup(t *testing.T) {
+	ctx := testcontext.New(t)
+	defer ctx.Cleanup()
+
+	log := zaptest.NewLogger(t)
+
+	closed := []string{}
+	var astart, cstart bool
+
+	group := lifecycle.NewGroup(log)
+	group.Add(lifecycle.Item{
+		Name: "A",
+		Run: func(ctx context.Context) error {
+			astart = true
+			log.Info("Run A")
+			return nil
+		},
+		Close: func() error {
+			closed = append(closed, "A")
+			return nil
+		},
+	})
+	group.Add(lifecycle.Item{
+		Name: "B",
+		Run:  nil,
+		Close: func() error {
+			closed = append(closed, "B")
+			return nil
+		},
+	})
+	group.Add(lifecycle.Item{
+		Name: "C",
+		Run: func(ctx context.Context) error {
+			cstart = true
+			log.Info("Run C")
+			return nil
+		},
+		Close: nil,
+	})
+
+	g, gctx := errgroup.WithContext(ctx)
+	group.Run(gctx, g)
+
+	err := g.Wait()
+	require.NoError(t, err)
+
+	require.True(t, astart)
+	require.True(t, cstart)
+
+	err = group.Close()
+	require.NoError(t, err)
+
+	require.Equal(t, []string{"B", "A"}, closed)
+}
diff --git a/satellite/api.go b/satellite/api.go
index 1b4762172..93332a607 100644
--- a/satellite/api.go
+++ b/satellite/api.go
@@ -13,7 +13,6 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 
-	"storj.io/common/errs2"
 	"storj.io/common/identity"
 	"storj.io/common/pb"
 	"storj.io/common/peertls/extensions"
@@ -23,6 +22,7 @@ import (
 	"storj.io/common/storj"
 	"storj.io/storj/pkg/auth/grpcauth"
 	"storj.io/storj/pkg/server"
+	"storj.io/storj/private/lifecycle"
 	"storj.io/storj/private/post"
 	"storj.io/storj/private/post/oauth2"
 	"storj.io/storj/private/version"
@@ -61,6 +61,9 @@ type API struct {
 	Identity *identity.FullIdentity
 	DB       DB
 
+	Servers  *lifecycle.Group
+	Services *lifecycle.Group
+
 	Dialer  rpc.Dialer
 	Server  *server.Server
 	Version *checker.Service
@@ -154,6 +157,9 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 		Log:      log,
 		Identity: full,
 		DB:       db,
+
+		Servers:  lifecycle.NewGroup(log.Named("servers")),
+		Services: lifecycle.NewGroup(log.Named("services")),
 	}
 
 	var err error
@@ -164,6 +170,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 			versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
 		}
 		peer.Version = checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
+
+		peer.Services.Add(lifecycle.Item{
+			Name: "version",
+			Run:  peer.Version.Run,
+		})
 	}
 
 	{ // setup listener and server
@@ -184,11 +195,29 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
+
+		peer.Servers.Add(lifecycle.Item{
+			Name: "server",
+			Run: func(ctx context.Context) error {
+				// Don't change the format of this comment, it is used to figure out the node id.
+				peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
+				peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
+				peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
+				return peer.Server.Run(ctx)
+			},
+			Close: peer.Server.Close,
+		})
 	}
 
 	{ // setup overlay
 		peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
+
 		peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "overlay",
+			Close: peer.Overlay.Service.Close,
+		})
+
 		peer.Overlay.Inspector = overlay.NewInspector(peer.Overlay.Service)
 		pb.RegisterOverlayInspectorServer(peer.Server.PrivateGRPC(), peer.Overlay.Inspector)
 		pb.DRPCRegisterOverlayInspector(peer.Server.PrivateDRPC(), peer.Overlay.Inspector)
@@ -219,6 +248,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 		peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
 		pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
 		pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint)
+
+		peer.Services.Add(lifecycle.Item{
+			Name:  "contact:service",
+			Close: peer.Contact.Service.Close,
+		})
 	}
 
 	{ // setup vouchers
@@ -240,7 +274,13 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 
 	{ // setup orders
 		peer.Orders.DB = rollupsWriteCache
-		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
+		peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "orders:chore",
+			Run:   peer.Orders.Chore.Run,
+			Close: peer.Orders.Chore.Close,
+		})
+
 		satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
 		peer.Orders.Endpoint = orders.NewEndpoint(
 			peer.Log.Named("orders:endpoint"),
@@ -291,6 +331,12 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
+
+		peer.Servers.Add(lifecycle.Item{
+			Name:  "marketing:endpoint",
+			Run:   peer.Marketing.Endpoint.Run,
+			Close: peer.Marketing.Endpoint.Close,
+		})
 	}
 
 	{ // setup metainfo
@@ -301,13 +347,17 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 		)
 
 		peer.Metainfo.DeletePiecesService, err = metainfo.NewDeletePiecesService(
-			peer.Log.Named("metainfo:DeletePiecesService"),
+			peer.Log.Named("metainfo:delete-pieces"),
 			peer.Dialer,
 			metainfoDeletePiecesConcurrencyLimit,
 		)
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
+		peer.Services.Add(lifecycle.Item{
+			Name:  "metainfo:delete-pieces",
+			Close: peer.Metainfo.DeletePiecesService.Close,
+		})
 
 		peer.Metainfo.Endpoint2 = metainfo.NewEndpoint(
 			peer.Log.Named("metainfo:endpoint"),
@@ -328,6 +378,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 		)
 		pb.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2)
 		pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint2)
+
+		peer.Services.Add(lifecycle.Item{
+			Name:  "metainfo:endpoint",
+			Close: peer.Metainfo.Endpoint2.Close,
+		})
 	}
 
 	{ // setup datarepair
@@ -410,6 +465,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
+
+		peer.Services.Add(lifecycle.Item{
+			Name:  "mail:service",
+			Close: peer.Mail.Service.Close,
+		})
 	}
 
 	{ // setup payments
@@ -433,12 +493,18 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 			peer.Payments.Inspector = stripecoinpayments.NewEndpoint(service)
 
 			peer.Payments.Version = stripecoinpayments.NewVersionService(
-				peer.Log.Named("payments.stripe:service"),
+				peer.Log.Named("payments.stripe:version"),
 				service,
 				pc.StripeCoinPayments.ConversionRatesCycleInterval)
 
 			pb.RegisterPaymentsServer(peer.Server.PrivateGRPC(), peer.Payments.Inspector)
 			pb.DRPCRegisterPayments(peer.Server.PrivateDRPC(), peer.Payments.Inspector)
+
+			peer.Services.Add(lifecycle.Item{
+				Name:  "payments.stripe:version",
+				Run:   peer.Payments.Version.Run,
+				Close: peer.Payments.Version.Close,
+			})
 		}
 	}
 
@@ -485,6 +551,12 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 			peer.Console.Listener,
 			config.Payments.StripeCoinPayments.StripePublicKey,
 		)
+
+		peer.Servers.Add(lifecycle.Item{
+			Name:  "console:endpoint",
+			Run:   peer.Console.Endpoint.Run,
+			Close: peer.Console.Endpoint.Close,
+		})
 	}
 
 	{ // setup node stats endpoint
@@ -526,74 +598,18 @@ func (peer *API) Run(ctx context.Context) (err error) {
 
 	group, ctx := errgroup.WithContext(ctx)
 
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Version.Run(ctx))
-	})
-	group.Go(func() error {
-		// Don't change the format of this comment, it is used to figure out the node id.
-		peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
-		peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
-		peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
-		return errs2.IgnoreCanceled(peer.Server.Run(ctx))
-	})
-	if peer.Payments.Version != nil {
-		group.Go(func() error {
-			return errs2.IgnoreCanceled(peer.Payments.Version.Run(ctx))
-		})
-	}
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
-	})
+	peer.Servers.Run(ctx, group)
+	peer.Services.Run(ctx, group)
 
 	return group.Wait()
 }
 
 // Close closes all the resources.
 func (peer *API) Close() error {
-	var errlist errs.Group
-
-	// close servers, to avoid new connections to closing subsystems
-	if peer.Server != nil {
-		errlist.Add(peer.Server.Close())
-	}
-	if peer.Marketing.Endpoint != nil {
-		errlist.Add(peer.Marketing.Endpoint.Close())
-	} else if peer.Marketing.Listener != nil {
-		errlist.Add(peer.Marketing.Listener.Close())
-	}
-	if peer.Console.Endpoint != nil {
-		errlist.Add(peer.Console.Endpoint.Close())
-	} else if peer.Console.Listener != nil {
-		errlist.Add(peer.Console.Listener.Close())
-	}
-	if peer.Mail.Service != nil {
-		errlist.Add(peer.Mail.Service.Close())
-	}
-	if peer.Payments.Version != nil {
-		errlist.Add(peer.Payments.Version.Close())
-	}
-	if peer.Metainfo.Endpoint2 != nil {
-		errlist.Add(peer.Metainfo.Endpoint2.Close())
-	}
-	if peer.Contact.Service != nil {
-		errlist.Add(peer.Contact.Service.Close())
-	}
-	if peer.Overlay.Service != nil {
-		errlist.Add(peer.Overlay.Service.Close())
-	}
-	if peer.Orders.Chore.Loop != nil {
-		errlist.Add(peer.Orders.Chore.Close())
-	}
-	if peer.Metainfo.DeletePiecesService != nil {
-		errlist.Add(peer.Metainfo.DeletePiecesService.Close())
-	}
-	return errlist.Err()
+	return errs.Combine(
+		peer.Servers.Close(),
+		peer.Services.Close(),
+	)
 }
 
 // ID returns the peer ID.
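Every peer in this patch converges on the same Run/Close shape. A sketch of that shared pattern follows; `hypotheticalPeer` is illustrative, but the two-group split mirrors the code above: servers close first so no new connections reach services that are already shutting down.

```go
package lifecyclepattern

import (
	"context"

	"github.com/zeebo/errs"
	"golang.org/x/sync/errgroup"

	"storj.io/storj/private/lifecycle"
)

type hypotheticalPeer struct {
	Servers  *lifecycle.Group
	Services *lifecycle.Group
}

// Run starts servers and services under a single errgroup, so the
// first real failure cancels the shared context and stops everything.
func (peer *hypotheticalPeer) Run(ctx context.Context) error {
	group, ctx := errgroup.WithContext(ctx)

	peer.Servers.Run(ctx, group)
	peer.Services.Run(ctx, group)

	return group.Wait()
}

// Close closes the servers first, to avoid new connections to closing
// subsystems; each group then closes its own items in reverse
// registration order.
func (peer *hypotheticalPeer) Close() error {
	return errs.Combine(
		peer.Servers.Close(),
		peer.Services.Close(),
	)
}
```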
diff --git a/satellite/core.go b/satellite/core.go
index 7fcf23bb5..aca5d4588 100644
--- a/satellite/core.go
+++ b/satellite/core.go
@@ -10,7 +10,6 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 
-	"storj.io/common/errs2"
 	"storj.io/common/identity"
 	"storj.io/common/pb"
 	"storj.io/common/peertls/extensions"
@@ -18,6 +17,7 @@ import (
 	"storj.io/common/rpc"
 	"storj.io/common/signing"
 	"storj.io/common/storj"
+	"storj.io/storj/private/lifecycle"
 	"storj.io/storj/private/version"
 	version_checker "storj.io/storj/private/version/checker"
 	"storj.io/storj/satellite/accounting"
@@ -50,6 +50,9 @@ type Core struct {
 	Identity *identity.FullIdentity
 	DB       DB
 
+	Servers  *lifecycle.Group
+	Services *lifecycle.Group
+
 	Dialer rpc.Dialer
 
 	Version *version_checker.Service
@@ -136,6 +139,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		Log:      log,
 		Identity: full,
 		DB:       db,
+
+		Servers:  lifecycle.NewGroup(log.Named("servers")),
+		Services: lifecycle.NewGroup(log.Named("services")),
 	}
 
 	var err error
@@ -146,6 +152,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
 		}
 		peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
+
+		peer.Services.Add(lifecycle.Item{
+			Name: "version",
+			Run:  peer.Version.Run,
+		})
 	}
 
 	{ // setup listener and server
@@ -177,11 +188,19 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			Version: *pbVersion,
 		}
 		peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "contact:service",
+			Close: peer.Contact.Service.Close,
+		})
 	}
 
 	{ // setup overlay
 		peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
 		peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "overlay",
+			Close: peer.Overlay.Service.Close,
+		})
 	}
 
 	{ // setup live accounting
@@ -198,7 +217,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 
 	{ // setup orders
 		peer.Orders.DB = rollupsWriteCache
-		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
+		peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "orders:chore",
+			Run:   peer.Orders.Chore.Run,
+			Close: peer.Orders.Chore.Close,
+		})
 		peer.Orders.Service = orders.NewService(
 			peer.Log.Named("orders:service"),
 			signing.SignerFromFullIdentity(peer.Identity),
@@ -221,21 +245,31 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			peer.DB.Buckets(),
 		)
 		peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "metainfo:loop",
+			Run:   peer.Metainfo.Loop.Run,
+			Close: peer.Metainfo.Loop.Close,
+		})
 	}
 
 	{ // setup datarepair
 		// TODO: simplify argument list somehow
 		peer.Repair.Checker = checker.NewChecker(
-			peer.Log.Named("checker"),
+			peer.Log.Named("repair:checker"),
 			peer.DB.RepairQueue(),
 			peer.DB.Irreparable(),
 			peer.Metainfo.Service,
 			peer.Metainfo.Loop,
 			peer.Overlay.Service,
 			config.Checker)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "repair:checker",
+			Run:   peer.Repair.Checker.Run,
+			Close: peer.Repair.Checker.Close,
+		})
 
 		segmentRepairer := repairer.NewSegmentRepairer(
-			log.Named("repairer"),
+			log.Named("segment-repair"),
 			peer.Metainfo.Service,
 			peer.Orders.Service,
 			peer.Overlay.Service,
@@ -248,11 +282,16 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		)
 
 		peer.Repair.Repairer = repairer.NewService(
-			peer.Log.Named("repairer"),
+			peer.Log.Named("repair"),
 			peer.DB.RepairQueue(),
 			&config.Repairer,
 			segmentRepairer,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "repair",
+			Run:   peer.Repair.Repairer.Run,
+			Close: peer.Repair.Repairer.Close,
+		})
 	}
 
 	{ // setup audit
@@ -278,42 +317,78 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			int32(config.MaxReverifyCount),
 		)
 
-		peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit worker"),
+		peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit:worker"),
 			peer.Audit.Queue,
 			peer.Audit.Verifier,
 			peer.Audit.Reporter,
 			config,
 		)
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
+		peer.Services.Add(lifecycle.Item{
+			Name:  "audit:worker",
+			Run:   peer.Audit.Worker.Run,
+			Close: peer.Audit.Worker.Close,
+		})
 
-		peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit chore"),
+		peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
 			peer.Audit.Queue,
 			peer.Metainfo.Loop,
 			config,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "audit:chore",
+			Run:   peer.Audit.Chore.Run,
+			Close: peer.Audit.Chore.Close,
+		})
 	}
 
 	{ // setup garbage collection
 		peer.GarbageCollection.Service = gc.NewService(
-			peer.Log.Named("garbage collection"),
+			peer.Log.Named("garbage-collection"),
 			config.GarbageCollection,
 			peer.Dialer,
 			peer.Overlay.DB,
 			peer.Metainfo.Loop,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name: "garbage-collection",
+			Run:  peer.GarbageCollection.Service.Run,
+		})
 	}
 
 	{ // setup db cleanup
 		peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "dbcleanup",
+			Run:   peer.DBCleanup.Chore.Run,
+			Close: peer.DBCleanup.Chore.Close,
+		})
 	}
 
 	{ // setup accounting
-		peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
-		peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
-		peer.Accounting.ReportedRollupChore = reportedrollup.NewChore(peer.Log.Named("reportedrollup"), peer.DB.Orders(), config.ReportedRollup)
+		peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "accounting:tally",
+			Run:   peer.Accounting.Tally.Run,
+			Close: peer.Accounting.Tally.Close,
+		})
+
+		peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "accounting:rollup",
+			Run:   peer.Accounting.Rollup.Run,
+			Close: peer.Accounting.Rollup.Close,
+		})
+
+		peer.Accounting.ReportedRollupChore = reportedrollup.NewChore(peer.Log.Named("accounting:reported-rollup"), peer.DB.Orders(), config.ReportedRollup)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "accounting:reported-rollup",
+			Run:   peer.Accounting.ReportedRollupChore.Run,
+			Close: peer.Accounting.ReportedRollupChore.Close,
+		})
 	}
 
 	// TODO: remove in future, should be in API
@@ -342,12 +417,21 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 				pc.StripeCoinPayments.TransactionUpdateInterval,
 				pc.StripeCoinPayments.AccountBalanceUpdateInterval,
 			)
+			peer.Services.Add(lifecycle.Item{
+				Name: "payments.stripe:service",
+				Run:  peer.Payments.Chore.Run,
+			})
 		}
 	}
 
 	{ // setup graceful exit
 		if config.GracefulExit.Enabled {
 			peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("gracefulexit"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit)
+			peer.Services.Add(lifecycle.Item{
+				Name:  "gracefulexit",
+				Run:   peer.GracefulExit.Chore.Run,
+				Close: peer.GracefulExit.Chore.Close,
+			})
 		} else {
 			peer.Log.Named("gracefulexit").Info("disabled")
 		}
@@ -359,6 +443,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			config.Metrics,
 			peer.Metainfo.Loop,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "metrics",
+			Run:   peer.Metrics.Chore.Run,
+			Close: peer.Metrics.Chore.Close,
+		})
 	}
 
 	{ // setup downtime tracking
@@ -371,6 +460,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			peer.DowntimeTracking.Service,
 			peer.DB.DowntimeTracking(),
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "downtime:detection",
+			Run:   peer.DowntimeTracking.DetectionChore.Run,
+			Close: peer.DowntimeTracking.DetectionChore.Close,
+		})
 
 		peer.DowntimeTracking.EstimationChore = downtime.NewEstimationChore(
 			peer.Log.Named("downtime:estimation"),
@@ -379,6 +473,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			peer.DowntimeTracking.Service,
 			peer.DB.DowntimeTracking(),
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "downtime:estimation",
+			Run:   peer.DowntimeTracking.EstimationChore.Run,
+			Close: peer.DowntimeTracking.EstimationChore.Close,
+		})
 	}
 
 	return peer, nil
@@ -390,133 +489,18 @@ func (peer *Core) Run(ctx context.Context) (err error) {
 
 	group, ctx := errgroup.WithContext(ctx)
 
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Metainfo.Loop.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Version.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Accounting.Rollup.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Accounting.ReportedRollupChore.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Audit.Worker.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Audit.Chore.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.GarbageCollection.Service.Run(ctx))
-	})
-	if peer.GracefulExit.Chore != nil {
-		group.Go(func() error {
-			return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
-		})
-	}
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Metrics.Chore.Run(ctx))
-	})
-	if peer.Payments.Chore != nil {
-		group.Go(func() error {
-			return errs2.IgnoreCanceled(peer.Payments.Chore.Run(ctx))
-		})
-	}
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.DowntimeTracking.DetectionChore.Run(ctx))
-	})
-
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.DowntimeTracking.EstimationChore.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
-	})
+	peer.Servers.Run(ctx, group)
+	peer.Services.Run(ctx, group)
 
 	return group.Wait()
 }
 
 // Close closes all the resources.
 func (peer *Core) Close() error {
-	var errlist errs.Group
-
-	// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
-
-	if peer.Orders.Chore != nil {
-		errlist.Add(peer.Orders.Chore.Close())
-	}
-
-	// close servers, to avoid new connections to closing subsystems
-	if peer.DowntimeTracking.EstimationChore != nil {
-		errlist.Add(peer.DowntimeTracking.EstimationChore.Close())
-	}
-
-	if peer.DowntimeTracking.DetectionChore != nil {
-		errlist.Add(peer.DowntimeTracking.DetectionChore.Close())
-	}
-
-	if peer.Metrics.Chore != nil {
-		errlist.Add(peer.Metrics.Chore.Close())
-	}
-
-	if peer.GracefulExit.Chore != nil {
-		errlist.Add(peer.GracefulExit.Chore.Close())
-	}
-
-	// close services in reverse initialization order
-
-	if peer.Audit.Chore != nil {
-		errlist.Add(peer.Audit.Chore.Close())
-	}
-	if peer.Audit.Worker != nil {
-		errlist.Add(peer.Audit.Worker.Close())
-	}
-
-	if peer.Accounting.Rollup != nil {
-		errlist.Add(peer.Accounting.ReportedRollupChore.Close())
-	}
-	if peer.Accounting.Rollup != nil {
-		errlist.Add(peer.Accounting.Rollup.Close())
-	}
-	if peer.Accounting.Tally != nil {
-		errlist.Add(peer.Accounting.Tally.Close())
-	}
-
-	if peer.DBCleanup.Chore != nil {
-		errlist.Add(peer.DBCleanup.Chore.Close())
-	}
-	if peer.Repair.Repairer != nil {
-		errlist.Add(peer.Repair.Repairer.Close())
-	}
-	if peer.Repair.Checker != nil {
-		errlist.Add(peer.Repair.Checker.Close())
-	}
-
-	if peer.Overlay.Service != nil {
-		errlist.Add(peer.Overlay.Service.Close())
-	}
-	if peer.Contact.Service != nil {
-		errlist.Add(peer.Contact.Service.Close())
-	}
-	if peer.Metainfo.Loop != nil {
-		errlist.Add(peer.Metainfo.Loop.Close())
-	}
-
-	return errlist.Err()
+	return errs.Combine(
+		peer.Servers.Close(),
+		peer.Services.Close(),
+	)
 }
 
 // ID returns the peer ID.
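One property the refactor preserves is the error semantics: because every item's Run is wrapped in `errs2.IgnoreCanceled`, a graceful shutdown stays quiet while a real failure cancels the shared context and surfaces from `Wait`. A runnable sketch (the item names are illustrative, not from the patch):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"storj.io/storj/private/lifecycle"
)

func main() {
	group := lifecycle.NewGroup(zap.NewNop())

	group.Add(lifecycle.Item{
		Name: "failing", // illustrative
		Run: func(ctx context.Context) error {
			return errors.New("boom") // a real failure is propagated
		},
	})
	group.Add(lifecycle.Item{
		Name: "long-running", // illustrative
		Run: func(ctx context.Context) error {
			<-ctx.Done()     // canceled once "failing" errors out
			return ctx.Err() // swallowed by errs2.IgnoreCanceled
		},
	})

	g, gctx := errgroup.WithContext(context.Background())
	group.Run(gctx, g)

	fmt.Println(g.Wait()) // prints "boom"; the cancellation is not reported
}
```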
diff --git a/satellite/repairer.go b/satellite/repairer.go
index 8ac2794e1..79b12cb3f 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -10,7 +10,6 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 
-	"storj.io/common/errs2"
 	"storj.io/common/identity"
 	"storj.io/common/pb"
 	"storj.io/common/peertls/extensions"
@@ -18,6 +17,7 @@ import (
 	"storj.io/common/rpc"
 	"storj.io/common/signing"
 	"storj.io/common/storj"
+	"storj.io/storj/private/lifecycle"
 	"storj.io/storj/private/version"
 	version_checker "storj.io/storj/private/version/checker"
 	"storj.io/storj/satellite/metainfo"
@@ -34,6 +34,9 @@ type Repairer struct {
 	Log      *zap.Logger
 	Identity *identity.FullIdentity
 
+	Servers  *lifecycle.Group
+	Services *lifecycle.Group
+
 	Dialer rpc.Dialer
 
 	Version *version_checker.Service
@@ -58,6 +61,9 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 	peer := &Repairer{
 		Log:      log,
 		Identity: full,
+
+		Servers:  lifecycle.NewGroup(log.Named("servers")),
+		Services: lifecycle.NewGroup(log.Named("services")),
 	}
 
 	{
@@ -66,6 +72,11 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 			versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
 		}
 		peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
+
+		peer.Services.Add(lifecycle.Item{
+			Name: "version",
+			Run:  peer.Version.Run,
+		})
 	}
 
 	{ // setup dialer
@@ -85,11 +96,20 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 
 	{ // setup overlay
 		peer.Overlay = overlay.NewService(log.Named("overlay"), overlayCache, config.Overlay)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "overlay",
+			Close: peer.Overlay.Close,
+		})
 	}
 
 	{ // setup orders
 		peer.Orders.DB = rollupsWriteCache
-		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
+		peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "orders:chore",
+			Run:   peer.Orders.Chore.Run,
+			Close: peer.Orders.Chore.Close,
+		})
 		peer.Orders.Service = orders.NewService(
 			log.Named("orders"),
 			signing.SignerFromFullIdentity(peer.Identity),
@@ -119,6 +139,12 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 			signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
 		)
 		peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer)
+
+		peer.Services.Add(lifecycle.Item{
+			Name:  "repair",
+			Run:   peer.Repairer.Run,
+			Close: peer.Repairer.Close,
+		})
 	}
 
 	return peer, nil
@@ -130,37 +156,18 @@ func (peer *Repairer) Run(ctx context.Context) (err error) {
 
 	group, ctx := errgroup.WithContext(ctx)
 
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Version.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
-	})
-	group.Go(func() error {
-		peer.Log.Info("Repairer started")
-		return errs2.IgnoreCanceled(peer.Repairer.Run(ctx))
-	})
+	peer.Servers.Run(ctx, group)
+	peer.Services.Run(ctx, group)
 
 	return group.Wait()
 }
 
 // Close closes all the resources.
 func (peer *Repairer) Close() error {
-	var errlist errs.Group
-
-	// close services in reverse initialization order
-
-	if peer.Overlay != nil {
-		errlist.Add(peer.Overlay.Close())
-	}
-	if peer.Repairer != nil {
-		errlist.Add(peer.Repairer.Close())
-	}
-	if peer.Orders.Chore != nil {
-		errlist.Add(peer.Orders.Chore.Close())
-	}
-
-	return errlist.Err()
+	return errs.Combine(
+		peer.Servers.Close(),
+		peer.Services.Close(),
+	)
}
 
 // ID returns the peer ID.
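Because each item is registered immediately after its constructor succeeds, the existing error paths (`return nil, errs.Combine(err, peer.Close())`) tear down exactly the items created so far, without any nil checks. A runnable sketch of that construction-error path (the `db` and `cache` items are illustrative):

```go
package main

import (
	"fmt"

	"go.uber.org/zap"

	"storj.io/storj/private/lifecycle"
)

func main() {
	group := lifecycle.NewGroup(zap.NewNop())

	group.Add(lifecycle.Item{
		Name:  "db", // illustrative
		Close: func() error { fmt.Println("db closed"); return nil },
	})
	group.Add(lifecycle.Item{
		Name:  "cache", // illustrative
		Close: func() error { fmt.Println("cache closed"); return nil },
	})

	// Suppose the next constructor fails here; the peer returns
	// errs.Combine(err, peer.Close()), which ends up calling
	// group.Close() and unwinds only what exists, most recent first:
	_ = group.Close() // prints "cache closed", then "db closed"
}
```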
diff --git a/storagenode/peer.go b/storagenode/peer.go
index d41572a3c..2ddba51e0 100644
--- a/storagenode/peer.go
+++ b/storagenode/peer.go
@@ -14,7 +14,6 @@ import (
 	"golang.org/x/sync/errgroup"
 	"gopkg.in/spacemonkeygo/monkit.v2"
 
-	"storj.io/common/errs2"
 	"storj.io/common/identity"
 	"storj.io/common/pb"
 	"storj.io/common/peertls/extensions"
@@ -23,6 +22,7 @@ import (
 	"storj.io/common/signing"
 	"storj.io/common/storj"
 	"storj.io/storj/pkg/server"
+	"storj.io/storj/private/lifecycle"
 	"storj.io/storj/private/version"
 	"storj.io/storj/private/version/checker"
 	"storj.io/storj/satellite/overlay"
@@ -153,6 +153,9 @@ type Peer struct {
 	Identity *identity.FullIdentity
 	DB       DB
 
+	Servers  *lifecycle.Group
+	Services *lifecycle.Group
+
 	Dialer rpc.Dialer
 
 	Server *server.Server
@@ -219,6 +222,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 		Log:      log,
 		Identity: full,
 		DB:       db,
+
+		Servers:  lifecycle.NewGroup(log.Named("servers")),
+		Services: lifecycle.NewGroup(log.Named("services")),
 	}
 
 	var err error
@@ -229,6 +235,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
 		}
 		peer.Version = checker.NewService(log.Named("version"), config.Version, versionInfo, "Storagenode")
+
+		peer.Services.Add(lifecycle.Item{
+			Name: "version",
+			Run:  peer.Version.Run,
+		})
 	}
 
 	{ // setup listener and server
@@ -245,6 +256,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
+
+		peer.Servers.Add(lifecycle.Item{
+			Name: "server",
+			Run: func(ctx context.Context) error {
+				// Don't change the format of this comment, it is used to figure out the node id.
+				peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
+				peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
+				peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
+				return peer.Server.Run(ctx)
+			},
+			Close: peer.Server.Close,
+		})
 	}
 
 	{ // setup trust pool
@@ -252,6 +274,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
+		peer.Services.Add(lifecycle.Item{
+			Name: "trust",
+			Run:  peer.Storage2.Trust.Run,
+		})
 	}
 
 	{
@@ -289,10 +315,18 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 		}
 		peer.Contact.PingStats = new(contact.PingStats)
 		peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self)
+
 		peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Storage2.Trust, peer.Dialer, peer.Contact.Service)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "contact:chore",
+			Run:   peer.Contact.Chore.Run,
+			Close: peer.Contact.Chore.Close,
+		})
+
 		peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.PingStats)
 		pb.RegisterContactServer(peer.Server.GRPC(), peer.Contact.Endpoint)
 		pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint)
+
 	}
 
 	{ // setup storage
@@ -312,6 +346,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			peer.Storage2.Trust,
 			peer.Storage2.Store,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "pieces:trash",
+			Run:   peer.Storage2.TrashChore.Run,
+			Close: peer.Storage2.TrashChore.Close,
+		})
 
 		peer.Storage2.CacheService = pieces.NewService(
 			log.Named("piecestore:cache"),
@@ -319,6 +358,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			peer.Storage2.Store,
 			config.Storage2.CacheSyncInterval,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "piecestore:cache",
+			Run:   peer.Storage2.CacheService.Run,
+			Close: peer.Storage2.CacheService.Close,
+		})
 
 		peer.Storage2.Monitor = monitor.NewService(
 			log.Named("piecestore:monitor"),
@@ -331,12 +375,22 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			config.Storage.KBucketRefreshInterval,
 			config.Storage2.Monitor,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "piecestore:monitor",
+			Run:   peer.Storage2.Monitor.Run,
+			Close: peer.Storage2.Monitor.Close,
+		})
 
 		peer.Storage2.RetainService = retain.NewService(
 			peer.Log.Named("retain"),
 			peer.Storage2.Store,
 			config.Retain,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "retain",
+			Run:   peer.Storage2.RetainService.Run,
+			Close: peer.Storage2.RetainService.Close,
+		})
 
 		peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
 			peer.Log.Named("piecestore"),
@@ -375,13 +429,19 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			peer.Storage2.Trust,
 			config.Storage2.Orders,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "orders",
+			Run:   peer.Storage2.Orders.Run,
+			Close: peer.Storage2.Orders.Close,
+		})
 	}
 
 	{ // setup node stats service
 		peer.NodeStats.Service = nodestats.NewService(
 			peer.Log.Named("nodestats:service"),
 			peer.Dialer,
-			peer.Storage2.Trust)
+			peer.Storage2.Trust,
+		)
 
 		peer.NodeStats.Cache = nodestats.NewCache(
 			peer.Log.Named("nodestats:cache"),
@@ -391,7 +451,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 				StorageUsage: peer.DB.StorageUsage(),
 			},
 			peer.NodeStats.Service,
-			peer.Storage2.Trust)
+			peer.Storage2.Trust,
+		)
+
+		peer.Services.Add(lifecycle.Item{
+			Name:  "nodestats:cache",
+			Run:   peer.NodeStats.Cache.Run,
+			Close: peer.NodeStats.Cache.Close,
+		})
 	}
 
 	{ // setup storage node operator dashboard
@@ -408,8 +475,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			peer.DB.Reputation(),
 			peer.DB.StorageUsage(),
 			peer.Contact.PingStats,
-			peer.Contact.Service)
-
+			peer.Contact.Service,
+		)
 		if err != nil {
 			return nil, errs.Combine(err, peer.Close())
 		}
@@ -432,6 +499,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			peer.Console.Service,
 			peer.Console.Listener,
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "console:endpoint",
+			Run:   peer.Console.Endpoint.Run,
+			Close: peer.Console.Endpoint.Close,
+		})
 	}
 
 	{ // setup storage inspector
@@ -467,11 +539,26 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			peer.Dialer,
 			peer.DB.Satellites(),
 		)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "gracefulexit:chore",
+			Run:   peer.GracefulExit.Chore.Run,
+			Close: peer.GracefulExit.Chore.Close,
+		})
 	}
 
 	peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.UsedSerials(), config.Collector)
+	peer.Services.Add(lifecycle.Item{
+		Name:  "collector",
+		Run:   peer.Collector.Run,
+		Close: peer.Collector.Close,
+	})
 
 	peer.Bandwidth = bandwidth.NewService(peer.Log.Named("bandwidth"), peer.DB.Bandwidth(), config.Bandwidth)
+	peer.Services.Add(lifecycle.Item{
+		Name:  "bandwidth",
+		Run:   peer.Bandwidth.Run,
+		Close: peer.Bandwidth.Close,
+	})
 
 	return peer, nil
 }
@@ -492,112 +579,18 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
 	}
 
 	group, ctx := errgroup.WithContext(ctx)
-
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Version.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Contact.Chore.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Collector.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Storage2.Orders.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Storage2.CacheService.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Storage2.RetainService.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Storage2.TrashChore.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Bandwidth.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Storage2.Trust.Run(ctx))
-	})
-
-	group.Go(func() error {
-		// TODO: move the message into Server instead
-		// Don't change the format of this comment, it is used to figure out the node id.
-		peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
-		peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
-		peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
-		return errs2.IgnoreCanceled(peer.Server.Run(ctx))
-	})
-
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.NodeStats.Cache.Run(ctx))
-	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx))
-	})
-
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
-	})
+	peer.Servers.Run(ctx, group)
+	peer.Services.Run(ctx, group)
 
 	return group.Wait()
 }
 
 // Close closes all the resources.
 func (peer *Peer) Close() error {
-	var errlist errs.Group
-
-	// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
-
-	// close servers, to avoid new connections to closing subsystems
-	if peer.Server != nil {
-		errlist.Add(peer.Server.Close())
-	}
-
-	// close services in reverse initialization order
-	if peer.GracefulExit.Chore != nil {
-		errlist.Add(peer.GracefulExit.Chore.Close())
-	}
-	if peer.Contact.Chore != nil {
-		errlist.Add(peer.Contact.Chore.Close())
-	}
-	if peer.Bandwidth != nil {
-		errlist.Add(peer.Bandwidth.Close())
-	}
-	if peer.Storage2.TrashChore != nil {
-		errlist.Add(peer.Storage2.TrashChore.Close())
-	}
-	if peer.Storage2.RetainService != nil {
-		errlist.Add(peer.Storage2.RetainService.Close())
-	}
-	if peer.Storage2.Monitor != nil {
-		errlist.Add(peer.Storage2.Monitor.Close())
-	}
-	if peer.Storage2.Orders != nil {
-		errlist.Add(peer.Storage2.Orders.Close())
-	}
-	if peer.Storage2.CacheService != nil {
-		errlist.Add(peer.Storage2.CacheService.Close())
-	}
-	if peer.Collector != nil {
-		errlist.Add(peer.Collector.Close())
-	}
-
-	if peer.Console.Endpoint != nil {
-		errlist.Add(peer.Console.Endpoint.Close())
-	} else if peer.Console.Listener != nil {
-		errlist.Add(peer.Console.Listener.Close())
-	}
-
-	if peer.NodeStats.Cache != nil {
-		errlist.Add(peer.NodeStats.Cache.Close())
-	}
-
-	return errlist.Err()
+	return errs.Combine(
+		peer.Servers.Close(),
+		peer.Services.Close(),
+	)
 }
 
 // ID returns the peer ID.