2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-11-14 01:22:18 +00:00
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package rollup
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-01-16 19:30:33 +00:00
|
|
|
"fmt"
|
2018-11-14 01:22:18 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"go.uber.org/zap"
|
2018-11-29 18:39:27 +00:00
|
|
|
|
2018-12-12 21:24:08 +00:00
|
|
|
"storj.io/storj/pkg/accounting"
|
2019-01-16 19:30:33 +00:00
|
|
|
"storj.io/storj/pkg/storj"
|
2018-11-14 01:22:18 +00:00
|
|
|
)
|
|
|
|
|
2019-01-23 19:58:44 +00:00
|
|
|
// Config holds the settings that control the rollup service.
type Config struct {
	// Interval is how often the rollup should run.
	Interval time.Duration `help:"how frequently rollup should run" default:"120s"`
}
|
|
|
|
|
2019-01-23 19:58:44 +00:00
|
|
|
// Rollup is the service for totalling data on storage nodes on daily intervals
|
|
|
|
type Rollup struct { // TODO: rename to service
|
2018-11-14 01:22:18 +00:00
|
|
|
logger *zap.Logger
|
|
|
|
ticker *time.Ticker
|
2018-12-14 14:27:21 +00:00
|
|
|
db accounting.DB
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
|
2019-01-23 19:58:44 +00:00
|
|
|
// New creates a new rollup service
|
|
|
|
func New(logger *zap.Logger, db accounting.DB, interval time.Duration) *Rollup {
|
|
|
|
return &Rollup{
|
2018-11-14 01:22:18 +00:00
|
|
|
logger: logger,
|
|
|
|
ticker: time.NewTicker(interval),
|
2018-11-26 21:49:55 +00:00
|
|
|
db: db,
|
2018-12-14 14:27:21 +00:00
|
|
|
}
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
|
2019-01-23 19:58:44 +00:00
|
|
|
// Run the Rollup loop
|
|
|
|
func (r *Rollup) Run(ctx context.Context) (err error) {
|
2019-01-24 20:05:53 +00:00
|
|
|
r.logger.Info("Rollup service starting up")
|
2018-11-14 01:22:18 +00:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
for {
|
|
|
|
err = r.Query(ctx)
|
|
|
|
if err != nil {
|
2018-12-12 21:24:08 +00:00
|
|
|
r.logger.Error("Query failed", zap.Error(err))
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
select {
|
|
|
|
case <-r.ticker.C: // wait for the next interval to happen
|
2019-01-23 19:58:44 +00:00
|
|
|
case <-ctx.Done(): // or the Rollup is canceled via context
|
2018-11-14 01:22:18 +00:00
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-01-23 19:58:44 +00:00
|
|
|
// Query rolls up raw tally
|
|
|
|
func (r *Rollup) Query(ctx context.Context) error {
|
|
|
|
// only Rollup new things - get LastRollup
|
2019-01-16 19:30:33 +00:00
|
|
|
var latestTally time.Time
|
|
|
|
lastRollup, isNil, err := r.db.LastRawTime(ctx, accounting.LastRollup)
|
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
var tallies []*accounting.Raw
|
|
|
|
if isNil {
|
|
|
|
r.logger.Info("Rollup found no existing raw tally data")
|
|
|
|
tallies, err = r.db.GetRaw(ctx)
|
|
|
|
} else {
|
|
|
|
tallies, err = r.db.GetRawSince(ctx, lastRollup)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
if len(tallies) == 0 {
|
|
|
|
r.logger.Info("Rollup found no new tallies")
|
|
|
|
return nil
|
|
|
|
}
|
2019-01-23 19:58:44 +00:00
|
|
|
//loop through tallies and build Rollup
|
2019-01-16 19:30:33 +00:00
|
|
|
rollupStats := make(accounting.RollupStats)
|
|
|
|
for _, tallyRow := range tallies {
|
|
|
|
node := tallyRow.NodeID
|
|
|
|
if tallyRow.CreatedAt.After(latestTally) {
|
|
|
|
latestTally = tallyRow.CreatedAt
|
|
|
|
}
|
|
|
|
//create or get AccoutingRollup
|
|
|
|
iDay := tallyRow.IntervalEndTime
|
|
|
|
iDay = time.Date(iDay.Year(), iDay.Month(), iDay.Day(), 0, 0, 0, 0, iDay.Location())
|
|
|
|
if rollupStats[iDay] == nil {
|
|
|
|
rollupStats[iDay] = make(map[storj.NodeID]*accounting.Rollup)
|
|
|
|
}
|
|
|
|
if rollupStats[iDay][node] == nil {
|
|
|
|
rollupStats[iDay][node] = &accounting.Rollup{NodeID: node, StartTime: iDay}
|
|
|
|
}
|
|
|
|
//increment Rollups
|
|
|
|
switch tallyRow.DataType {
|
|
|
|
case accounting.BandwidthPut:
|
|
|
|
rollupStats[iDay][node].PutTotal += int64(tallyRow.DataTotal)
|
|
|
|
case accounting.BandwidthGet:
|
|
|
|
rollupStats[iDay][node].GetTotal += int64(tallyRow.DataTotal)
|
|
|
|
case accounting.BandwidthGetAudit:
|
|
|
|
rollupStats[iDay][node].GetAuditTotal += int64(tallyRow.DataTotal)
|
|
|
|
case accounting.BandwidthGetRepair:
|
|
|
|
rollupStats[iDay][node].GetRepairTotal += int64(tallyRow.DataTotal)
|
|
|
|
case accounting.BandwidthPutRepair:
|
|
|
|
rollupStats[iDay][node].PutRepairTotal += int64(tallyRow.DataTotal)
|
|
|
|
case accounting.AtRest:
|
|
|
|
rollupStats[iDay][node].AtRestTotal += tallyRow.DataTotal
|
|
|
|
default:
|
2019-01-23 19:58:44 +00:00
|
|
|
return Error.Wrap(fmt.Errorf("Bad tally datatype in Rollup : %d", tallyRow.DataType))
|
2019-01-16 19:30:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
//remove the latest day (which we cannot know is complete), then push to DB
|
|
|
|
latestTally = time.Date(latestTally.Year(), latestTally.Month(), latestTally.Day(), 0, 0, 0, 0, latestTally.Location())
|
|
|
|
delete(rollupStats, latestTally)
|
2019-01-18 16:53:23 +00:00
|
|
|
if len(rollupStats) == 0 {
|
|
|
|
r.logger.Info("Rollup only found tallies for today")
|
|
|
|
return nil
|
|
|
|
}
|
2019-01-24 20:41:22 +00:00
|
|
|
return Error.Wrap(r.db.SaveRollup(ctx, latestTally, isNil, rollupStats))
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|