25f8f678ab
Currently, the storagenode usage graph has two problems. 1. The graph is still likely to contain a spike on the first day of the month for a newly set up storagenode, because there is no previous interval_end_time to subtract from. 2. If a node goes offline for too long — say the last usage report in the storageusage cache has an interval_end_time of 2020-12-30 00:00:00+00:00 — and comes back online a few days later, it requests storage usage from the satellite starting from the beginning of the current month, say 2021-01-01 00:00:00+00:00. The hours calculated for the first day would then be 48, which is incorrect because the cache is missing one day's usage report. This PR addresses the second issue on the storagenode side: instead of requesting storage usage only within the current month's boundaries, it requests an interval starting from the last day of the previous month up to the current day of the current month. The first issue is an accepted tradeoff and does not really matter, since it only affects the first day after the storagenode joins the satellite. Updates https://github.com/storj/storj/issues/4178 Change-Id: I041c56c412030ce013dd77dce11b0b5d6550927b
274 lines
6.9 KiB
Go
274 lines
6.9 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package nodestats
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"storj.io/common/storj"
|
|
"storj.io/common/sync2"
|
|
"storj.io/storj/private/date"
|
|
"storj.io/storj/storagenode/payouts"
|
|
"storj.io/storj/storagenode/pricing"
|
|
"storj.io/storj/storagenode/reputation"
|
|
"storj.io/storj/storagenode/storageusage"
|
|
"storj.io/storj/storagenode/trust"
|
|
)
|
|
|
|
// Config defines nodestats cache configuration.
//
// The struct tags carry each option's help text and its release/dev defaults.
type Config struct {
	// MaxSleep is the exclusive upper bound of the random delay taken before
	// every reputation and storage cycle iteration; a value <= 0 disables it.
	MaxSleep time.Duration `help:"maximum duration to wait before requesting data" releaseDefault:"300s" devDefault:"1s"`
	// ReputationSync is the interval of the Reputation cycle, which refreshes
	// reputation stats from every trusted satellite.
	ReputationSync time.Duration `help:"how often to sync reputation" releaseDefault:"4h" devDefault:"1m"`
	// StorageSync is the interval of the Storage cycle, which refreshes disk
	// space usage and the previous month's paystub and payment.
	StorageSync time.Duration `help:"how often to sync storage" releaseDefault:"12h" devDefault:"2m"`
}
|
|
|
|
// CacheStorage encapsulates cache DBs.
//
// It groups the local databases that Cache writes satellite data into.
type CacheStorage struct {
	// Reputation is the reputation stats DB.
	// NOTE(review): not referenced by Cache in this file — reputation stats are
	// stored through reputation.Service instead; confirm it is still needed.
	Reputation reputation.DB
	// StorageUsage receives the daily storage usage fetched by CacheSpaceUsage.
	StorageUsage storageusage.DB
	// Payout receives paystubs and payments fetched during Run and by
	// CacheHeldAmount.
	Payout payouts.DB
	// Pricing receives the per-satellite pricing models fetched during Run.
	Pricing pricing.DB
}
|
|
|
|
// Cache runs cache loop and stores reputation stats and storage usage into db.
// It also caches paystubs, payments and pricing models from every trusted
// satellite.
//
// architecture: Chore
type Cache struct {
	log *zap.Logger

	// db holds the local DBs that fetched satellite data is written to.
	db CacheStorage
	// service queries reputation stats, daily storage usage and pricing models
	// from satellites.
	service *Service
	// payoutEndpoint fetches paystubs and payments (full history during Run,
	// the previous month in CacheHeldAmount).
	payoutEndpoint *payouts.Endpoint
	// reputationService persists reputation stats per satellite.
	reputationService *reputation.Service
	// trust supplies the satellites iterated by satelliteLoop.
	trust *trust.Pool

	// maxSleep is the exclusive upper bound of the random jitter applied
	// before each cycle iteration; <= 0 disables the jitter.
	maxSleep time.Duration
	// Reputation drives periodic CacheReputationStats runs once Run starts it.
	Reputation *sync2.Cycle
	// Storage drives periodic CacheSpaceUsage and CacheHeldAmount runs once
	// Run starts it.
	Storage *sync2.Cycle
}
|
|
|
|
// NewCache creates new caching service instance.
|
|
func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service,
|
|
payoutEndpoint *payouts.Endpoint, reputationService *reputation.Service, trust *trust.Pool) *Cache {
|
|
|
|
return &Cache{
|
|
log: log,
|
|
db: db,
|
|
service: service,
|
|
payoutEndpoint: payoutEndpoint,
|
|
reputationService: reputationService,
|
|
trust: trust,
|
|
maxSleep: config.MaxSleep,
|
|
Reputation: sync2.NewCycle(config.ReputationSync),
|
|
Storage: sync2.NewCycle(config.StorageSync),
|
|
}
|
|
}
|
|
|
|
// Run performs a one-off sync of payout history and pricing models for every
// trusted satellite. It then starts the reputation and storage cycles and
// blocks until both cycles have returned.
//
// Errors from the initial sync and from individual cycle iterations are logged,
// not returned. The cycle callbacks only return an error when the random
// pre-iteration sleep is interrupted by context cancellation.
func (cache *Cache) Run(ctx context.Context) error {
	var group errgroup.Group

	// The three initial steps are independent. A failure in one of them (for
	// example fetching the paystub history) must not prevent the others, in
	// particular the pricing model, from being cached for the same satellite.
	err := cache.satelliteLoop(ctx, func(satelliteID storj.NodeID) error {
		var stepErrs errs.Group
		stepErrs.Add(cache.cachePaystubHistory(ctx, satelliteID))
		stepErrs.Add(cache.cachePaymentHistory(ctx, satelliteID))
		stepErrs.Add(cache.cachePricingModel(ctx, satelliteID))
		return stepErrs.Err()
	})
	if err != nil {
		cache.log.Error("Get pricing-model/join date failed", zap.Error(err))
	}

	cache.Reputation.Start(ctx, &group, func(ctx context.Context) error {
		if err := cache.sleep(ctx); err != nil {
			return err
		}

		if err := cache.CacheReputationStats(ctx); err != nil {
			cache.log.Error("Get stats query failed", zap.Error(err))
		}

		return nil
	})
	cache.Storage.Start(ctx, &group, func(ctx context.Context) error {
		if err := cache.sleep(ctx); err != nil {
			return err
		}

		if err := cache.CacheSpaceUsage(ctx); err != nil {
			cache.log.Error("Get disk space usage query failed", zap.Error(err))
		}

		if err := cache.CacheHeldAmount(ctx); err != nil {
			cache.log.Error("Get held amount query failed", zap.Error(err))
		}

		return nil
	})

	return group.Wait()
}

