storj/satellite/accounting/rollup/rollup.go
Stefan Benten 49a30ce4a7
satellite/payments: Set proper defaults for the release (#3806)
* Slight adjustments to the migration

Change-Id: I68ae81c010c3414fde2845df16ab124f8d17834b

* Change Coupon Value

Change-Id: I0f241d09e5f716f1d1b3f0688643ba7f614d83c4

* Change AlphaUsage to 5GB

Change-Id: I5d25c6b5750684510cda8b14a27f38d5b2b07408

* change config lock

Change-Id: Ib7c7a54555ba2387c9aa8dd60a0501b0ee6491dd

* Use Scan properly

Change-Id: Ie39cf4644e3ddd703a254e2f5e616763dd805235

* Fix Config Lock

Change-Id: I558ecc1c1becfaaefc7aea5ad2fe83fd6bf6b561
2020-03-16 22:53:12 +01:00

187 lines
5.6 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package rollup
import (
"context"
"time"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/accounting"
)
// Config contains configurable values for rollup
type Config struct {
Interval time.Duration `help:"how frequently rollup should run" releaseDefault:"24h" devDefault:"120s"`
MaxAlphaUsage memory.Size `help:"the bandwidth and storage usage limit for the alpha release" releaseDefault:"5GB" devDefault:"25GB"`
DeleteTallies bool `help:"option for deleting tallies after they are rolled up" default:"true"`
}
// Service is the rollup service for totalling data on storage nodes on daily intervals
//
// architecture: Chore
type Service struct {
logger *zap.Logger
Loop *sync2.Cycle
sdb accounting.StoragenodeAccounting
deleteTallies bool
}
// New creates a new rollup service
func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time.Duration, deleteTallies bool) *Service {
return &Service{
logger: logger,
Loop: sync2.NewCycle(interval),
sdb: sdb,
deleteTallies: deleteTallies,
}
}
// Run the Rollup loop
func (r *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return r.Loop.Run(ctx, func(ctx context.Context) error {
err := r.Rollup(ctx)
if err != nil {
r.logger.Error("rollup failed", zap.Error(err))
}
return nil
})
}
// Close stops the service and releases any resources.
func (r *Service) Close() error {
r.Loop.Close()
return nil
}
// Rollup aggregates storage and bandwidth amounts for the time interval
func (r *Service) Rollup(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// only Rollup new things - get LastRollup
lastRollup, err := r.sdb.LastTimestamp(ctx, accounting.LastRollup)
if err != nil {
return Error.Wrap(err)
}
rollupStats := make(accounting.RollupStats)
latestTally, err := r.RollupStorage(ctx, lastRollup, rollupStats)
if err != nil {
return Error.Wrap(err)
}
err = r.RollupBW(ctx, lastRollup, rollupStats)
if err != nil {
return Error.Wrap(err)
}
//remove the latest day (which we cannot know is complete), then push to DB
latestTally = time.Date(latestTally.Year(), latestTally.Month(), latestTally.Day(), 0, 0, 0, 0, latestTally.Location())
delete(rollupStats, latestTally)
if len(rollupStats) == 0 {
r.logger.Info("RollupStats is empty")
return nil
}
err = r.sdb.SaveRollup(ctx, latestTally, rollupStats)
if err != nil {
return Error.Wrap(err)
}
if r.deleteTallies {
// Delete already rolled up tallies
err = r.sdb.DeleteTalliesBefore(ctx, latestTally)
if err != nil {
return Error.Wrap(err)
}
}
return nil
}
// RollupStorage rolls up storage tally, modifies rollupStats map
func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) (latestTally time.Time, err error) {
defer mon.Task()(&ctx)(&err)
tallies, err := r.sdb.GetTalliesSince(ctx, lastRollup)
if err != nil {
return lastRollup, Error.Wrap(err)
}
if len(tallies) == 0 {
r.logger.Info("Rollup found no new tallies")
return lastRollup, nil
}
//loop through tallies and build Rollup
for _, tallyRow := range tallies {
node := tallyRow.NodeID
// tallyEndTime is the time the at rest tally was saved
tallyEndTime := tallyRow.IntervalEndTime.UTC()
if tallyEndTime.After(latestTally) {
latestTally = tallyEndTime
}
//create or get AccoutingRollup day entry
iDay := time.Date(tallyEndTime.Year(), tallyEndTime.Month(), tallyEndTime.Day(), 0, 0, 0, 0, tallyEndTime.Location())
if rollupStats[iDay] == nil {
rollupStats[iDay] = make(map[storj.NodeID]*accounting.Rollup)
}
if rollupStats[iDay][node] == nil {
rollupStats[iDay][node] = &accounting.Rollup{NodeID: node, StartTime: iDay}
}
//increment data at rest sum
rollupStats[iDay][node].AtRestTotal += tallyRow.DataTotal
}
return latestTally, nil
}
// RollupBW aggregates the bandwidth rollups, modifies rollupStats map
func (r *Service) RollupBW(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) (err error) {
defer mon.Task()(&ctx)(&err)
var latestTally time.Time
bws, err := r.sdb.GetBandwidthSince(ctx, lastRollup.UTC())
if err != nil {
return Error.Wrap(err)
}
if len(bws) == 0 {
r.logger.Info("Rollup found no new bw rollups")
return nil
}
for _, row := range bws {
nodeID := row.NodeID
// interval is the time the bw order was saved
interval := row.IntervalStart.UTC()
if interval.After(latestTally) {
latestTally = interval
}
day := time.Date(interval.Year(), interval.Month(), interval.Day(), 0, 0, 0, 0, interval.Location())
if rollupStats[day] == nil {
rollupStats[day] = make(map[storj.NodeID]*accounting.Rollup)
}
if rollupStats[day][nodeID] == nil {
rollupStats[day][nodeID] = &accounting.Rollup{NodeID: nodeID, StartTime: day}
}
switch row.Action {
case uint(pb.PieceAction_INVALID):
r.logger.Info("invalid order action type")
case uint(pb.PieceAction_PUT):
rollupStats[day][nodeID].PutTotal += int64(row.Settled)
case uint(pb.PieceAction_GET):
rollupStats[day][nodeID].GetTotal += int64(row.Settled)
case uint(pb.PieceAction_GET_AUDIT):
rollupStats[day][nodeID].GetAuditTotal += int64(row.Settled)
case uint(pb.PieceAction_GET_REPAIR):
rollupStats[day][nodeID].GetRepairTotal += int64(row.Settled)
case uint(pb.PieceAction_PUT_REPAIR):
rollupStats[day][nodeID].PutRepairTotal += int64(row.Settled)
default:
r.logger.Info("delete order type")
}
}
return nil
}