satellite/accounting: move storage node tally to separate service

Tally currently calculates storage for both buckets and
storage nodes. This change moves the storage node
calculation into a separate service that uses the
segment loop.

Change-Id: I9e68bfa0bc751c82ff738c71ca58d311f257bd8d
Michał Niewrzał 2021-06-01 18:44:09 +02:00 committed by Egon Elbre
parent f8eebbc115
commit cbbbfca439
6 changed files with 267 additions and 106 deletions
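
At its core, the change keeps the same byte-hour bookkeeping: the observer sums piece sizes per node, and the service multiplies each sum by the hours elapsed since the previous tally before saving. A minimal, self-contained sketch of that arithmetic (node IDs and sizes are made up; the real service keys the map by storj.NodeID and fills it from the segment loop):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical per-node totals in bytes, as the observer would accumulate them.
	nodeBytes := map[string]float64{
		"node-a": 2.2 * 1024 * 1024, // one ~2.2 MiB piece
		"node-b": 4.4 * 1024 * 1024, // two such pieces
	}

	lastTally := time.Now().Add(-2 * time.Hour) // pretend the previous tally ran 2h ago
	hours := time.Since(lastTally).Hours()

	// Convert bytes to byte-hours, as Service.Tally does before SaveTallies.
	for id, size := range nodeBytes {
		nodeBytes[id] = size * hours
	}
	fmt.Println(nodeBytes) // roughly double the byte values, since ~2 hours elapsed
}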

View File

@@ -30,6 +30,7 @@ import (
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/accounting"
 	"storj.io/storj/satellite/accounting/live"
+	"storj.io/storj/satellite/accounting/nodetally"
 	"storj.io/storj/satellite/accounting/projectbwcleanup"
 	"storj.io/storj/satellite/accounting/rollup"
 	"storj.io/storj/satellite/accounting/rolluparchive"
@@ -133,6 +134,7 @@ type Satellite struct {
 	Accounting struct {
 		Tally            *tally.Service
+		NodeTally        *nodetally.Service
 		Rollup           *rollup.Service
 		ProjectUsage     *accounting.Service
 		ProjectBWCleanup *projectbwcleanup.Chore
@@ -567,6 +569,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
 	system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
 	system.Accounting.Tally = peer.Accounting.Tally
+	system.Accounting.NodeTally = peer.Accounting.NodeTally
 	system.Accounting.Rollup = peer.Accounting.Rollup
 	system.Accounting.ProjectUsage = api.Accounting.ProjectUsage
 	system.Accounting.ProjectBWCleanup = peer.Accounting.ProjectBWCleanupChore

View File

@@ -0,0 +1,171 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package nodetally

import (
	"context"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/common/sync2"
	"storj.io/storj/satellite/accounting"
	"storj.io/storj/satellite/metabase/segmentloop"
)

// Error is a standard error class for this package.
var (
	Error = errs.Class("node tally")
	mon   = monkit.Package()
)

// Service is the tally service for data stored on each storage node.
//
// architecture: Chore
type Service struct {
	log  *zap.Logger
	Loop *sync2.Cycle

	segmentLoop             *segmentloop.Service
	storagenodeAccountingDB accounting.StoragenodeAccounting
	nowFn                   func() time.Time
}

// New creates a new node tally Service.
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, loop *segmentloop.Service, interval time.Duration) *Service {
	return &Service{
		log:  log,
		Loop: sync2.NewCycle(interval),

		segmentLoop:             loop,
		storagenodeAccountingDB: sdb,
		nowFn:                   time.Now,
	}
}

// Run the node tally service loop.
func (service *Service) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	return service.Loop.Run(ctx, func(ctx context.Context) error {
		err := service.Tally(ctx)
		if err != nil {
			service.log.Error("node tally failed", zap.Error(err))
		}
		return nil
	})
}

// Close stops the service and releases any resources.
func (service *Service) Close() error {
	service.Loop.Close()
	return nil
}

// SetNow allows tests to have the Service act as if the current time is whatever
// they want. This avoids races and sleeping, making tests more reliable and efficient.
func (service *Service) SetNow(now func() time.Time) {
	service.nowFn = now
}

// monTally is scoped to the old tally package name for backwards compatibility.
var monTally = monkit.ScopeNamed("storj.io/storj/satellite/accounting/tally")

// Tally calculates data-at-rest usage once.
func (service *Service) Tally(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Fetch when the last node tally happened so we can roughly calculate the byte-hours.
	lastTime, err := service.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
	if err != nil {
		return Error.Wrap(err)
	}
	if lastTime.IsZero() {
		lastTime = service.nowFn()
	}

	// add up all nodes
	observer := NewObserver(service.log.Named("observer"), service.nowFn())
	err = service.segmentLoop.Join(ctx, observer)
	if err != nil {
		return Error.Wrap(err)
	}
	finishTime := service.nowFn()

	// calculate byte hours, not just bytes
	hours := time.Since(lastTime).Hours()
	var totalSum float64
	for id, pieceSize := range observer.Node {
		totalSum += pieceSize
		observer.Node[id] = pieceSize * hours
	}
	monTally.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked

	if len(observer.Node) > 0 {
		err = service.storagenodeAccountingDB.SaveTallies(ctx, finishTime, observer.Node)
		if err != nil {
			return Error.New("StorageNodeAccounting.SaveTallies failed: %v", err)
		}
	}

	return nil
}

var _ segmentloop.Observer = (*Observer)(nil)

// Observer observes segments and adds up tallies for nodes.
type Observer struct {
	log *zap.Logger
	now time.Time

	Node map[storj.NodeID]float64
}

// NewObserver returns a segment loop observer that adds up totals for nodes.
func NewObserver(log *zap.Logger, now time.Time) *Observer {
	return &Observer{
		log: log,
		now: now,

		Node: make(map[storj.NodeID]float64),
	}
}

// LoopStarted is called at each start of a loop.
func (observer *Observer) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
	return nil
}

// RemoteSegment is called for each remote segment.
func (observer *Observer) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
	defer mon.Task()(&ctx)(&err)

	if segment.Expired(observer.now) {
		return nil
	}

	// add node info
	minimumRequired := segment.Redundancy.RequiredShares
	if minimumRequired <= 0 {
		observer.log.Error("failed sanity check", zap.String("StreamID", segment.StreamID.String()), zap.Uint64("Position", segment.Position.Encode()))
		return nil
	}

	pieceSize := float64(segment.EncryptedSize / int32(minimumRequired)) // TODO: Add this as a method to RedundancyScheme
	for _, piece := range segment.Pieces {
		observer.Node[piece.StorageNode] += pieceSize
	}

	return nil
}

// InlineSegment is called for each inline segment.
func (observer *Observer) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
	return nil
}
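
The per-node increment in RemoteSegment divides the encrypted segment size by the number of required erasure shares, since each piece a node holds is one share of the segment. A rough, runnable sketch of that arithmetic, assuming a 64 MiB segment and the 29-share minimum used in the test below:

package main

import "fmt"

func main() {
	const encryptedSize = int32(64 * 1024 * 1024) // assumed segment size
	const requiredShares = int32(29)              // RS "k": minimum shares needed to recover

	// Same integer division as Observer.RemoteSegment: each stored piece is one
	// erasure share, so its size is about encryptedSize / k.
	pieceSize := float64(encryptedSize / requiredShares)
	fmt.Printf("per-piece size: %.0f bytes (~%.2f MiB)\n", pieceSize, pieceSize/(1024*1024))
}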

View File

@@ -0,0 +1,80 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package nodetally_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"storj.io/common/encryption"
	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite/accounting/nodetally"
)

func TestCalculateNodeAtRestData(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		tallySvc := planet.Satellites[0].Accounting.NodeTally
		tallySvc.Loop.Pause()
		uplink := planet.Uplinks[0]

		// Setup: create 50KiB of data for the uplink to upload
		expectedData := testrand.Bytes(50 * memory.KiB)

		// TODO: the uplink currently hardcodes the block size, so the test must use the same value
		encryptionParameters := storj.EncryptionParameters{
			CipherSuite: storj.EncAESGCM,
			BlockSize:   29 * 256 * memory.B.Int32(),
		}
		expectedTotalBytes, err := encryption.CalcEncryptedSize(int64(len(expectedData)), encryptionParameters)
		require.NoError(t, err)

		// Execute test: upload a file, then calculate at rest data
		expectedBucketName := "testbucket"
		err = uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
		require.NoError(t, err)

		obs := nodetally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
		err = planet.Satellites[0].Metainfo.SegmentLoop.Join(ctx, obs)
		require.NoError(t, err)

		// Confirm the correct number of shares were stored
		rs := satelliteRS(t, planet.Satellites[0])
		if !correctRedundencyScheme(len(obs.Node), rs) {
			t.Fatalf("expected between: %d and %d, actual: %d", rs.RepairShares, rs.TotalShares, len(obs.Node))
		}

		// Confirm the correct number of bytes were stored on each node
		for _, actualTotalBytes := range obs.Node {
			assert.Equal(t, expectedTotalBytes, int64(actualTotalBytes))
		}
	})
}

func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool {
	// The shareCount should be a value between RepairShares and TotalShares, where
	// RepairShares is the repair threshold and TotalShares is the number of shares
	// to encode.
	return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares)
}

func satelliteRS(t *testing.T, satellite *testplanet.Satellite) storj.RedundancyScheme {
	rs := satellite.Config.Metainfo.RS
	return storj.RedundancyScheme{
		RequiredShares: int16(rs.Min),
		RepairShares:   int16(rs.Repair),
		OptimalShares:  int16(rs.Success),
		TotalShares:    int16(rs.Total),
		ShareSize:      rs.ErasureShareSize.Int32(),
	}
}
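
The test pauses the chore with Loop.Pause() and drives the observer directly rather than waiting for a tick; the service's SetNow hook exists for the same reason. A toy illustration of that clock-injection pattern (the service type here is a stand-in, not the real nodetally.Service):

package main

import (
	"fmt"
	"time"
)

// stand-in mirroring the nowFn field on nodetally.Service
type service struct{ nowFn func() time.Time }

func (s *service) setNow(now func() time.Time) { s.nowFn = now }

func main() {
	svc := &service{nowFn: time.Now}

	// Tests pin the clock to an arbitrary instant instead of sleeping,
	// so byte-hour windows of any length can be exercised deterministically.
	base := time.Date(2021, 6, 1, 0, 0, 0, 0, time.UTC) // assumed fixed test time
	svc.setNow(func() time.Time { return base.Add(2 * time.Hour) })

	fmt.Println("injected now:", svc.nowFn()) // 02:00 UTC, no real waiting involved
}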

View File

@@ -11,7 +11,6 @@ import (
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
-	"storj.io/common/storj"
 	"storj.io/common/sync2"
 	"storj.io/common/uuid"
 	"storj.io/storj/satellite/accounting"
@@ -165,15 +164,6 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 		}
 	}

-	// Fetch when the last tally happened so we can roughly calculate the byte-hours.
-	lastTime, err := service.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
-	if err != nil {
-		return Error.Wrap(err)
-	}
-	if lastTime.IsZero() {
-		lastTime = service.nowFn()
-	}
-
 	// add up all nodes and buckets
 	observer := NewObserver(service.log.Named("observer"), service.nowFn())
 	err = service.metainfoLoop.Join(ctx, observer)
@@ -182,29 +172,13 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 	}
 	finishTime := service.nowFn()

-	// calculate byte hours, not just bytes
-	hours := time.Since(lastTime).Hours()
-	var totalSum float64
-	for id, pieceSize := range observer.Node {
-		totalSum += pieceSize
-		observer.Node[id] = pieceSize * hours
-	}
-	mon.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked
-
 	// save the new results
-	var errAtRest, errBucketInfo error
-	if len(observer.Node) > 0 {
-		err = service.storagenodeAccountingDB.SaveTallies(ctx, finishTime, observer.Node)
-		if err != nil {
-			errAtRest = errs.New("StorageNodeAccounting.SaveTallies failed: %v", err)
-		}
-	}
+	var errAtRest error
 	if len(observer.Bucket) > 0 {
 		// record bucket tallies to DB
 		err = service.projectAccountingDB.SaveTallies(ctx, finishTime, observer.Bucket)
 		if err != nil {
-			errAtRest = errs.New("ProjectAccounting.SaveTallies failed: %v", err)
+			errAtRest = Error.New("ProjectAccounting.SaveTallies failed: %v", err)
 		}

 		updateLiveAccountingTotals(projectTotalsFromBuckets(observer.Bucket))
@@ -236,7 +210,7 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 	}

 	// return errors if something went wrong.
-	return errs.Combine(errAtRest, errBucketInfo)
+	return errAtRest
 }

 var _ metaloop.Observer = (*Observer)(nil)
@@ -245,7 +219,6 @@ var _ metaloop.Observer = (*Observer)(nil)
 type Observer struct {
 	Now    time.Time
 	Log    *zap.Logger
-	Node   map[storj.NodeID]float64
 	Bucket map[metabase.BucketLocation]*accounting.BucketTally
 }
@@ -255,7 +228,6 @@ func NewObserver(log *zap.Logger, now time.Time) *Observer {
 	return &Observer{
 		Now:    now,
 		Log:    log,
-		Node:   make(map[storj.NodeID]float64),
 		Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally),
 	}
 }
@@ -320,20 +292,6 @@ func (observer *Observer) RemoteSegment(ctx context.Context, segment *metaloop.S
 	bucket.RemoteSegments++
 	bucket.RemoteBytes += int64(segment.EncryptedSize)

-	// add node info
-	minimumRequired := segment.Redundancy.RequiredShares
-	if minimumRequired <= 0 {
-		observer.Log.Error("failed sanity check", zap.ByteString("key", segment.Location.Encode()))
-		return nil
-	}
-
-	pieceSize := float64(segment.EncryptedSize / int32(minimumRequired)) // TODO: Add this as a method to RedundancyScheme
-	for _, piece := range segment.Pieces {
-		observer.Node[piece.StorageNode] += pieceSize
-	}
-
 	return nil
 }

View File

@@ -11,7 +11,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"storj.io/common/encryption"
 	"storj.io/common/memory"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
@@ -116,47 +115,6 @@ func TestOnlyInline(t *testing.T) {
 	})
 }

-func TestCalculateNodeAtRestData(t *testing.T) {
-	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
-	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		tallySvc := planet.Satellites[0].Accounting.Tally
-		tallySvc.Loop.Pause()
-		uplink := planet.Uplinks[0]
-
-		// Setup: create 50KiB of data for the uplink to upload
-		expectedData := testrand.Bytes(50 * memory.KiB)
-
-		// TODO uplink currently hardcode block size so we need to use the same value in test
-		encryptionParameters := storj.EncryptionParameters{
-			CipherSuite: storj.EncAESGCM,
-			BlockSize:   29 * 256 * memory.B.Int32(),
-		}
-		expectedTotalBytes, err := encryption.CalcEncryptedSize(int64(len(expectedData)), encryptionParameters)
-		require.NoError(t, err)
-
-		// Execute test: upload a file, then calculate at rest data
-		expectedBucketName := "testbucket"
-		err = uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
-		require.NoError(t, err)
-
-		obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
-		err = planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
-		require.NoError(t, err)
-
-		// Confirm the correct number of shares were stored
-		rs := satelliteRS(t, planet.Satellites[0])
-		if !correctRedundencyScheme(len(obs.Node), rs) {
-			t.Fatalf("expected between: %d and %d, actual: %d", rs.RepairShares, rs.TotalShares, len(obs.Node))
-		}
-
-		// Confirm the correct number of bytes were stored on each node
-		for _, actualTotalBytes := range obs.Node {
-			assert.Equal(t, expectedTotalBytes, int64(actualTotalBytes))
-		}
-	})
-}
-
 func TestCalculateBucketAtRestData(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 2,
@@ -321,22 +279,3 @@ func TestTallyEmptyProjectUpdatesLiveAccounting(t *testing.T) {
 		require.Zero(t, p1Total)
 	})
 }
-
-func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool {
-	// The shareCount should be a value between RequiredShares and TotalShares where
-	// RequiredShares is the min number of shares required to recover a segment and
-	// TotalShares is the number of shares to encode
-	return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares)
-}
-
-func satelliteRS(t *testing.T, satellite *testplanet.Satellite) storj.RedundancyScheme {
-	rs := satellite.Config.Metainfo.RS
-	return storj.RedundancyScheme{
-		RequiredShares: int16(rs.Min),
-		RepairShares:   int16(rs.Repair),
-		OptimalShares:  int16(rs.Success),
-		TotalShares:    int16(rs.Total),
-		ShareSize:      rs.ErasureShareSize.Int32(),
-	}
-}

View File

@@ -24,6 +24,7 @@ import (
 	"storj.io/storj/private/lifecycle"
 	version_checker "storj.io/storj/private/version/checker"
 	"storj.io/storj/satellite/accounting"
+	"storj.io/storj/satellite/accounting/nodetally"
 	"storj.io/storj/satellite/accounting/projectbwcleanup"
 	"storj.io/storj/satellite/accounting/rollup"
 	"storj.io/storj/satellite/accounting/rolluparchive"
@@ -106,6 +107,7 @@ type Core struct {
 	Accounting struct {
 		Tally                 *tally.Service
+		NodeTally             *nodetally.Service
 		Rollup                *rollup.Service
 		RollupArchiveChore    *rolluparchive.Chore
 		ProjectBWCleanupChore *projectbwcleanup.Chore
@@ -364,6 +366,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		peer.Debug.Server.Panel.Add(
 			debug.Cycle("Accounting Tally", peer.Accounting.Tally.Loop))

+		// storage nodes tally
+		peer.Accounting.NodeTally = nodetally.New(peer.Log.Named("accounting:nodetally"), peer.DB.StoragenodeAccounting(), peer.Metainfo.SegmentLoop, config.Tally.Interval)
+		peer.Services.Add(lifecycle.Item{
+			Name:  "accounting:nodetally",
+			Run:   peer.Accounting.NodeTally.Run,
+			Close: peer.Accounting.NodeTally.Close,
+		})
+
 		// Let's add 1 more day so we catch any off-by-one errors when deleting tallies
 		orderExpirationPlusDay := config.Orders.Expiration + config.Rollup.Interval
 		peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies, orderExpirationPlusDay)
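
The new chore follows the same lifecycle pattern as the other accounting services: a sync2.Cycle invokes the work function once per interval, and Service.Run logs errors rather than returning them so one failed tally does not stop the loop. A small sketch of that pattern using the same sync2 API the service itself uses (the one-second interval and three-second timeout are for demonstration only; production uses config.Tally.Interval):

package main

import (
	"context"
	"fmt"
	"time"

	"storj.io/common/sync2"
)

func main() {
	loop := sync2.NewCycle(time.Second) // as in nodetally.New, but with a demo interval

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Run invokes the function on every tick until ctx is done, mirroring
	// Service.Run; returning nil keeps the cycle alive after a failure.
	_ = loop.Run(ctx, func(ctx context.Context) error {
		fmt.Println("tally tick:", time.Now().Format(time.RFC3339))
		return nil
	})
}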