// cachePaystubHistory fetches the satellite's paystubs via
// payoutEndpoint.GetAllPaystubs and stores them one by one, stopping at the
// first storage error.
func (cache *Cache) cachePaystubHistory(ctx context.Context, satelliteID storj.NodeID) error {
	stubHistory, err := cache.payoutEndpoint.GetAllPaystubs(ctx, satelliteID)
	if err != nil {
		return err
	}

	for _, stub := range stubHistory {
		if err := cache.db.Payout.StorePayStub(ctx, stub); err != nil {
			return err
		}
	}

	return nil
}

// cachePaymentHistory fetches the satellite's payments via
// payoutEndpoint.GetAllPayments and stores them one by one, stopping at the
// first storage error.
func (cache *Cache) cachePaymentHistory(ctx context.Context, satelliteID storj.NodeID) error {
	paymentHistory, err := cache.payoutEndpoint.GetAllPayments(ctx, satelliteID)
	if err != nil {
		return err
	}

	for _, payment := range paymentHistory {
		if err := cache.db.Payout.StorePayment(ctx, payment); err != nil {
			return err
		}
	}

	return nil
}

// cachePricingModel fetches the satellite's pricing model and stores it.
func (cache *Cache) cachePricingModel(ctx context.Context, satelliteID storj.NodeID) error {
	pricingModel, err := cache.service.GetPricingModel(ctx, satelliteID)
	if err != nil {
		return err
	}

	return cache.db.Pricing.Store(ctx, *pricingModel)
}
|
|
|
|
// CacheReputationStats fetches reputation stats from every satellite the
// storagenode trusts and persists them through the reputation service.
//
// A failure to store stats is logged with the satellite loop continuing; all
// per-satellite errors are combined into the returned error.
func (cache *Cache) CacheReputationStats(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	return cache.satelliteLoop(ctx, func(satellite storj.NodeID) error {
		stats, err := cache.service.GetReputationStats(ctx, satellite)
		if err != nil {
			return err
		}

		storeErr := cache.reputationService.Store(ctx, *stats, satellite)
		if storeErr != nil {
			cache.log.Error("failed to store reputation", zap.Error(storeErr))
		}
		return storeErr
	})
}
|
|
|
|
// CacheSpaceUsage fetches daily disk space usage from every satellite known to
// the storagenode and stores it into db.
//
// The requested range runs from the last day of the previous month (UTC) to
// the end of the current month. Starting one day early gives the cache a
// preceding report to measure the first day of the month against.
func (cache *Cache) CacheSpaceUsage(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	monthStart, monthEnd := date.MonthBoundary(time.Now().UTC())
	from := monthStart.AddDate(0, 0, -1)

	return cache.satelliteLoop(ctx, func(satellite storj.NodeID) error {
		spaceUsages, err := cache.service.GetDailyStorageUsage(ctx, satellite, from, monthEnd)
		if err != nil {
			return err
		}

		return cache.db.StorageUsage.Store(ctx, spaceUsages)
	})
}
|
|
|
|
// CacheHeldAmount fetches the previous month's paystub and payment from all
// the satellites known to the storagenode and stores them into db.
//
// The previous month is derived from the current UTC time, matching
// CacheSpaceUsage. It is computed once before the loop, so every satellite is
// queried for the same period regardless of the node's local time zone or a
// month rollover occurring mid-loop. A satellite with no paystub for that
// period is skipped without error.
func (cache *Cache) CacheHeldAmount(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	now := time.Now().UTC()
	currentMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
	// The period is passed to the payouts endpoint as Time.String() of the
	// first instant of the month, e.g. "2020-12-01 00:00:00 +0000 UTC".
	previousMonth := currentMonth.AddDate(0, -1, 0).String()

	return cache.satelliteLoop(ctx, func(satellite storj.NodeID) error {
		payStub, err := cache.payoutEndpoint.GetPaystub(ctx, satellite, previousMonth)
		if err != nil {
			if payouts.ErrNoPayStubForPeriod.Has(err) {
				return nil
			}

			cache.log.Error("payouts err", zap.String("satellite", satellite.String()))
			return err
		}

		if payStub != nil {
			if err = cache.db.Payout.StorePayStub(ctx, *payStub); err != nil {
				return err
			}
		}

		payment, err := cache.payoutEndpoint.GetPayment(ctx, satellite, previousMonth)
		if err != nil {
			return err
		}

		if payment != nil {
			if err = cache.db.Payout.StorePayment(ctx, *payment); err != nil {
				return err
			}
		}

		return nil
	})
}
|
|
|
|
// sleep pauses for a random jitter in [0, maxSleep) and returns ctx.Err() if
// the context is cancelled before the pause ends. When maxSleep is not
// positive it returns nil immediately.
func (cache *Cache) sleep(ctx context.Context) error {
	if cache.maxSleep > 0 {
		jitter := time.Duration(rand.Int63n(int64(cache.maxSleep)))
		if !sync2.Sleep(ctx, jitter) {
			return ctx.Err()
		}
	}
	return nil
}
|
|
|
|
// satelliteLoop calls fn once for every satellite in the trust pool and
// collects the returned errors into one combined error.
//
// Before each call it checks ctx. If the context is already done, it returns
// ctx.Err() straight away and discards any errors collected so far.
func (cache *Cache) satelliteLoop(ctx context.Context, fn func(id storj.NodeID) error) error {
	var collected errs.Group

	satellites := cache.trust.GetSatellites(ctx)
	for i := range satellites {
		if err := ctx.Err(); err != nil {
			return err
		}
		collected.Add(fn(satellites[i]))
	}

	return collected.Err()
}
|
|
|
|
// Close closes underlying cycles.
|
|
func (cache *Cache) Close() error {
|
|
defer mon.Task()(nil)(nil)
|
|
cache.Reputation.Close()
|
|
cache.Storage.Close()
|
|
return nil
|
|
}
|