private/lifecycle: implement Group

lifecycle.Group implements controlling multiple items such
that their startup and close works.

Change-Id: Idb4f4a6c3a1f07cdcf44d3147a6c959686df0007
This commit is contained in:
Egon Elbre 2020-01-28 18:13:59 -05:00
parent a1948ed338
commit e319660f7a
6 changed files with 477 additions and 330 deletions

View File

@ -0,0 +1,76 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
// Package lifecycle allows controlling group of items.
package lifecycle
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/common/errs2"
)
var mon = monkit.Package()
// Group implements a collection of items that have a
// concurrent start and are closed in reverse order.
type Group struct {
log *zap.Logger
items []Item
}
// Item is the lifecycle item that group runs and closes.
type Item struct {
Name string
Run func(ctx context.Context) error
Close func() error
}
// NewGroup creates a new group.
func NewGroup(log *zap.Logger) *Group {
return &Group{log: log}
}
// Add adds item to the group.
func (group *Group) Add(item Item) {
group.items = append(group.items, item)
}
// Run starts all items concurrently under group g.
func (group *Group) Run(ctx context.Context, g *errgroup.Group) {
defer mon.Task()(&ctx)(nil)
var started []string
for _, item := range group.items {
item := item
started = append(started, item.Name)
if item.Run == nil {
continue
}
g.Go(func() error {
return errs2.IgnoreCanceled(item.Run(ctx))
})
}
group.log.Debug("started", zap.Strings("items", started))
}
// Close closes all items in reverse order.
func (group *Group) Close() error {
var errlist errs.Group
for i := len(group.items) - 1; i >= 0; i-- {
item := group.items[i]
if item.Close == nil {
continue
}
errlist.Add(item.Close())
}
return errlist.Err()
}

View File

@ -0,0 +1,71 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package lifecycle_test
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"storj.io/common/testcontext"
"storj.io/storj/private/lifecycle"
)
func TestGroup(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
log := zaptest.NewLogger(t)
closed := []string{}
var astart, cstart bool
group := lifecycle.NewGroup(log)
group.Add(lifecycle.Item{
Name: "A",
Run: func(ctx context.Context) error {
astart = true
log.Info("Run A")
return nil
},
Close: func() error {
closed = append(closed, "A")
return nil
},
})
group.Add(lifecycle.Item{
Name: "B",
Run: nil,
Close: func() error {
closed = append(closed, "B")
return nil
},
})
group.Add(lifecycle.Item{
Name: "C",
Run: func(ctx context.Context) error {
cstart = true
log.Info("Run C")
return nil
},
Close: nil,
})
g, gctx := errgroup.WithContext(ctx)
group.Run(gctx, g)
err := g.Wait()
require.NoError(t, err)
require.True(t, astart)
require.True(t, cstart)
err = group.Close()
require.NoError(t, err)
require.Equal(t, []string{"B", "A"}, closed)
}

View File

