diff --git a/satellite/accounting/rollup/rollup.go b/satellite/accounting/rollup/rollup.go index 2e5f87a3c..25596fa63 100644 --- a/satellite/accounting/rollup/rollup.go +++ b/satellite/accounting/rollup/rollup.go @@ -10,6 +10,7 @@ import ( "go.uber.org/zap" "storj.io/storj/internal/memory" + "storj.io/storj/internal/sync2" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/accounting" @@ -25,7 +26,7 @@ type Config struct { // Service is the rollup service for totalling data on storage nodes on daily intervals type Service struct { logger *zap.Logger - ticker *time.Ticker + Loop sync2.Cycle sdb accounting.StoragenodeAccounting deleteTallies bool } @@ -34,7 +35,7 @@ type Service struct { func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time.Duration, deleteTallies bool) *Service { return &Service{ logger: logger, - ticker: time.NewTicker(interval), + Loop: *sync2.NewCycle(interval), sdb: sdb, deleteTallies: deleteTallies, } @@ -44,17 +45,19 @@ func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time func (r *Service) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) r.logger.Info("Rollup service starting up") - for { - err = r.Rollup(ctx) + return r.Loop.Run(ctx, func(ctx context.Context) error { + err := r.Rollup(ctx) if err != nil { - r.logger.Error("Query failed", zap.Error(err)) + r.logger.Error("rollup failed", zap.Error(err)) } - select { - case <-r.ticker.C: // wait for the next interval to happen - case <-ctx.Done(): // or the Rollup is canceled via context - return ctx.Err() - } - } + return nil + }) +} + +// Close stops the service and releases any resources. +func (r *Service) Close() error { + r.Loop.Close() + return nil } // Rollup aggregates storage and bandwidth amounts for the time interval diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index 3024d533e..a43d2bb27 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -12,6 +12,7 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" + "storj.io/storj/internal/sync2" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/accounting" @@ -31,21 +32,19 @@ type Service struct { logger *zap.Logger metainfo *metainfo.Service overlay *overlay.Service - limit int - ticker *time.Ticker + Loop sync2.Cycle storagenodeAccountingDB accounting.StoragenodeAccounting projectAccountingDB accounting.ProjectAccounting liveAccounting live.Service } // New creates a new tally Service -func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Service, limit int, interval time.Duration) *Service { +func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Service, interval time.Duration) *Service { return &Service{ logger: logger, metainfo: metainfo, overlay: overlay, - limit: limit, - ticker: time.NewTicker(interval), + Loop: *sync2.NewCycle(interval), storagenodeAccountingDB: sdb, projectAccountingDB: pdb, liveAccounting: liveAccounting, @@ -57,16 +56,19 @@ func (t *Service) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) t.logger.Info("Tally service starting up") - for { - if err = t.Tally(ctx); err != nil { - t.logger.Error("Tally failed", zap.Error(err)) + return t.Loop.Run(ctx, func(ctx context.Context) error { + err := t.Tally(ctx) + if err != nil { + t.logger.Error("tally failed", zap.Error(err)) } - select { - case <-t.ticker.C: // wait for the next interval to happen - case <-ctx.Done(): // or the Tally is canceled via context - return ctx.Err() - } - } + return nil + }) +} + +// Close stops the service and releases any resources. +func (t *Service) Close() error { + t.Loop.Close() + return nil } // Tally calculates data-at-rest usage once @@ -100,6 +102,7 @@ func (t *Service) Tally(ctx context.Context) (err error) { } } } + return errs.Combine(errAtRest, errBucketInfo) } diff --git a/satellite/peer.go b/satellite/peer.go index 3f7129029..e9c5fa638 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -539,7 +539,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten { // setup accounting log.Debug("Setting up accounting") - peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval) + peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, config.Tally.Interval) peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies) } @@ -726,6 +726,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) { group.Go(func() error { return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx)) }) + group.Go(func() error { + return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx)) + }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx)) }) @@ -758,9 +761,6 @@ func (peer *Peer) Run(ctx context.Context) (err error) { group.Go(func() error { return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx)) }) - group.Go(func() error { - return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx)) - }) return group.Wait() } @@ -792,6 +792,8 @@ func (peer *Peer) Close() error { errlist.Add(peer.Marketing.Listener.Close()) } + // close services in reverse initialization order + if peer.Audit.Chore != nil { errlist.Add(peer.Audit.Chore.Close()) } @@ -799,7 +801,13 @@ func (peer *Peer) Close() error { errlist.Add(peer.Audit.Worker.Close()) } - // close services in reverse initialization order + if peer.Accounting.Rollup != nil { + errlist.Add(peer.Accounting.Rollup.Close()) + } + if peer.Accounting.Tally != nil { + errlist.Add(peer.Accounting.Tally.Close()) + } + if peer.DBCleanup.Chore != nil { errlist.Add(peer.DBCleanup.Chore.Close()) }