satellite/accounting/nodetally: remove segments loop parts

We are switching completely to the ranged loop.

https://github.com/storj/storj/issues/5368

Change-Id: I6176a129ba14cf83fb635048d09e6748276b52a1
This commit is contained in:
Michal Niewrzal 2023-04-24 10:32:56 +02:00 committed by Storj Robot
parent f4bc7a7eb2
commit 6a55682bc6
10 changed files with 97 additions and 360 deletions
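
Conceptually, the ranged loop splits the segment table into ranges and tallies them in parallel with a fork/join observer, while the removed code made a single sequential pass over all segments. The sketch below is a minimal, self-contained illustration of that fork/join data flow using simplified, hypothetical types (plain ints for node aliases, a bare Segment struct); it is not the actual rangedloop.Observer/Partial API from the diffs below, only the shape of the pattern they implement.

package main

import (
	"fmt"
	"sync"
)

// Segment is a simplified, hypothetical stand-in for a metabase segment.
type Segment struct {
	NodeAliases    []int
	EncryptedSize  int32
	RequiredShares int32
}

// fork accumulates per-node totals for one range (the role of observerFork).
type fork struct {
	Node map[int]float64
}

// Process adds each segment's per-piece size to every node holding a piece.
func (f *fork) Process(segments []Segment) {
	for _, s := range segments {
		pieceSize := float64(s.EncryptedSize / s.RequiredShares)
		for _, alias := range s.NodeAliases {
			f.Node[alias] += pieceSize
		}
	}
}

// observer merges fork results into one map (the role of nodetally.Observer).
type observer struct {
	mu   sync.Mutex
	Node map[int]float64
}

// Join folds a finished fork into the main per-node usage map.
func (o *observer) Join(f *fork) {
	o.mu.Lock()
	defer o.mu.Unlock()
	for alias, total := range f.Node {
		o.Node[alias] += total
	}
}

func main() {
	// Two ranges of segments, processed concurrently.
	ranges := [][]Segment{
		{{NodeAliases: []int{1, 2, 3}, EncryptedSize: 256, RequiredShares: 2}},
		{{NodeAliases: []int{2, 3, 4}, EncryptedSize: 512, RequiredShares: 2}},
	}

	obs := &observer{Node: map[int]float64{}}

	var wg sync.WaitGroup
	for _, segments := range ranges {
		wg.Add(1)
		go func(segments []Segment) {
			defer wg.Done()
			f := &fork{Node: map[int]float64{}}
			f.Process(segments) // each range is tallied independently
			obs.Join(f)         // then merged into the main observer
		}(segments)
	}
	wg.Wait()

	// In the real observer, Finish would now convert these byte totals to
	// byte-hours and save them via StoragenodeAccounting.SaveTallies.
	fmt.Println(obs.Node)
}
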

View File

@ -30,7 +30,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/accounting/nodetally"
"storj.io/storj/satellite/accounting/projectbwcleanup"
"storj.io/storj/satellite/accounting/rollup"
"storj.io/storj/satellite/accounting/rolluparchive"
@ -170,7 +169,6 @@ type Satellite struct {
Accounting struct {
Tally *tally.Service
NodeTally *nodetally.Service
Rollup *rollup.Service
ProjectUsage *accounting.Service
ProjectBWCleanup *projectbwcleanup.Chore
@ -653,7 +651,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore
system.Accounting.Tally = peer.Accounting.Tally
system.Accounting.NodeTally = peer.Accounting.NodeTally
system.Accounting.Rollup = peer.Accounting.Rollup
system.Accounting.ProjectUsage = api.Accounting.ProjectUsage
system.Accounting.ProjectBWCleanup = peer.Accounting.ProjectBWCleanupChore

View File

@ -1,189 +0,0 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package nodetally
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)
// Error is a standard error class for this package.
var (
Error = errs.Class("node tally")
mon = monkit.Package()
)
// Service is the tally service for data stored on each storage node.
//
// architecture: Chore
type Service struct {
log *zap.Logger
Loop *sync2.Cycle
segmentLoop *segmentloop.Service
storagenodeAccountingDB accounting.StoragenodeAccounting
metabaseDB *metabase.DB
nowFn func() time.Time
}
// New creates a new node tally Service.
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, mdb *metabase.DB, loop *segmentloop.Service, interval time.Duration) *Service {
return &Service{
log: log,
Loop: sync2.NewCycle(interval),
segmentLoop: loop,
storagenodeAccountingDB: sdb,
metabaseDB: mdb,
nowFn: time.Now,
}
}
// Run the node tally service loop.
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return service.Loop.Run(ctx, func(ctx context.Context) error {
err := service.Tally(ctx)
if err != nil {
service.log.Error("node tally failed", zap.Error(err))
}
return nil
})
}
// Close stops the service and releases any resources.
func (service *Service) Close() error {
service.Loop.Close()
return nil
}
// SetNow allows tests to have the Service act as if the current time is whatever
// they want. This avoids races and sleeping, making tests more reliable and efficient.
func (service *Service) SetNow(now func() time.Time) {
service.nowFn = now
}
// for backwards compatibility.
var monTally = monkit.ScopeNamed("storj.io/storj/satellite/accounting/tally")
// Tally calculates data-at-rest usage once.
func (service *Service) Tally(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// Fetch when the last node tally happened so we can roughly calculate the byte-hours.
lastTime, err := service.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
if err != nil {
return Error.Wrap(err)
}
if lastTime.IsZero() {
lastTime = service.nowFn()
}
// add up all nodes
observer := NewObserver(service.log.Named("observer"), service.nowFn())
err = service.segmentLoop.Join(ctx, observer)
if err != nil {
return Error.Wrap(err)
}
finishTime := service.nowFn()
// calculate byte hours, not just bytes
hours := time.Since(lastTime).Hours()
var totalSum float64
for id, pieceSize := range observer.Node {
totalSum += pieceSize
observer.Node[id] = pieceSize * hours
}
monTally.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked
if len(observer.Node) > 0 {
nodeIDs := make([]storj.NodeID, 0, len(observer.Node))
nodeTotals := make([]float64, 0, len(observer.Node))
nodeAliasMap, err := service.metabaseDB.LatestNodesAliasMap(ctx)
if err != nil {
return Error.Wrap(err)
}
for nodeAlias, total := range observer.Node {
nodeID, ok := nodeAliasMap.Node(nodeAlias)
if !ok {
observer.log.Error("unrecognized node alias in tally", zap.Int32("node-alias", int32(nodeAlias)))
continue
}
nodeIDs = append(nodeIDs, nodeID)
nodeTotals = append(nodeTotals, total)
}
err = service.storagenodeAccountingDB.SaveTallies(ctx, finishTime, nodeIDs, nodeTotals)
if err != nil {
return Error.New("StorageNodeAccounting.SaveTallies failed: %v", err)
}
}
return nil
}
var _ segmentloop.Observer = (*Observer)(nil)
// Observer observes metainfo and adds up tallies for nodes and buckets.
type Observer struct {
log *zap.Logger
now time.Time
Node map[metabase.NodeAlias]float64
}
// NewObserver returns a segment loop observer that adds up totals for nodes.
func NewObserver(log *zap.Logger, now time.Time) *Observer {
return &Observer{
log: log,
now: now,
Node: make(map[metabase.NodeAlias]float64),
}
}
// LoopStarted is called at each start of a loop.
func (observer *Observer) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
return nil
}
// RemoteSegment is called for each remote segment.
func (observer *Observer) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
// we are explicitly not adding monitoring here as we are tracking loop observers separately
if segment.Expired(observer.now) {
return nil
}
// add node info
minimumRequired := segment.Redundancy.RequiredShares
if minimumRequired <= 0 {
observer.log.Error("failed sanity check", zap.String("StreamID", segment.StreamID.String()), zap.Uint64("Position", segment.Position.Encode()))
return nil
}
pieceSize := float64(segment.EncryptedSize / int32(minimumRequired)) // TODO: Add this as a method to RedundancyScheme
for _, piece := range segment.AliasPieces {
observer.Node[piece.Alias] += pieceSize
}
return nil
}
// InlineSegment is called for each inline segment.
func (observer *Observer) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
return nil
}
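
The byte-hour conversion in the deleted Tally method above (per-node bytes multiplied by the hours since the last stored tally) is the same bookkeeping the ranged-loop observer's Finish performs later in this diff. A minimal, self-contained sketch of that arithmetic, assuming a previous tally stored two hours ago:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assume the previous tally was stored two hours ago.
	lastTime := time.Now().Add(-2 * time.Hour)

	// Per-node bytes at rest, as accumulated by the observer.
	nodeBytes := map[string]float64{"node-a": 1 << 20, "node-b": 2 << 20}

	// Convert bytes to byte-hours, mirroring observer.Node[id] = pieceSize * hours above.
	hours := time.Since(lastTime).Hours()
	nodeByteHours := make(map[string]float64, len(nodeBytes))
	for id, size := range nodeBytes {
		nodeByteHours[id] = size * hours
	}

	fmt.Println(nodeByteHours) // roughly 2 MiB*hours and 4 MiB*hours
}
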

View File

@ -1,124 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package nodetally_test
import (
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/encryption"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/accounting/nodetally"
"storj.io/storj/satellite/metabase/segmentloop"
)
func TestCalculateNodeAtRestData(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
tallySvc := planet.Satellites[0].Accounting.NodeTally
tallySvc.Loop.Pause()
uplink := planet.Uplinks[0]
// Setup: create 50KiB of data for the uplink to upload
expectedData := testrand.Bytes(50 * memory.KiB)
// TODO uplink currently hardcodes the block size so we need to use the same value in the test
encryptionParameters := storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 29 * 256 * memory.B.Int32(),
}
expectedTotalBytes, err := encryption.CalcEncryptedSize(int64(len(expectedData)), encryptionParameters)
require.NoError(t, err)
// Execute test: upload a file, then calculate at rest data
expectedBucketName := "testbucket"
err = uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
require.NoError(t, err)
obs := nodetally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
err = planet.Satellites[0].Metabase.SegmentLoop.Join(ctx, obs)
require.NoError(t, err)
// Confirm the correct number of shares were stored
rs := satelliteRS(t, planet.Satellites[0])
if !correctRedundencyScheme(len(obs.Node), rs) {
t.Fatalf("expected between: %d and %d, actual: %d", rs.RepairShares, rs.TotalShares, len(obs.Node))
}
// Confirm the correct number of bytes were stored on each node
for _, actualTotalBytes := range obs.Node {
assert.Equal(t, expectedTotalBytes, int64(actualTotalBytes))
}
})
}
func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool {
// The shareCount should be a value between RequiredShares and TotalShares where
// RequiredShares is the min number of shares required to recover a segment and
// TotalShares is the number of shares to encode
return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares)
}
func satelliteRS(t *testing.T, satellite *testplanet.Satellite) storj.RedundancyScheme {
rs := satellite.Config.Metainfo.RS
return storj.RedundancyScheme{
RequiredShares: int16(rs.Min),
RepairShares: int16(rs.Repair),
OptimalShares: int16(rs.Success),
TotalShares: int16(rs.Total),
ShareSize: rs.ErasureShareSize.Int32(),
}
}
func BenchmarkRemoteSegment(b *testing.B) {
testplanet.Bench(b, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(b *testing.B, ctx *testcontext.Context, planet *testplanet.Planet) {
for i := 0; i < 10; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object"+strconv.Itoa(i), testrand.Bytes(10*memory.KiB))
require.NoError(b, err)
}
observer := nodetally.NewObserver(zaptest.NewLogger(b), time.Now())
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(b, err)
loopSegments := []*segmentloop.Segment{}
for _, segment := range segments {
loopSegments = append(loopSegments, &segmentloop.Segment{
StreamID: segment.StreamID,
Position: segment.Position,
CreatedAt: segment.CreatedAt,
ExpiresAt: segment.ExpiresAt,
Redundancy: segment.Redundancy,
Pieces: segment.Pieces,
})
}
b.Run("multiple segments", func(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, loopSegment := range loopSegments {
err := observer.RemoteSegment(ctx, loopSegment)
if err != nil {
b.FailNow()
}
}
}
})
})
}

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
@ -18,13 +19,19 @@ import (
)
var (
// check if Observer and Partial interfaces are satisfied.
_ rangedloop.Observer = (*RangedLoopObserver)(nil)
_ rangedloop.Partial = (*RangedLoopPartial)(nil)
// Error is a standard error class for this package.
Error = errs.Class("node tally")
mon = monkit.Package()
)
// RangedLoopObserver implements node tally ranged loop observer.
type RangedLoopObserver struct {
var (
// check if Observer and Partial interfaces are satisfied.
_ rangedloop.Observer = (*Observer)(nil)
_ rangedloop.Partial = (*observerFork)(nil)
)
// Observer implements node tally ranged loop observer.
type Observer struct {
log *zap.Logger
accounting accounting.StoragenodeAccounting
@ -35,9 +42,9 @@ type RangedLoopObserver struct {
Node map[metabase.NodeAlias]float64
}
// NewRangedLoopObserver creates new RangedLoopObserver.
func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAccounting, metabaseDB *metabase.DB) *RangedLoopObserver {
return &RangedLoopObserver{
// NewObserver creates a new tally ranged loop observer.
func NewObserver(log *zap.Logger, accounting accounting.StoragenodeAccounting, metabaseDB *metabase.DB) *Observer {
return &Observer{
log: log,
accounting: accounting,
metabaseDB: metabaseDB,
@ -47,7 +54,7 @@ func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAcc
}
// Start implements ranged loop observer start method.
func (observer *RangedLoopObserver) Start(ctx context.Context, time time.Time) (err error) {
func (observer *Observer) Start(ctx context.Context, time time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
observer.Node = map[metabase.NodeAlias]float64{}
@ -62,17 +69,17 @@ func (observer *RangedLoopObserver) Start(ctx context.Context, time time.Time) (
}
// Fork forks new node tally ranged loop partial.
func (observer *RangedLoopObserver) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
func (observer *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
defer mon.Task()(&ctx)(&err)
return NewRangedLoopPartial(observer.log, observer.nowFn), nil
return newObserverFork(observer.log, observer.nowFn), nil
}
// Join joins a node tally ranged loop partial to the main observer, updating the main per-node usage map.
func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
func (observer *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
defer mon.Task()(&ctx)(&err)
tallyPartial, ok := partial.(*RangedLoopPartial)
tallyPartial, ok := partial.(*observerFork)
if !ok {
return Error.New("expected partial type %T but got %T", tallyPartial, partial)
}
@ -88,7 +95,7 @@ func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop
var monRangedTally = monkit.ScopeNamed("storj.io/storj/satellite/accounting/tally")
// Finish calculates byte*hours from per-node storage usage and saves tallies to the DB.
func (observer *RangedLoopObserver) Finish(ctx context.Context) (err error) {
func (observer *Observer) Finish(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
finishTime := observer.nowFn()
@ -125,21 +132,21 @@ func (observer *RangedLoopObserver) Finish(ctx context.Context) (err error) {
}
// SetNow overrides the timestamp used to store the result.
func (observer *RangedLoopObserver) SetNow(nowFn func() time.Time) {
func (observer *Observer) SetNow(nowFn func() time.Time) {
observer.nowFn = nowFn
}
// RangedLoopPartial implements node tally ranged loop partial.
type RangedLoopPartial struct {
// observerFork implements node tally ranged loop partial.
type observerFork struct {
log *zap.Logger
nowFn func() time.Time
Node map[metabase.NodeAlias]float64
}
// NewRangedLoopPartial creates new node tally ranged loop partial.
func NewRangedLoopPartial(log *zap.Logger, nowFn func() time.Time) *RangedLoopPartial {
return &RangedLoopPartial{
// newObserverFork creates new node tally ranged loop fork.
func newObserverFork(log *zap.Logger, nowFn func() time.Time) *observerFork {
return &observerFork{
log: log,
nowFn: nowFn,
Node: map[metabase.NodeAlias]float64{},
@ -147,7 +154,7 @@ func NewRangedLoopPartial(log *zap.Logger, nowFn func() time.Time) *RangedLoopPa
}
// Process iterates over the segment range, updating the partial's per-node usage map.
func (partial *RangedLoopPartial) Process(ctx context.Context, segments []segmentloop.Segment) error {
func (partial *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error {
now := partial.nowFn()
for _, segment := range segments {
@ -157,7 +164,7 @@ func (partial *RangedLoopPartial) Process(ctx context.Context, segments []segmen
return nil
}
func (partial *RangedLoopPartial) processSegment(now time.Time, segment segmentloop.Segment) {
func (partial *observerFork) processSegment(now time.Time, segment segmentloop.Segment) {
if segment.Inline() {
return
}

View File

@ -5,11 +5,13 @@ package nodetally_test
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/encryption"
"storj.io/common/memory"
@ -18,6 +20,8 @@ import (
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting/nodetally"
"storj.io/storj/satellite/metabase/segmentloop"
)
func TestSingleObjectNodeTallyRangedLoop(t *testing.T) {
@ -259,3 +263,61 @@ func TestExpiredObjectsNotCountedInNodeTally(t *testing.T) {
require.GreaterOrEqual(t, totalBytes, minExpectedBytes)
})
}
func satelliteRS(t *testing.T, satellite *testplanet.Satellite) storj.RedundancyScheme {
rs := satellite.Config.Metainfo.RS
return storj.RedundancyScheme{
RequiredShares: int16(rs.Min),
RepairShares: int16(rs.Repair),
OptimalShares: int16(rs.Success),
TotalShares: int16(rs.Total),
ShareSize: rs.ErasureShareSize.Int32(),
}
}
func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool {
// The shareCount should be a value between RequiredShares and TotalShares where
// RequiredShares is the min number of shares required to recover a segment and
// TotalShares is the number of shares to encode
return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares)
}
func BenchmarkProcess(b *testing.B) {
testplanet.Bench(b, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(b *testing.B, ctx *testcontext.Context, planet *testplanet.Planet) {
for i := 0; i < 10; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object"+strconv.Itoa(i), testrand.Bytes(10*memory.KiB))
require.NoError(b, err)
}
observer := nodetally.NewObserver(zaptest.NewLogger(b), nil, planet.Satellites[0].Metabase.DB)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(b, err)
loopSegments := []segmentloop.Segment{}
for _, segment := range segments {
loopSegments = append(loopSegments, segmentloop.Segment{
StreamID: segment.StreamID,
Position: segment.Position,
CreatedAt: segment.CreatedAt,
ExpiresAt: segment.ExpiresAt,
Redundancy: segment.Redundancy,
Pieces: segment.Pieces,
})
}
fork, err := observer.Fork(ctx)
require.NoError(b, err)
b.Run("multiple segments", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = fork.Process(ctx, loopSegments)
}
})
})
}

View File

@ -30,7 +30,7 @@ type Config struct {
SaveRollupBatchSize int `help:"how large of batches SaveRollup should process at a time" default:"1000"`
ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
UseObjectsLoop bool `help:"flag to switch between calculating bucket tallies using objects loop or custom query" default:"false"`
UseRangedLoop bool `help:"flag whether to use ranged loop instead of segment loop" default:"false"`
UseRangedLoop bool `help:"whether to enable node tally with ranged loop" default:"true"`
ListLimit int `help:"how many objects to query in a batch" default:"2500"`
AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`

View File

@ -24,7 +24,6 @@ import (
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/nodetally"
"storj.io/storj/satellite/accounting/projectbwcleanup"
"storj.io/storj/satellite/accounting/rollup"
"storj.io/storj/satellite/accounting/rolluparchive"
@ -126,7 +125,6 @@ type Core struct {
Accounting struct {
Tally *tally.Service
NodeTally *nodetally.Service
Rollup *rollup.Service
RollupArchiveChore *rolluparchive.Chore
ProjectBWCleanupChore *projectbwcleanup.Chore
@ -445,20 +443,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Debug.Server.Panel.Add(
debug.Cycle("Accounting Tally", peer.Accounting.Tally.Loop))
// storage nodes tally
nodeTallyLog := peer.Log.Named("accounting:nodetally")
if config.Tally.UseRangedLoop {
nodeTallyLog.Info("using ranged loop")
} else {
peer.Accounting.NodeTally = nodetally.New(nodeTallyLog, peer.DB.StoragenodeAccounting(), peer.Metainfo.Metabase, peer.Metainfo.SegmentLoop, config.Tally.Interval)
peer.Services.Add(lifecycle.Item{
Name: "accounting:nodetally",
Run: peer.Accounting.NodeTally.Run,
Close: peer.Accounting.NodeTally.Close,
})
}
// Let's add 1 more day so we catch any off-by-one errors when deleting tallies
orderExpirationPlusDay := config.Orders.Expiration + config.Rollup.Interval
peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup, orderExpirationPlusDay)

View File

@ -404,7 +404,7 @@ func TestAllInOne(t *testing.T) {
service := rangedloop.NewService(log, config, metabaseProvider, []rangedloop.Observer{
rangedloop.NewLiveCountObserver(satellite.Metabase.DB, config.SuspiciousProcessedRatio, config.AsOfSystemInterval),
metrics.NewObserver(),
nodetally.NewRangedLoopObserver(log.Named("accounting:nodetally"),
nodetally.NewObserver(log.Named("accounting:nodetally"),
satellite.DB.StoragenodeAccounting(),
satellite.Metabase.DB,
),

View File

@ -67,7 +67,7 @@ type RangedLoop struct {
}
Accounting struct {
NodeTallyObserver *nodetally.RangedLoopObserver
NodeTallyObserver *nodetally.Observer
}
RangedLoop struct {
@ -122,7 +122,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
}
{ // setup node tally observer
peer.Accounting.NodeTallyObserver = nodetally.NewRangedLoopObserver(
peer.Accounting.NodeTallyObserver = nodetally.NewObserver(
log.Named("accounting:nodetally"),
db.StoragenodeAccounting(),
metabaseDB)

View File

@ -1102,8 +1102,8 @@ server.private-address: 127.0.0.1:7778
# flag to switch between calculating bucket tallies using objects loop or custom query
# tally.use-objects-loop: false
# flag whether to use ranged loop instead of segment loop
# tally.use-ranged-loop: false
# whether to enable node tally with ranged loop
# tally.use-ranged-loop: true
# address for jaeger agent
# tracing.agent-addr: agent.tracing.datasci.storj.io:5775