2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-11-14 01:22:18 +00:00
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package rollup
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"go.uber.org/zap"
|
2018-11-29 18:39:27 +00:00
|
|
|
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/pb"
|
|
|
|
"storj.io/common/storj"
|
|
|
|
"storj.io/common/sync2"
|
2019-07-28 06:55:36 +01:00
|
|
|
"storj.io/storj/satellite/accounting"
|
2018-11-14 01:22:18 +00:00
|
|
|
)
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// Config holds the user-configurable settings of the rollup service.
type Config struct {
	// Interval is how frequently a rollup should run.
	Interval time.Duration `help:"how frequently rollup should run" releaseDefault:"24h" devDefault:"120s"`

	// DeleteTallies selects whether tallies are deleted after they have been rolled up.
	DeleteTallies bool `help:"option for deleting tallies after they are rolled up" default:"true"`
}
|
|
|
|
|
2020-12-05 16:01:42 +00:00
|
|
|
// Service is the rollup service for totalling data on storage nodes on daily intervals.
|
2019-09-10 14:24:16 +01:00
|
|
|
//
|
|
|
|
// architecture: Chore
|
2019-04-04 16:20:59 +01:00
|
|
|
type Service struct {
|
2020-11-12 19:01:55 +00:00
|
|
|
logger *zap.Logger
|
|
|
|
Loop *sync2.Cycle
|
|
|
|
sdb accounting.StoragenodeAccounting
|
|
|
|
deleteTallies bool
|
|
|
|
OrderExpiration time.Duration
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// New creates a new rollup service.
|
2020-11-12 19:01:55 +00:00
|
|
|
func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time.Duration, deleteTallies bool, orderExpiration time.Duration) *Service {
|
2019-04-04 16:20:59 +01:00
|
|
|
return &Service{
|
2020-11-12 19:01:55 +00:00
|
|
|
logger: logger,
|
|
|
|
Loop: sync2.NewCycle(interval),
|
|
|
|
sdb: sdb,
|
|
|
|
deleteTallies: deleteTallies,
|
|
|
|
OrderExpiration: orderExpiration,
|
2018-12-14 14:27:21 +00:00
|
|
|
}
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// Run the Rollup loop.
|
2019-04-04 16:20:59 +01:00
|
|
|
func (r *Service) Run(ctx context.Context) (err error) {
|
2018-11-14 01:22:18 +00:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-09-09 17:48:24 +01:00
|
|
|
return r.Loop.Run(ctx, func(ctx context.Context) error {
|
|
|
|
err := r.Rollup(ctx)
|
2018-11-14 01:22:18 +00:00
|
|
|
if err != nil {
|
2019-09-09 17:48:24 +01:00
|
|
|
r.logger.Error("rollup failed", zap.Error(err))
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
2019-09-09 17:48:24 +01:00
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close stops the service and releases any resources.
|
|
|
|
func (r *Service) Close() error {
|
|
|
|
r.Loop.Close()
|
|
|
|
return nil
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// Rollup aggregates storage and bandwidth amounts for the time interval.
|
2019-06-04 12:36:27 +01:00
|
|
|
func (r *Service) Rollup(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-01-23 19:58:44 +00:00
|
|
|
// only Rollup new things - get LastRollup
|
2019-05-10 20:05:42 +01:00
|
|
|
lastRollup, err := r.sdb.LastTimestamp(ctx, accounting.LastRollup)
|
2019-01-16 19:30:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
2020-11-04 17:24:11 +00:00
|
|
|
// unexpired orders with created at times before the last rollup timestamp could still have been added later
|
|
|
|
if !lastRollup.IsZero() {
|
|
|
|
lastRollup = lastRollup.Add(-r.OrderExpiration)
|
|
|
|
}
|
|
|
|
|
2019-04-04 16:20:59 +01:00
|
|
|
rollupStats := make(accounting.RollupStats)
|
|
|
|
latestTally, err := r.RollupStorage(ctx, lastRollup, rollupStats)
|
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
2019-04-23 20:21:30 +01:00
|
|
|
|
2019-04-04 16:20:59 +01:00
|
|
|
err = r.RollupBW(ctx, lastRollup, rollupStats)
|
2019-01-16 19:30:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
2019-04-23 20:21:30 +01:00
|
|
|
|
2020-10-13 13:47:55 +01:00
|
|
|
// remove the latest day (which we cannot know is complete), then push to DB
|
2019-04-23 20:21:30 +01:00
|
|
|
latestTally = time.Date(latestTally.Year(), latestTally.Month(), latestTally.Day(), 0, 0, 0, 0, latestTally.Location())
|
|
|
|
delete(rollupStats, latestTally)
|
2019-04-04 16:20:59 +01:00
|
|
|
if len(rollupStats) == 0 {
|
2019-04-23 20:21:30 +01:00
|
|
|
r.logger.Info("RollupStats is empty")
|
2019-04-04 16:20:59 +01:00
|
|
|
return nil
|
|
|
|
}
|
2019-04-23 20:21:30 +01:00
|
|
|
|
2019-05-10 20:05:42 +01:00
|
|
|
err = r.sdb.SaveRollup(ctx, latestTally, rollupStats)
|
2019-04-04 16:20:59 +01:00
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
2019-04-23 20:21:30 +01:00
|
|
|
|
|
|
|
if r.deleteTallies {
|
|
|
|
// Delete already rolled up tallies
|
2020-11-12 19:01:55 +00:00
|
|
|
latestTally = latestTally.Add(-r.OrderExpiration)
|
2019-05-10 20:05:42 +01:00
|
|
|
err = r.sdb.DeleteTalliesBefore(ctx, latestTally)
|
2019-04-23 20:21:30 +01:00
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
2019-04-04 16:20:59 +01:00
|
|
|
}
|
2019-04-23 20:21:30 +01:00
|
|
|
|
2019-04-04 16:20:59 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// RollupStorage rolls up storage tally, modifies rollupStats map.
|
2019-04-04 16:20:59 +01:00
|
|
|
func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) (latestTally time.Time, err error) {
|
2019-06-04 12:36:27 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-05-10 20:05:42 +01:00
|
|
|
tallies, err := r.sdb.GetTalliesSince(ctx, lastRollup)
|
2019-04-04 16:20:59 +01:00
|
|
|
if err != nil {
|
2019-12-18 18:14:54 +00:00
|
|
|
return lastRollup, Error.Wrap(err)
|
2019-04-04 16:20:59 +01:00
|
|
|
}
|
2019-01-16 19:30:33 +00:00
|
|
|
if len(tallies) == 0 {
|
|
|
|
r.logger.Info("Rollup found no new tallies")
|
2019-12-18 18:14:54 +00:00
|
|
|
return lastRollup, nil
|
2019-01-16 19:30:33 +00:00
|
|
|
}
|
2020-10-13 13:47:55 +01:00
|
|
|
// loop through tallies and build Rollup
|
2019-01-16 19:30:33 +00:00
|
|
|
for _, tallyRow := range tallies {
|
|
|
|
node := tallyRow.NodeID
|
2019-04-23 20:21:30 +01:00
|
|
|
// tallyEndTime is the time the at rest tally was saved
|
|
|
|
tallyEndTime := tallyRow.IntervalEndTime.UTC()
|
|
|
|
if tallyEndTime.After(latestTally) {
|
|
|
|
latestTally = tallyEndTime
|
2019-01-16 19:30:33 +00:00
|
|
|
}
|
2020-10-13 13:47:55 +01:00
|
|
|
// create or get AccoutingRollup day entry
|
2019-04-23 20:21:30 +01:00
|
|
|
iDay := time.Date(tallyEndTime.Year(), tallyEndTime.Month(), tallyEndTime.Day(), 0, 0, 0, 0, tallyEndTime.Location())
|
2019-01-16 19:30:33 +00:00
|
|
|
if rollupStats[iDay] == nil {
|
|
|
|
rollupStats[iDay] = make(map[storj.NodeID]*accounting.Rollup)
|
|
|
|
}
|
|
|
|
if rollupStats[iDay][node] == nil {
|
|
|
|
rollupStats[iDay][node] = &accounting.Rollup{NodeID: node, StartTime: iDay}
|
|
|
|
}
|
2020-10-13 13:47:55 +01:00
|
|
|
// increment data at rest sum
|
2019-05-10 20:05:42 +01:00
|
|
|
rollupStats[iDay][node].AtRestTotal += tallyRow.DataTotal
|
2019-01-16 19:30:33 +00:00
|
|
|
}
|
2019-04-04 16:20:59 +01:00
|
|
|
|
|
|
|
return latestTally, nil
|
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// RollupBW aggregates the bandwidth rollups, modifies rollupStats map.
|
2019-06-04 12:36:27 +01:00
|
|
|
func (r *Service) RollupBW(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2020-11-29 16:13:06 +00:00
|
|
|
err = r.sdb.GetBandwidthSince(ctx, lastRollup.UTC(), func(ctx context.Context, row *accounting.StoragenodeBandwidthRollup) error {
|
2019-04-04 16:20:59 +01:00
|
|
|
nodeID := row.NodeID
|
2019-04-23 20:21:30 +01:00
|
|
|
// interval is the time the bw order was saved
|
|
|
|
interval := row.IntervalStart.UTC()
|
|
|
|
day := time.Date(interval.Year(), interval.Month(), interval.Day(), 0, 0, 0, 0, interval.Location())
|
2019-04-04 16:20:59 +01:00
|
|
|
if rollupStats[day] == nil {
|
|
|
|
rollupStats[day] = make(map[storj.NodeID]*accounting.Rollup)
|
|
|
|
}
|
|
|
|
if rollupStats[day][nodeID] == nil {
|
|
|
|
rollupStats[day][nodeID] = &accounting.Rollup{NodeID: nodeID, StartTime: day}
|
|
|
|
}
|
|
|
|
switch row.Action {
|
|
|
|
case uint(pb.PieceAction_INVALID):
|
|
|
|
r.logger.Info("invalid order action type")
|
|
|
|
case uint(pb.PieceAction_PUT):
|
|
|
|
rollupStats[day][nodeID].PutTotal += int64(row.Settled)
|
|
|
|
case uint(pb.PieceAction_GET):
|
|
|
|
rollupStats[day][nodeID].GetTotal += int64(row.Settled)
|
|
|
|
case uint(pb.PieceAction_GET_AUDIT):
|
|
|
|
rollupStats[day][nodeID].GetAuditTotal += int64(row.Settled)
|
|
|
|
case uint(pb.PieceAction_GET_REPAIR):
|
|
|
|
rollupStats[day][nodeID].GetRepairTotal += int64(row.Settled)
|
|
|
|
case uint(pb.PieceAction_PUT_REPAIR):
|
|
|
|
rollupStats[day][nodeID].PutRepairTotal += int64(row.Settled)
|
|
|
|
default:
|
|
|
|
r.logger.Info("delete order type")
|
|
|
|
}
|
2019-04-23 20:21:30 +01:00
|
|
|
|
2020-11-29 16:13:06 +00:00
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
2020-12-11 17:34:22 +00:00
|
|
|
|
2019-04-04 16:20:59 +01:00
|
|
|
return nil
|
2018-11-14 01:22:18 +00:00
|
|
|
}
|