4e86951163
At the end of a tally iteration, in order to set the new live accounting totals, we were iterating over all live accounting projects. We found a bug with this when running storj-sim: if we restarted the satellite, live accounting would be cleared, because storj-sim runs the live accounting Redis instance. Since live accounting was cleared, at the end of tally we would not update the live accounting totals even when tally found data in projects, because we were iterating over the projects from live accounting to do so. We now iterate over the projects found by tally in order to update live accounting.

We also found that if a user deleted everything from their project, tally would not find the project and its live accounting total would not be updated. For this reason, we merge the live accounting projects into the tally projects.

Change-Id: If0726ba0c7b692d69f42c5806e6c0f47eecccb73
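In essence (a condensed sketch using the names from Tally below), the update now works like this:

	// merge live accounting projects into tally projects, so projects that were
	// emptied since the last tally still get their live totals reset to zero
	for projectID := range latestLiveTotals {
		if _, ok := tallyProjectTotals[projectID]; !ok {
			tallyProjectTotals[projectID] = 0
		}
	}
	// then iterate over the tally projects (not the live accounting ones)
	// to set the new live accounting totals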
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package tally

import (
	"context"
	"time"

	"github.com/skyrings/skyring-common/tools/uuid"
	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/common/sync2"
	"storj.io/storj/satellite/accounting"
	"storj.io/storj/satellite/metainfo"
)

// Error is a standard error class for this package.
var (
	Error = errs.Class("tally error")
	mon   = monkit.Package()
)

// Config contains configurable values for the tally service
type Config struct {
	Interval time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s"`
}

// Service is the tally service for data stored on each storage node
//
// architecture: Chore
type Service struct {
	log  *zap.Logger
	Loop *sync2.Cycle

	metainfoLoop            *metainfo.Loop
	liveAccounting          accounting.Cache
	storagenodeAccountingDB accounting.StoragenodeAccounting
	projectAccountingDB     accounting.ProjectAccounting
}

// New creates a new tally Service
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metainfoLoop *metainfo.Loop, interval time.Duration) *Service {
	return &Service{
		log:  log,
		Loop: sync2.NewCycle(interval),

		metainfoLoop:            metainfoLoop,
		liveAccounting:          liveAccounting,
		storagenodeAccountingDB: sdb,
		projectAccountingDB:     pdb,
	}
}

// Run the tally service loop
func (service *Service) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	return service.Loop.Run(ctx, func(ctx context.Context) error {
		err := service.Tally(ctx)
		if err != nil {
			service.log.Error("tally failed", zap.Error(err))
		}
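		// deliberately return nil so the cycle keeps running;
		// a single failed tally iteration should not stop the service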
		return nil
	})
}

// Close stops the service and releases any resources.
func (service *Service) Close() error {
	service.Loop.Close()
	return nil
}

// Tally calculates data-at-rest usage once
func (service *Service) Tally(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

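	// snapshot the live accounting totals before the tally iteration so we can
	// later estimate how much usage changed while the iteration was running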
	initialLiveTotals, err := service.liveAccounting.GetAllProjectTotals(ctx)
	if err != nil {
		return Error.Wrap(err)
	}
	// Fetch when the last tally happened so we can roughly calculate the byte-hours.
	lastTime, err := service.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
	if err != nil {
		return Error.Wrap(err)
	}
	if lastTime.IsZero() {
		lastTime = time.Now()
	}

	// add up all nodes and buckets
	observer := NewObserver(service.log.Named("observer"))
	err = service.metainfoLoop.Join(ctx, observer)
	if err != nil {
		return Error.Wrap(err)
	}
	finishTime := time.Now()

	// calculate byte hours, not just bytes
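	// (for example, a node that has held X bytes since a tally 2 hours ago is
	// recorded as 2*X byte-hours)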
	hours := time.Since(lastTime).Hours()
	for id := range observer.Node {
		observer.Node[id] *= hours
	}

	// save the new results
	var errAtRest, errBucketInfo error
	if len(observer.Node) > 0 {
		err = service.storagenodeAccountingDB.SaveTallies(ctx, finishTime, observer.Node)
		if err != nil {
			errAtRest = errs.New("StorageNodeAccounting.SaveTallies failed: %v", err)
		}
	}

	if len(observer.Bucket) > 0 {
		// record bucket tallies to DB
		err = service.projectAccountingDB.SaveTallies(ctx, finishTime, observer.Bucket)
		if err != nil {
			errBucketInfo = errs.New("ProjectAccounting.SaveTallies failed: %v", err)
		}

		// update live accounting totals
		tallyProjectTotals := projectTotalsFromBuckets(observer.Bucket)
		latestLiveTotals, err := service.liveAccounting.GetAllProjectTotals(ctx)
		if err != nil {
			return Error.Wrap(err)
		}

		// empty projects are not returned by the metainfo observer. If a project exists
		// in live accounting, but not in tally projects, we would not update it in live
		// accounting. Thus, we add them and set the total to 0.
		for projectID := range latestLiveTotals {
			if _, ok := tallyProjectTotals[projectID]; !ok {
				tallyProjectTotals[projectID] = 0
			}
		}

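		// usage can change while tally runs: delta is how much a project's live
		// total grew during this iteration. We cannot know how much of that
		// concurrent usage the tally iteration observed, so we assume half and
		// keep delta/2 on top of the tally total when resetting the live total.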
		for projectID, tallyTotal := range tallyProjectTotals {
			delta := latestLiveTotals[projectID] - initialLiveTotals[projectID]
			if delta < 0 {
				delta = 0
			}
			err = service.liveAccounting.AddProjectStorageUsage(ctx, projectID, -latestLiveTotals[projectID]+tallyTotal+(delta/2))
			if err != nil {
				return Error.Wrap(err)
			}
		}
	}

	// report bucket metrics
	if len(observer.Bucket) > 0 {
		var total accounting.BucketTally
		for _, bucket := range observer.Bucket {
			monAccounting.IntVal("bucket.objects").Observe(bucket.ObjectCount)

			monAccounting.IntVal("bucket.segments").Observe(bucket.Segments())
			monAccounting.IntVal("bucket.inline_segments").Observe(bucket.InlineSegments)
			monAccounting.IntVal("bucket.remote_segments").Observe(bucket.RemoteSegments)

			monAccounting.IntVal("bucket.bytes").Observe(bucket.Bytes())
			monAccounting.IntVal("bucket.inline_bytes").Observe(bucket.InlineBytes)
			monAccounting.IntVal("bucket.remote_bytes").Observe(bucket.RemoteBytes)
			total.Combine(bucket)
		}
		monAccounting.IntVal("total.objects").Observe(total.ObjectCount) //locked

		monAccounting.IntVal("total.segments").Observe(total.Segments()) //locked
		monAccounting.IntVal("total.inline_segments").Observe(total.InlineSegments) //locked
		monAccounting.IntVal("total.remote_segments").Observe(total.RemoteSegments) //locked

		monAccounting.IntVal("total.bytes").Observe(total.Bytes()) //locked
		monAccounting.IntVal("total.inline_bytes").Observe(total.InlineBytes) //locked
		monAccounting.IntVal("total.remote_bytes").Observe(total.RemoteBytes) //locked
	}

	// return errors if something went wrong.
	return errs.Combine(errAtRest, errBucketInfo)
}

var _ metainfo.Observer = (*Observer)(nil)

// Observer observes metainfo and adds up tallies for nodes and buckets
type Observer struct {
	Log    *zap.Logger
	Node   map[storj.NodeID]float64
	Bucket map[string]*accounting.BucketTally
}

// NewObserver returns a metainfo loop observer that adds up totals for buckets and nodes.
func NewObserver(log *zap.Logger) *Observer {
	return &Observer{
		Log:    log,
		Node:   make(map[storj.NodeID]float64),
		Bucket: make(map[string]*accounting.BucketTally),
	}
}

// ensureBucket returns the bucket tally corresponding to the passed-in path,
// creating it if it does not exist yet.
func (observer *Observer) ensureBucket(ctx context.Context, path metainfo.ScopedPath) *accounting.BucketTally {
	bucketID := storj.JoinPaths(path.ProjectIDString, path.BucketName)

	bucket, exists := observer.Bucket[bucketID]
	if !exists {
		bucket = &accounting.BucketTally{}
		bucket.ProjectID = path.ProjectID
		bucket.BucketName = []byte(path.BucketName)
		observer.Bucket[bucketID] = bucket
	}

	return bucket
}

// Object is called for each object once.
func (observer *Observer) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	bucket := observer.ensureBucket(ctx, path)
	bucket.ObjectCount++
	return nil
}

// InlineSegment is called for each inline segment.
func (observer *Observer) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	bucket := observer.ensureBucket(ctx, path)
	bucket.InlineSegments++
	bucket.InlineBytes += int64(len(pointer.InlineSegment))
	bucket.MetadataSize += int64(len(pointer.Metadata))

	return nil
}

// RemoteSegment is called for each remote segment.
func (observer *Observer) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	bucket := observer.ensureBucket(ctx, path)
	bucket.RemoteSegments++
	bucket.RemoteBytes += pointer.GetSegmentSize()
	bucket.MetadataSize += int64(len(pointer.Metadata))

	// add node info
	remote := pointer.GetRemote()
	redundancy := remote.GetRedundancy()
	segmentSize := pointer.GetSegmentSize()
	minimumRequired := redundancy.GetMinReq()

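	// note: calling the getters above before this nil check is safe, since
	// protobuf-generated getters return zero values on nil receivers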
	if remote == nil || redundancy == nil || minimumRequired <= 0 {
		observer.Log.Error("failed sanity check", zap.String("path", path.Raw))
		return nil
	}

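	// each remote piece stores roughly segmentSize / minimumRequired bytes
	// (the size of one erasure-encoded share)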
	pieceSize := float64(segmentSize / int64(minimumRequired))
	for _, piece := range remote.GetRemotePieces() {
		observer.Node[piece.NodeId] += pieceSize
	}
	return nil
}

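// projectTotalsFromBuckets sums the inline and remote bytes of each bucket
// tally into a single per-project total.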
func projectTotalsFromBuckets(buckets map[string]*accounting.BucketTally) map[uuid.UUID]int64 {
	projectTallyTotals := make(map[uuid.UUID]int64)
	for _, bucket := range buckets {
		projectTallyTotals[bucket.ProjectID] += (bucket.InlineBytes + bucket.RemoteBytes)
	}
	return projectTallyTotals
}

// using custom name to avoid breaking monitoring
var monAccounting = monkit.ScopeNamed("storj.io/storj/satellite/accounting")