satellite/accounting: use sync2.Cycle instead of ticker (#2977)

Egon Elbre 2019-09-09 19:48:24 +03:00 committed by GitHub
parent 3b72cb6720
commit 646f290ff3
3 changed files with 44 additions and 30 deletions

View File

@@ -10,6 +10,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/storj/internal/memory"
+	"storj.io/storj/internal/sync2"
 	"storj.io/storj/pkg/pb"
 	"storj.io/storj/pkg/storj"
 	"storj.io/storj/satellite/accounting"
@@ -25,7 +26,7 @@ type Config struct {
 // Service is the rollup service for totalling data on storage nodes on daily intervals
 type Service struct {
 	logger *zap.Logger
-	ticker *time.Ticker
+	Loop sync2.Cycle
 	sdb accounting.StoragenodeAccounting
 	deleteTallies bool
 }
@@ -34,7 +35,7 @@ type Service struct {
 func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time.Duration, deleteTallies bool) *Service {
 	return &Service{
 		logger: logger,
-		ticker: time.NewTicker(interval),
+		Loop: *sync2.NewCycle(interval),
 		sdb: sdb,
 		deleteTallies: deleteTallies,
 	}
@@ -44,17 +45,19 @@ func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time
 func (r *Service) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 	r.logger.Info("Rollup service starting up")
-	for {
-		err = r.Rollup(ctx)
+	return r.Loop.Run(ctx, func(ctx context.Context) error {
+		err := r.Rollup(ctx)
 		if err != nil {
-			r.logger.Error("Query failed", zap.Error(err))
+			r.logger.Error("rollup failed", zap.Error(err))
 		}
-		select {
-		case <-r.ticker.C: // wait for the next interval to happen
-		case <-ctx.Done(): // or the Rollup is canceled via context
-			return ctx.Err()
-		}
-	}
+		return nil
+	})
 }
+
+// Close stops the service and releases any resources.
+func (r *Service) Close() error {
+	r.Loop.Close()
+	return nil
+}
 
 // Rollup aggregates storage and bandwidth amounts for the time interval
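
The change above replaces the hand-rolled ticker/select loop with sync2.Cycle's Run/Close pair. As a rough illustration of the behaviour the services now lean on (invoke the callback on an interval until the context is canceled or Close is called), here is a minimal stand-in built only on the standard library. It is a sketch of the pattern, not the real storj.io/storj/internal/sync2 implementation; the cycle type, newCycle, and the timings below are invented for the example.

package main

import (
	"context"
	"fmt"
	"time"
)

// cycle is a hypothetical stand-in for the kind of helper sync2.Cycle provides:
// it runs fn immediately, then once per interval, until the context is canceled
// or Close is called. Illustration only; not the real sync2.Cycle.
type cycle struct {
	interval time.Duration
	stop     chan struct{}
}

func newCycle(interval time.Duration) *cycle {
	return &cycle{interval: interval, stop: make(chan struct{})}
}

func (c *cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error {
	ticker := time.NewTicker(c.interval)
	defer ticker.Stop()
	for {
		// A non-nil error from the callback stops the loop.
		if err := fn(ctx); err != nil {
			return err
		}
		select {
		case <-ticker.C: // wait for the next interval
		case <-c.stop: // or Close was called
			return nil
		case <-ctx.Done(): // or the context was canceled
			return ctx.Err()
		}
	}
}

func (c *cycle) Close() { close(c.stop) }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	loop := newCycle(250 * time.Millisecond)
	_ = loop.Run(ctx, func(ctx context.Context) error {
		fmt.Println("tick", time.Now().Format(time.StampMilli))
		return nil // returning an error would stop the loop
	})
}

Under that reading, returning an error from the callback would stop the cycle, which is presumably why Rollup and Tally log their per-pass error and return nil: a single failed pass should not take the whole service down.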

View File

@@ -12,6 +12,7 @@ import (
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
+	"storj.io/storj/internal/sync2"
 	"storj.io/storj/pkg/pb"
 	"storj.io/storj/pkg/storj"
 	"storj.io/storj/satellite/accounting"
@@ -31,21 +32,19 @@ type Service struct {
 	logger *zap.Logger
 	metainfo *metainfo.Service
 	overlay *overlay.Service
-	limit int
-	ticker *time.Ticker
+	Loop sync2.Cycle
 	storagenodeAccountingDB accounting.StoragenodeAccounting
 	projectAccountingDB accounting.ProjectAccounting
 	liveAccounting live.Service
 }
 
 // New creates a new tally Service
-func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Service, limit int, interval time.Duration) *Service {
+func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Service, interval time.Duration) *Service {
 	return &Service{
 		logger: logger,
 		metainfo: metainfo,
 		overlay: overlay,
-		limit: limit,
-		ticker: time.NewTicker(interval),
+		Loop: *sync2.NewCycle(interval),
 		storagenodeAccountingDB: sdb,
 		projectAccountingDB: pdb,
 		liveAccounting: liveAccounting,
@@ -57,16 +56,19 @@ func (t *Service) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 	t.logger.Info("Tally service starting up")
 
-	for {
-		if err = t.Tally(ctx); err != nil {
-			t.logger.Error("Tally failed", zap.Error(err))
+	return t.Loop.Run(ctx, func(ctx context.Context) error {
+		err := t.Tally(ctx)
+		if err != nil {
+			t.logger.Error("tally failed", zap.Error(err))
 		}
-		select {
-		case <-t.ticker.C: // wait for the next interval to happen
-		case <-ctx.Done(): // or the Tally is canceled via context
-			return ctx.Err()
-		}
-	}
+		return nil
+	})
 }
+
+// Close stops the service and releases any resources.
+func (t *Service) Close() error {
+	t.Loop.Close()
+	return nil
+}
 
 // Tally calculates data-at-rest usage once
@@ -100,6 +102,7 @@ func (t *Service) Tally(ctx context.Context) (err error) {
 			}
 		}
 	}
 	return errs.Combine(errAtRest, errBucketInfo)
 }

View File

@@ -539,7 +539,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 	{ // setup accounting
 		log.Debug("Setting up accounting")
-		peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval)
+		peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, config.Tally.Interval)
 		peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
 	}
@@ -726,6 +726,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
 	group.Go(func() error {
 		return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx))
 	})
+	group.Go(func() error {
+		return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
+	})
 	group.Go(func() error {
 		return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx))
 	})
@@ -758,9 +761,6 @@
 	group.Go(func() error {
 		return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx))
 	})
-	group.Go(func() error {
-		return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
-	})
 
 	return group.Wait()
 }
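
For reference, the Run wiring above is the usual errgroup fan-out: every service loop gets its own goroutine, and an exit caused by context cancellation is treated as a clean shutdown rather than a failure. Below is a small self-contained sketch of that pattern; ignoreCanceled and runService are hypothetical stand-ins for errs2.IgnoreCanceled and the real service Run methods, not the storj implementations.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// ignoreCanceled is a stand-in for errs2.IgnoreCanceled: an exit caused by
// context cancellation is a clean shutdown, not an error worth reporting.
func ignoreCanceled(err error) error {
	if errors.Is(err, context.Canceled) {
		return nil
	}
	return err
}

// runService stands in for a long-running loop such as Accounting.Tally.Run.
func runService(ctx context.Context, name string) error {
	<-ctx.Done()
	fmt.Println(name, "shutting down")
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	time.AfterFunc(200*time.Millisecond, cancel) // simulate a shutdown signal

	group, ctx := errgroup.WithContext(ctx)
	for _, name := range []string{"tally", "rollup", "dbcleanup"} {
		name := name
		group.Go(func() error {
			return ignoreCanceled(runService(ctx, name))
		})
	}

	// Wait returns nil here: every goroutine exits with context.Canceled,
	// which ignoreCanceled converts into a clean result.
	fmt.Println("wait error:", group.Wait())
}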
@@ -792,6 +792,8 @@ func (peer *Peer) Close() error {
 		errlist.Add(peer.Marketing.Listener.Close())
 	}
 
+	// close services in reverse initialization order
+
 	if peer.Audit.Chore != nil {
 		errlist.Add(peer.Audit.Chore.Close())
 	}
@@ -799,7 +801,13 @@
 		errlist.Add(peer.Audit.Worker.Close())
 	}
 
-	// close services in reverse initialization order
+	if peer.Accounting.Rollup != nil {
+		errlist.Add(peer.Accounting.Rollup.Close())
+	}
+	if peer.Accounting.Tally != nil {
+		errlist.Add(peer.Accounting.Tally.Close())
+	}
+
 	if peer.DBCleanup.Chore != nil {
 		errlist.Add(peer.DBCleanup.Chore.Close())
 	}
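
The Close path mirrors the Run wiring: each long-running service now exposes Close() error, and peer.Close tears them down in reverse initialization order while collecting every error instead of stopping at the first one. A minimal sketch of that shape, using the standard library's errors.Join as a stand-in for the errlist used in peer.go; the closer interface and fakeService are invented for the example.

package main

import (
	"errors"
	"fmt"
)

// closer is the shape shared by the services wired up above: after this change
// Tally, Rollup, and the DB cleanup chore all expose Close() error.
type closer interface {
	Close() error
}

type fakeService struct{ name string }

func (s fakeService) Close() error {
	fmt.Println("closing", s.name)
	return nil
}

func main() {
	// Services listed in initialization order; close them in reverse,
	// collecting every error rather than bailing out on the first one.
	services := []closer{fakeService{"tally"}, fakeService{"rollup"}, fakeService{"dbcleanup"}}

	var errlist []error
	for i := len(services) - 1; i >= 0; i-- {
		errlist = append(errlist, services[i].Close())
	}
	// errors.Join drops nil entries and combines the rest into one error.
	fmt.Println("combined error:", errors.Join(errlist...))
}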