2018-11-14 01:22:18 +00:00
|
|
|
// Copyright (C) 2018 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package rollup
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"go.uber.org/zap"
|
2018-11-29 18:39:27 +00:00
|
|
|
|
2018-11-26 21:49:55 +00:00
|
|
|
dbx "storj.io/storj/pkg/accounting/dbx"
|
2018-11-14 01:22:18 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Rollup is the service for totalling data on storage nodes for 1, 7, 30 day intervals
|
|
|
|
type Rollup interface {
|
|
|
|
Run(ctx context.Context) error
|
|
|
|
}
|
|
|
|
|
|
|
|
type rollup struct {
|
|
|
|
logger *zap.Logger
|
|
|
|
ticker *time.Ticker
|
2018-11-26 21:49:55 +00:00
|
|
|
db *dbx.DB
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
|
2018-11-26 21:49:55 +00:00
|
|
|
func newRollup(logger *zap.Logger, db *dbx.DB, interval time.Duration) (*rollup, error) {
|
2018-11-14 01:22:18 +00:00
|
|
|
return &rollup{
|
|
|
|
logger: logger,
|
|
|
|
ticker: time.NewTicker(interval),
|
2018-11-26 21:49:55 +00:00
|
|
|
db: db,
|
|
|
|
}, nil
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Run the rollup loop
|
|
|
|
func (r *rollup) Run(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
for {
|
|
|
|
err = r.Query(ctx)
|
|
|
|
if err != nil {
|
|
|
|
zap.L().Error("Rollup Query failed", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-r.ticker.C: // wait for the next interval to happen
|
|
|
|
case <-ctx.Done(): // or the rollup is canceled via context
|
2018-11-26 21:49:55 +00:00
|
|
|
_ = r.db.Close()
|
2018-11-14 01:22:18 +00:00
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *rollup) Query(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|