@ -13,7 +13,6 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/peertls/extensions"
@ -23,6 +22,7 @@ import (
"storj.io/common/storj"
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/server"
"storj.io/storj/private/lifecycle"
"storj.io/storj/private/post"
"storj.io/storj/private/post/oauth2"
"storj.io/storj/private/version"
@ -61,6 +61,9 @@ type API struct {
Identity *identity.FullIdentity
DB DB
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Server *server.Server
Version *checker.Service
@ -154,6 +157,9 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
Log: log,
Identity: full,
DB: db,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
var err error
@ -164,6 +170,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Run,
})
}
{ // setup listener and server
@ -184,11 +195,29 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Servers.Add(lifecycle.Item{
Name: "server",
Run: func(ctx context.Context) error {
// Don't change the format of this comment, it is used to figure out the node id.
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return peer.Server.Run(ctx)
},
Close: peer.Server.Close,
})
}
{ // setup overlay
peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Close: peer.Overlay.Service.Close,
})
peer.Overlay.Inspector = overlay.NewInspector(peer.Overlay.Service)
pb.RegisterOverlayInspectorServer(peer.Server.PrivateGRPC(), peer.Overlay.Inspector)
pb.DRPCRegisterOverlayInspector(peer.Server.PrivateDRPC(), peer.Overlay.Inspector)
@ -219,6 +248,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint)
peer.Services.Add(lifecycle.Item{
Name: "contact:service",
Close: peer.Contact.Service.Close,
})
}
{ // setup vouchers
@ -240,7 +274,13 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
peer.Orders.Endpoint = orders.NewEndpoint(
peer.Log.Named("orders:endpoint"),
@ -291,6 +331,12 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Servers.Add(lifecycle.Item{
Name: "marketing:endpoint",
Run: peer.Marketing.Endpoint.Run,
Close: peer.Marketing.Endpoint.Close,
})
}
{ // setup metainfo
@ -301,13 +347,17 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
)
peer.Metainfo.DeletePiecesService, err = metainfo.NewDeletePiecesService(
peer.Log.Named("metainfo:DeletePiecesService"),
peer.Log.Named("metainfo:delete-pieces"),
peer.Dialer,
metainfoDeletePiecesConcurrencyLimit,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "metainfo:delete-pieces",
Close: peer.Metainfo.DeletePiecesService.Close,
})
peer.Metainfo.Endpoint2 = metainfo.NewEndpoint(
peer.Log.Named("metainfo:endpoint"),
@ -328,6 +378,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
)
pb.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2)
pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint2)
peer.Services.Add(lifecycle.Item{
Name: "metainfo:endpoint",
Close: peer.Metainfo.Endpoint2.Close,
})
}
{ // setup datarepair
@ -410,6 +465,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "mail:service",
Close: peer.Mail.Service.Close,
})
}
{ // setup payments
@ -433,12 +493,18 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Payments.Inspector = stripecoinpayments.NewEndpoint(service)
peer.Payments.Version = stripecoinpayments.NewVersionService(
peer.Log.Named("payments.stripe:service"),
peer.Log.Named("payments.stripe:version"),
service,
pc.StripeCoinPayments.ConversionRatesCycleInterval)
pb.RegisterPaymentsServer(peer.Server.PrivateGRPC(), peer.Payments.Inspector)
pb.DRPCRegisterPayments(peer.Server.PrivateDRPC(), peer.Payments.Inspector)
peer.Services.Add(lifecycle.Item{
Name: "payments.stripe:version",
Run: peer.Payments.Version.Run,
Close: peer.Payments.Version.Close,
})
}
}
@ -485,6 +551,12 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Console.Listener,
config.Payments.StripeCoinPayments.StripePublicKey,
)
peer.Servers.Add(lifecycle.Item{
Name: "console:endpoint",
Run: peer.Console.Endpoint.Run,
Close: peer.Console.Endpoint.Close,
})
}
{ // setup node stats endpoint
@ -526,74 +598,18 @@ func (peer *API) Run(ctx context.Context) (err error) {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
// Don't change the format of this comment, it is used to figure out the node id.
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return errs2.IgnoreCanceled(peer.Server.Run(ctx))
})
if peer.Payments.Version != nil {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Payments.Version.Run(ctx))
})
}
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
})
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
return group.Wait()
}
// Close closes all the resources.
func (peer *API) Close() error {
var errlist errs.Group
// close servers, to avoid new connections to closing subsystems
if peer.Server != nil {
errlist.Add(peer.Server.Close())
}
if peer.Marketing.Endpoint != nil {
errlist.Add(peer.Marketing.Endpoint.Close())
} else if peer.Marketing.Listener != nil {
errlist.Add(peer.Marketing.Listener.Close())
}
if peer.Console.Endpoint != nil {
errlist.Add(peer.Console.Endpoint.Close())
} else if peer.Console.Listener != nil {
errlist.Add(peer.Console.Listener.Close())
}
if peer.Mail.Service != nil {
errlist.Add(peer.Mail.Service.Close())
}
if peer.Payments.Version != nil {
errlist.Add(peer.Payments.Version.Close())
}
if peer.Metainfo.Endpoint2 != nil {
errlist.Add(peer.Metainfo.Endpoint2.Close())
}
if peer.Contact.Service != nil {
errlist.Add(peer.Contact.Service.Close())
}
if peer.Overlay.Service != nil {
errlist.Add(peer.Overlay.Service.Close())
}
if peer.Orders.Chore.Loop != nil {
errlist.Add(peer.Orders.Chore.Close())
}
if peer.Metainfo.DeletePiecesService != nil {
errlist.Add(peer.Metainfo.DeletePiecesService.Close())
}
return errlist.Err()
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.

View File

@ -10,7 +10,6 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/peertls/extensions"
@ -18,6 +17,7 @@ import (
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/private/lifecycle"
"storj.io/storj/private/version"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/accounting"
@ -50,6 +50,9 @@ type Core struct {
Identity *identity.FullIdentity
DB DB
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Version *version_checker.Service
@ -136,6 +139,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
Log: log,
Identity: full,
DB: db,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
var err error
@ -146,6 +152,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Run,
})
}
{ // setup listener and server
@ -177,11 +188,19 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
Version: *pbVersion,
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer)
peer.Services.Add(lifecycle.Item{
Name: "contact:service",
Close: peer.Contact.Service.Close,
})
}
{ // setup overlay
peer.Overlay.DB = overlay.NewCombinedCache(peer.DB.OverlayCache())
peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Close: peer.Overlay.Service.Close,
})
}
{ // setup live accounting
@ -198,7 +217,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
@ -221,21 +245,31 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.DB.Buckets(),
)
peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
peer.Services.Add(lifecycle.Item{
Name: "metainfo:loop",
Run: peer.Metainfo.Loop.Run,
Close: peer.Metainfo.Loop.Close,
})
}
{ // setup datarepair
// TODO: simplify argument list somehow
peer.Repair.Checker = checker.NewChecker(
peer.Log.Named("checker"),
peer.Log.Named("repair:checker"),
peer.DB.RepairQueue(),
peer.DB.Irreparable(),
peer.Metainfo.Service,
peer.Metainfo.Loop,
peer.Overlay.Service,
config.Checker)
peer.Services.Add(lifecycle.Item{
Name: "repair:checker",
Run: peer.Repair.Checker.Run,
Close: peer.Repair.Checker.Close,
})
segmentRepairer := repairer.NewSegmentRepairer(
log.Named("repairer"),
log.Named("segment-repair"),
peer.Metainfo.Service,
peer.Orders.Service,
peer.Overlay.Service,
@ -248,11 +282,16 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
)
peer.Repair.Repairer = repairer.NewService(
peer.Log.Named("repairer"),
peer.Log.Named("repair"),
peer.DB.RepairQueue(),
&config.Repairer,
segmentRepairer,
)
peer.Services.Add(lifecycle.Item{
Name: "repair",
Run: peer.Repair.Repairer.Run,
Close: peer.Repair.Repairer.Close,
})
}
{ // setup audit
@ -278,42 +317,78 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
int32(config.MaxReverifyCount),
)
peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit worker"),
peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit:worker"),
peer.Audit.Queue,
peer.Audit.Verifier,
peer.Audit.Reporter,
config,
)
peer.Services.Add(lifecycle.Item{
Name: "audit:worker",
Run: peer.Audit.Worker.Run,
Close: peer.Audit.Worker.Close,
})
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit chore"),
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
peer.Audit.Queue,
peer.Metainfo.Loop,
config,
)
peer.Services.Add(lifecycle.Item{
Name: "audit:chore",
Run: peer.Audit.Chore.Run,
Close: peer.Audit.Chore.Close,
})
}
{ // setup garbage collection
peer.GarbageCollection.Service = gc.NewService(
peer.Log.Named("garbage collection"),
peer.Log.Named("garbage-collection"),
config.GarbageCollection,
peer.Dialer,
peer.Overlay.DB,
peer.Metainfo.Loop,
)
peer.Services.Add(lifecycle.Item{
Name: "garbage-collection",
Run: peer.GarbageCollection.Service.Run,
})
}
{ // setup db cleanup
peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
peer.Services.Add(lifecycle.Item{
Name: "dbcleanup",
Run: peer.DBCleanup.Chore.Run,
Close: peer.DBCleanup.Chore.Close,
})
}
{ // setup accounting
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
peer.Accounting.ReportedRollupChore = reportedrollup.NewChore(peer.Log.Named("reportedrollup"), peer.DB.Orders(), config.ReportedRollup)
peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
peer.Services.Add(lifecycle.Item{
Name: "accounting:tally",
Run: peer.Accounting.Tally.Run,
Close: peer.Accounting.Tally.Close,
})
peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
peer.Services.Add(lifecycle.Item{
Name: "accounting:rollup",
Run: peer.Accounting.Rollup.Run,
Close: peer.Accounting.Rollup.Close,
})
peer.Accounting.ReportedRollupChore = reportedrollup.NewChore(peer.Log.Named("accounting:reported-rollup"), peer.DB.Orders(), config.ReportedRollup)
peer.Services.Add(lifecycle.Item{
Name: "accounting:reported-rollup",
Run: peer.Accounting.ReportedRollupChore.Run,
Close: peer.Accounting.ReportedRollupChore.Close,
})
}
// TODO: remove in future, should be in API
@ -342,12 +417,21 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
pc.StripeCoinPayments.TransactionUpdateInterval,
pc.StripeCoinPayments.AccountBalanceUpdateInterval,
)
peer.Services.Add(lifecycle.Item{
Name: "payments.stripe:service",
Run: peer.Payments.Chore.Run,
})
}
}
{ // setup graceful exit
if config.GracefulExit.Enabled {
peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("gracefulexit"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit)
peer.Services.Add(lifecycle.Item{
Name: "gracefulexit",
Run: peer.GracefulExit.Chore.Run,
Close: peer.GracefulExit.Chore.Close,
})
} else {
peer.Log.Named("gracefulexit").Info("disabled")
}
@ -359,6 +443,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
config.Metrics,
peer.Metainfo.Loop,
)
peer.Services.Add(lifecycle.Item{
Name: "metrics",
Run: peer.Metrics.Chore.Run,
Close: peer.Metrics.Chore.Close,
})
}
{ // setup downtime tracking
@ -371,6 +460,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.DowntimeTracking.Service,
peer.DB.DowntimeTracking(),
)
peer.Services.Add(lifecycle.Item{
Name: "downtime:detection",
Run: peer.DowntimeTracking.DetectionChore.Run,
Close: peer.DowntimeTracking.DetectionChore.Close,
})
peer.DowntimeTracking.EstimationChore = downtime.NewEstimationChore(
peer.Log.Named("downtime:estimation"),
@ -379,6 +473,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.DowntimeTracking.Service,
peer.DB.DowntimeTracking(),
)
peer.Services.Add(lifecycle.Item{
Name: "downtime:estimation",
Run: peer.DowntimeTracking.EstimationChore.Run,
Close: peer.DowntimeTracking.EstimationChore.Close,
})
}
return peer, nil
@ -390,133 +489,18 @@ func (peer *Core) Run(ctx context.Context) (err error) {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Metainfo.Loop.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Accounting.Rollup.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Accounting.ReportedRollupChore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Audit.Worker.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Audit.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GarbageCollection.Service.Run(ctx))
})
if peer.GracefulExit.Chore != nil {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
})
}
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Metrics.Chore.Run(ctx))
})
if peer.Payments.Chore != nil {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Payments.Chore.Run(ctx))
})
}
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DowntimeTracking.DetectionChore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DowntimeTracking.EstimationChore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
})
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
return group.Wait()
}
// Close closes all the resources.
func (peer *Core) Close() error {
var errlist errs.Group
// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
if peer.Orders.Chore != nil {
errlist.Add(peer.Orders.Chore.Close())
}
// close servers, to avoid new connections to closing subsystems
if peer.DowntimeTracking.EstimationChore != nil {
errlist.Add(peer.DowntimeTracking.EstimationChore.Close())
}
if peer.DowntimeTracking.DetectionChore != nil {
errlist.Add(peer.DowntimeTracking.DetectionChore.Close())
}
if peer.Metrics.Chore != nil {
errlist.Add(peer.Metrics.Chore.Close())
}
if peer.GracefulExit.Chore != nil {
errlist.Add(peer.GracefulExit.Chore.Close())
}
// close services in reverse initialization order
if peer.Audit.Chore != nil {
errlist.Add(peer.Audit.Chore.Close())
}
if peer.Audit.Worker != nil {
errlist.Add(peer.Audit.Worker.Close())
}
if peer.Accounting.Rollup != nil {
errlist.Add(peer.Accounting.ReportedRollupChore.Close())
}
if peer.Accounting.Rollup != nil {
errlist.Add(peer.Accounting.Rollup.Close())
}
if peer.Accounting.Tally != nil {
errlist.Add(peer.Accounting.Tally.Close())
}
if peer.DBCleanup.Chore != nil {
errlist.Add(peer.DBCleanup.Chore.Close())
}
if peer.Repair.Repairer != nil {
errlist.Add(peer.Repair.Repairer.Close())
}
if peer.Repair.Checker != nil {
errlist.Add(peer.Repair.Checker.Close())
}
if peer.Overlay.Service != nil {
errlist.Add(peer.Overlay.Service.Close())
}
if peer.Contact.Service != nil {
errlist.Add(peer.Contact.Service.Close())
}
if peer.Metainfo.Loop != nil {
errlist.Add(peer.Metainfo.Loop.Close())
}
return errlist.Err()
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.

View File

@ -10,7 +10,6 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/peertls/extensions"
@ -18,6 +17,7 @@ import (
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/private/lifecycle"
"storj.io/storj/private/version"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/metainfo"
@ -34,6 +34,9 @@ type Repairer struct {
Log *zap.Logger
Identity *identity.FullIdentity
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Version *version_checker.Service
@ -58,6 +61,9 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
peer := &Repairer{
Log: log,
Identity: full,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
{
@ -66,6 +72,11 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Run,
})
}
{ // setup dialer
@ -85,11 +96,20 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
{ // setup overlay
peer.Overlay = overlay.NewService(log.Named("overlay"), overlayCache, config.Overlay)
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Close: peer.Overlay.Close,
})
}
{ // setup orders
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
peer.Services.Add(lifecycle.Item{
Name: "orders:chore",
Run: peer.Orders.Chore.Run,
Close: peer.Orders.Chore.Close,
})
peer.Orders.Service = orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(peer.Identity),
@ -119,6 +139,12 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
)
peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer)
peer.Services.Add(lifecycle.Item{
Name: "repair",
Run: peer.Repairer.Run,
Close: peer.Repairer.Close,
})
}
return peer, nil
@ -130,37 +156,18 @@ func (peer *Repairer) Run(ctx context.Context) (err error) {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
})
group.Go(func() error {
peer.Log.Info("Repairer started")
return errs2.IgnoreCanceled(peer.Repairer.Run(ctx))
})
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
return group.Wait()
}
// Close closes all the resources.
func (peer *Repairer) Close() error {
var errlist errs.Group
// close services in reverse initialization order
if peer.Overlay != nil {
errlist.Add(peer.Overlay.Close())
}
if peer.Repairer != nil {
errlist.Add(peer.Repairer.Close())
}
if peer.Orders.Chore != nil {
errlist.Add(peer.Orders.Chore.Close())
}
return errlist.Err()
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.

View File

@ -14,7 +14,6 @@ import (
"golang.org/x/sync/errgroup"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/peertls/extensions"
@ -23,6 +22,7 @@ import (
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/pkg/server"
"storj.io/storj/private/lifecycle"
"storj.io/storj/private/version"
"storj.io/storj/private/version/checker"
"storj.io/storj/satellite/overlay"
@ -153,6 +153,9 @@ type Peer struct {
Identity *identity.FullIdentity
DB DB
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Server *server.Server
@ -219,6 +222,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
Log: log,
Identity: full,
DB: db,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
var err error
@ -229,6 +235,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = checker.NewService(log.Named("version"), config.Version, versionInfo, "Storagenode")
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Run,
})
}
{ // setup listener and server
@ -245,6 +256,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Servers.Add(lifecycle.Item{
Name: "server",
Run: func(ctx context.Context) error {
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return peer.Server.Run(ctx)
},
Close: peer.Server.Close,
})
}
{ // setup trust pool
@ -252,6 +274,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "trust",
Run: peer.Storage2.Trust.Run,
})
}
{
@ -289,10 +315,18 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
}
peer.Contact.PingStats = new(contact.PingStats)
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self)
peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Storage2.Trust, peer.Dialer, peer.Contact.Service)
peer.Services.Add(lifecycle.Item{
Name: "contact:chore",
Run: peer.Contact.Chore.Run,
Close: peer.Contact.Chore.Close,
})
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.PingStats)
pb.RegisterContactServer(peer.Server.GRPC(), peer.Contact.Endpoint)
pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint)
}
{ // setup storage
@ -312,6 +346,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Storage2.Trust,
peer.Storage2.Store,
)
peer.Services.Add(lifecycle.Item{
Name: "pieces:trash",
Run: peer.Storage2.TrashChore.Run,
Close: peer.Storage2.TrashChore.Close,
})
peer.Storage2.CacheService = pieces.NewService(
log.Named("piecestore:cache"),
@ -319,6 +358,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Storage2.Store,
config.Storage2.CacheSyncInterval,
)
peer.Services.Add(lifecycle.Item{
Name: "piecestore:cache",
Run: peer.Storage2.CacheService.Run,
Close: peer.Storage2.CacheService.Close,
})
peer.Storage2.Monitor = monitor.NewService(
log.Named("piecestore:monitor"),
@ -331,12 +375,22 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
config.Storage.KBucketRefreshInterval,
config.Storage2.Monitor,
)
peer.Services.Add(lifecycle.Item{
Name: "piecestore:monitor",
Run: peer.Storage2.Monitor.Run,
Close: peer.Storage2.Monitor.Close,
})
peer.Storage2.RetainService = retain.NewService(
peer.Log.Named("retain"),
peer.Storage2.Store,
config.Retain,
)
peer.Services.Add(lifecycle.Item{
Name: "retain",
Run: peer.Storage2.RetainService.Run,
Close: peer.Storage2.RetainService.Close,
})
peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
peer.Log.Named("piecestore"),
@ -375,13 +429,19 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Storage2.Trust,
config.Storage2.Orders,
)
peer.Services.Add(lifecycle.Item{
Name: "orders",
Run: peer.Storage2.Orders.Run,
Close: peer.Storage2.Orders.Close,
})
}
{ // setup node stats service
peer.NodeStats.Service = nodestats.NewService(
peer.Log.Named("nodestats:service"),
peer.Dialer,
peer.Storage2.Trust)
peer.Storage2.Trust,
)
peer.NodeStats.Cache = nodestats.NewCache(
peer.Log.Named("nodestats:cache"),
@ -391,7 +451,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
StorageUsage: peer.DB.StorageUsage(),
},
peer.NodeStats.Service,
peer.Storage2.Trust)
peer.Storage2.Trust,
)
peer.Services.Add(lifecycle.Item{
Name: "nodestats:cache",
Run: peer.NodeStats.Cache.Run,
Close: peer.NodeStats.Cache.Close,
})
}
{ // setup storage node operator dashboard
@ -408,8 +475,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.DB.Reputation(),
peer.DB.StorageUsage(),
peer.Contact.PingStats,
peer.Contact.Service)
peer.Contact.Service,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -432,6 +499,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Console.Service,
peer.Console.Listener,
)
peer.Services.Add(lifecycle.Item{
Name: "console:endpoint",
Run: peer.Console.Endpoint.Run,
Close: peer.Console.Endpoint.Close,
})
}
{ // setup storage inspector
@ -467,11 +539,26 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Dialer,
peer.DB.Satellites(),
)
peer.Services.Add(lifecycle.Item{
Name: "gracefulexit:chore",
Run: peer.GracefulExit.Chore.Run,
Close: peer.GracefulExit.Chore.Close,
})
}
peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.UsedSerials(), config.Collector)
peer.Services.Add(lifecycle.Item{
Name: "collector",
Run: peer.Collector.Run,
Close: peer.Collector.Close,
})
peer.Bandwidth = bandwidth.NewService(peer.Log.Named("bandwidth"), peer.DB.Bandwidth(), config.Bandwidth)
peer.Services.Add(lifecycle.Item{
Name: "bandwidth",
Run: peer.Bandwidth.Run,
Close: peer.Bandwidth.Close,
})
return peer, nil
}
@ -492,112 +579,18 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
}
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Contact.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Collector.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Orders.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.CacheService.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.RetainService.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.TrashChore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Bandwidth.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Trust.Run(ctx))
})
group.Go(func() error {
// TODO: move the message into Server instead
// Don't change the format of this comment, it is used to figure out the node id.
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return errs2.IgnoreCanceled(peer.Server.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.NodeStats.Cache.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
})
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
return group.Wait()
}
// Close closes all the resources.
func (peer *Peer) Close() error {
var errlist errs.Group
// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
// close servers, to avoid new connections to closing subsystems
if peer.Server != nil {
errlist.Add(peer.Server.Close())
}
// close services in reverse initialization order
if peer.GracefulExit.Chore != nil {
errlist.Add(peer.GracefulExit.Chore.Close())
}
if peer.Contact.Chore != nil {
errlist.Add(peer.Contact.Chore.Close())
}
if peer.Bandwidth != nil {
errlist.Add(peer.Bandwidth.Close())
}
if peer.Storage2.TrashChore != nil {
errlist.Add(peer.Storage2.TrashChore.Close())
}
if peer.Storage2.RetainService != nil {
errlist.Add(peer.Storage2.RetainService.Close())
}
if peer.Storage2.Monitor != nil {
errlist.Add(peer.Storage2.Monitor.Close())
}
if peer.Storage2.Orders != nil {
errlist.Add(peer.Storage2.Orders.Close())
}
if peer.Storage2.CacheService != nil {
errlist.Add(peer.Storage2.CacheService.Close())
}
if peer.Collector != nil {
errlist.Add(peer.Collector.Close())
}
if peer.Console.Endpoint != nil {
errlist.Add(peer.Console.Endpoint.Close())
} else if peer.Console.Listener != nil {
errlist.Add(peer.Console.Listener.Close())
}
if peer.NodeStats.Cache != nil {
errlist.Add(peer.NodeStats.Cache.Close())
}
return errlist.Err()
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.