From 6a55682bc64048750c83b2a04030c9dcb7bbfc74 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 24 Apr 2023 10:32:56 +0200 Subject: [PATCH] satellite/accounting/nodetally: remove segments loop parts We are switching completely to ranged loop. https://github.com/storj/storj/issues/5368 Change-Id: I6176a129ba14cf83fb635048d09e6748276b52a1 --- private/testplanet/satellite.go | 3 - satellite/accounting/nodetally/node_tally.go | 189 ------------------ .../accounting/nodetally/node_tally_test.go | 124 ------------ .../nodetally/{ranged_loop.go => observer.go} | 51 +++-- .../{ranged_loop_test.go => observer_test.go} | 62 ++++++ satellite/accounting/tally/tally.go | 2 +- satellite/core.go | 16 -- satellite/metabase/rangedloop/service_test.go | 2 +- satellite/rangedloop.go | 4 +- scripts/testdata/satellite-config.yaml.lock | 4 +- 10 files changed, 97 insertions(+), 360 deletions(-) delete mode 100644 satellite/accounting/nodetally/node_tally.go delete mode 100644 satellite/accounting/nodetally/node_tally_test.go rename satellite/accounting/nodetally/{ranged_loop.go => observer.go} (72%) rename satellite/accounting/nodetally/{ranged_loop_test.go => observer_test.go} (82%) diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index 34d50be91..6383f19c1 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -30,7 +30,6 @@ import ( "storj.io/storj/satellite" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/accounting/live" - "storj.io/storj/satellite/accounting/nodetally" "storj.io/storj/satellite/accounting/projectbwcleanup" "storj.io/storj/satellite/accounting/rollup" "storj.io/storj/satellite/accounting/rolluparchive" @@ -170,7 +169,6 @@ type Satellite struct { Accounting struct { Tally *tally.Service - NodeTally *nodetally.Service Rollup *rollup.Service ProjectUsage *accounting.Service ProjectBWCleanup *projectbwcleanup.Chore @@ -653,7 +651,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore system.Accounting.Tally = peer.Accounting.Tally - system.Accounting.NodeTally = peer.Accounting.NodeTally system.Accounting.Rollup = peer.Accounting.Rollup system.Accounting.ProjectUsage = api.Accounting.ProjectUsage system.Accounting.ProjectBWCleanup = peer.Accounting.ProjectBWCleanupChore diff --git a/satellite/accounting/nodetally/node_tally.go b/satellite/accounting/nodetally/node_tally.go deleted file mode 100644 index 547e70d85..000000000 --- a/satellite/accounting/nodetally/node_tally.go +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright (C) 2021 Storj Labs, Inc. -// See LICENSE for copying information. - -package nodetally - -import ( - "context" - "time" - - "github.com/spacemonkeygo/monkit/v3" - "github.com/zeebo/errs" - "go.uber.org/zap" - - "storj.io/common/storj" - "storj.io/common/sync2" - "storj.io/storj/satellite/accounting" - "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" -) - -// Error is a standard error class for this package. -var ( - Error = errs.Class("node tally") - mon = monkit.Package() -) - -// Service is the tally service for data stored on each storage node. -// -// architecture: Chore -type Service struct { - log *zap.Logger - Loop *sync2.Cycle - - segmentLoop *segmentloop.Service - storagenodeAccountingDB accounting.StoragenodeAccounting - metabaseDB *metabase.DB - nowFn func() time.Time -} - -// New creates a new node tally Service. -func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, mdb *metabase.DB, loop *segmentloop.Service, interval time.Duration) *Service { - return &Service{ - log: log, - Loop: sync2.NewCycle(interval), - - segmentLoop: loop, - storagenodeAccountingDB: sdb, - metabaseDB: mdb, - nowFn: time.Now, - } -} - -// Run the node tally service loop. -func (service *Service) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - return service.Loop.Run(ctx, func(ctx context.Context) error { - err := service.Tally(ctx) - if err != nil { - service.log.Error("node tally failed", zap.Error(err)) - } - return nil - }) -} - -// Close stops the service and releases any resources. -func (service *Service) Close() error { - service.Loop.Close() - return nil -} - -// SetNow allows tests to have the Service act as if the current time is whatever -// they want. This avoids races and sleeping, making tests more reliable and efficient. -func (service *Service) SetNow(now func() time.Time) { - service.nowFn = now -} - -// for backwards compatibility. -var monTally = monkit.ScopeNamed("storj.io/storj/satellite/accounting/tally") - -// Tally calculates data-at-rest usage once. -func (service *Service) Tally(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - // Fetch when the last node tally happened so we can roughly calculate the byte-hours. - lastTime, err := service.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally) - if err != nil { - return Error.Wrap(err) - } - if lastTime.IsZero() { - lastTime = service.nowFn() - } - - // add up all nodes - observer := NewObserver(service.log.Named("observer"), service.nowFn()) - err = service.segmentLoop.Join(ctx, observer) - if err != nil { - return Error.Wrap(err) - } - finishTime := service.nowFn() - - // calculate byte hours, not just bytes - hours := time.Since(lastTime).Hours() - var totalSum float64 - for id, pieceSize := range observer.Node { - totalSum += pieceSize - observer.Node[id] = pieceSize * hours - } - monTally.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked - - if len(observer.Node) > 0 { - nodeIDs := make([]storj.NodeID, 0, len(observer.Node)) - nodeTotals := make([]float64, 0, len(observer.Node)) - nodeAliasMap, err := service.metabaseDB.LatestNodesAliasMap(ctx) - if err != nil { - return Error.Wrap(err) - } - for nodeAlias, total := range observer.Node { - nodeID, ok := nodeAliasMap.Node(nodeAlias) - if !ok { - observer.log.Error("unrecognized node alias in tally", zap.Int32("node-alias", int32(nodeAlias))) - continue - } - nodeIDs = append(nodeIDs, nodeID) - nodeTotals = append(nodeTotals, total) - } - err = service.storagenodeAccountingDB.SaveTallies(ctx, finishTime, nodeIDs, nodeTotals) - if err != nil { - return Error.New("StorageNodeAccounting.SaveTallies failed: %v", err) - } - } - - return nil -} - -var _ segmentloop.Observer = (*Observer)(nil) - -// Observer observes metainfo and adds up tallies for nodes and buckets. -type Observer struct { - log *zap.Logger - now time.Time - - Node map[metabase.NodeAlias]float64 -} - -// NewObserver returns an segment loop observer that adds up totals for nodes. -func NewObserver(log *zap.Logger, now time.Time) *Observer { - return &Observer{ - log: log, - now: now, - - Node: make(map[metabase.NodeAlias]float64), - } -} - -// LoopStarted is called at each start of a loop. -func (observer *Observer) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) { - return nil -} - -// RemoteSegment is called for each remote segment. -func (observer *Observer) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error { - // we are expliticy not adding monitoring here as we are tracking loop observers separately - - if segment.Expired(observer.now) { - return nil - } - - // add node info - minimumRequired := segment.Redundancy.RequiredShares - - if minimumRequired <= 0 { - observer.log.Error("failed sanity check", zap.String("StreamID", segment.StreamID.String()), zap.Uint64("Position", segment.Position.Encode())) - return nil - } - - pieceSize := float64(segment.EncryptedSize / int32(minimumRequired)) // TODO: Add this as a method to RedundancyScheme - - for _, piece := range segment.AliasPieces { - observer.Node[piece.Alias] += pieceSize - } - - return nil -} - -// InlineSegment is called for each inline segment. -func (observer *Observer) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) { - return nil -} diff --git a/satellite/accounting/nodetally/node_tally_test.go b/satellite/accounting/nodetally/node_tally_test.go deleted file mode 100644 index 5871f18f5..000000000 --- a/satellite/accounting/nodetally/node_tally_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package nodetally_test - -import ( - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" - - "storj.io/common/encryption" - "storj.io/common/memory" - "storj.io/common/storj" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/private/testplanet" - "storj.io/storj/satellite/accounting/nodetally" - "storj.io/storj/satellite/metabase/segmentloop" -) - -func TestCalculateNodeAtRestData(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - tallySvc := planet.Satellites[0].Accounting.NodeTally - tallySvc.Loop.Pause() - uplink := planet.Uplinks[0] - - // Setup: create 50KiB of data for the uplink to upload - expectedData := testrand.Bytes(50 * memory.KiB) - - // TODO uplink currently hardcode block size so we need to use the same value in test - encryptionParameters := storj.EncryptionParameters{ - CipherSuite: storj.EncAESGCM, - BlockSize: 29 * 256 * memory.B.Int32(), - } - expectedTotalBytes, err := encryption.CalcEncryptedSize(int64(len(expectedData)), encryptionParameters) - require.NoError(t, err) - - // Execute test: upload a file, then calculate at rest data - expectedBucketName := "testbucket" - err = uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData) - require.NoError(t, err) - - obs := nodetally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now()) - err = planet.Satellites[0].Metabase.SegmentLoop.Join(ctx, obs) - require.NoError(t, err) - - // Confirm the correct number of shares were stored - rs := satelliteRS(t, planet.Satellites[0]) - if !correctRedundencyScheme(len(obs.Node), rs) { - t.Fatalf("expected between: %d and %d, actual: %d", rs.RepairShares, rs.TotalShares, len(obs.Node)) - } - - // Confirm the correct number of bytes were stored on each node - for _, actualTotalBytes := range obs.Node { - assert.Equal(t, expectedTotalBytes, int64(actualTotalBytes)) - } - }) -} - -func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool { - // The shareCount should be a value between RequiredShares and TotalShares where - // RequiredShares is the min number of shares required to recover a segment and - // TotalShares is the number of shares to encode - return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares) -} - -func satelliteRS(t *testing.T, satellite *testplanet.Satellite) storj.RedundancyScheme { - rs := satellite.Config.Metainfo.RS - - return storj.RedundancyScheme{ - RequiredShares: int16(rs.Min), - RepairShares: int16(rs.Repair), - OptimalShares: int16(rs.Success), - TotalShares: int16(rs.Total), - ShareSize: rs.ErasureShareSize.Int32(), - } -} - -func BenchmarkRemoteSegment(b *testing.B) { - testplanet.Bench(b, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - }, func(b *testing.B, ctx *testcontext.Context, planet *testplanet.Planet) { - - for i := 0; i < 10; i++ { - err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object"+strconv.Itoa(i), testrand.Bytes(10*memory.KiB)) - require.NoError(b, err) - } - - observer := nodetally.NewObserver(zaptest.NewLogger(b), time.Now()) - - segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) - require.NoError(b, err) - - loopSegments := []*segmentloop.Segment{} - - for _, segment := range segments { - loopSegments = append(loopSegments, &segmentloop.Segment{ - StreamID: segment.StreamID, - Position: segment.Position, - CreatedAt: segment.CreatedAt, - ExpiresAt: segment.ExpiresAt, - Redundancy: segment.Redundancy, - Pieces: segment.Pieces, - }) - } - - b.Run("multiple segments", func(b *testing.B) { - for i := 0; i < b.N; i++ { - for _, loopSegment := range loopSegments { - err := observer.RemoteSegment(ctx, loopSegment) - if err != nil { - b.FailNow() - } - } - } - }) - }) -} diff --git a/satellite/accounting/nodetally/ranged_loop.go b/satellite/accounting/nodetally/observer.go similarity index 72% rename from satellite/accounting/nodetally/ranged_loop.go rename to satellite/accounting/nodetally/observer.go index aad13d18e..edc0cd750 100644 --- a/satellite/accounting/nodetally/ranged_loop.go +++ b/satellite/accounting/nodetally/observer.go @@ -8,6 +8,7 @@ import ( "time" "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/storj" @@ -18,13 +19,19 @@ import ( ) var ( - // check if Observer and Partial interfaces are satisfied. - _ rangedloop.Observer = (*RangedLoopObserver)(nil) - _ rangedloop.Partial = (*RangedLoopPartial)(nil) + // Error is a standard error class for this package. + Error = errs.Class("node tally") + mon = monkit.Package() ) -// RangedLoopObserver implements node tally ranged loop observer. -type RangedLoopObserver struct { +var ( + // check if Observer and Partial interfaces are satisfied. + _ rangedloop.Observer = (*Observer)(nil) + _ rangedloop.Partial = (*observerFork)(nil) +) + +// Observer implements node tally ranged loop observer. +type Observer struct { log *zap.Logger accounting accounting.StoragenodeAccounting @@ -35,9 +42,9 @@ type RangedLoopObserver struct { Node map[metabase.NodeAlias]float64 } -// NewRangedLoopObserver creates new RangedLoopObserver. -func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAccounting, metabaseDB *metabase.DB) *RangedLoopObserver { - return &RangedLoopObserver{ +// NewObserver creates new tally range loop observer. +func NewObserver(log *zap.Logger, accounting accounting.StoragenodeAccounting, metabaseDB *metabase.DB) *Observer { + return &Observer{ log: log, accounting: accounting, metabaseDB: metabaseDB, @@ -47,7 +54,7 @@ func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAcc } // Start implements ranged loop observer start method. -func (observer *RangedLoopObserver) Start(ctx context.Context, time time.Time) (err error) { +func (observer *Observer) Start(ctx context.Context, time time.Time) (err error) { defer mon.Task()(&ctx)(&err) observer.Node = map[metabase.NodeAlias]float64{} @@ -62,17 +69,17 @@ func (observer *RangedLoopObserver) Start(ctx context.Context, time time.Time) ( } // Fork forks new node tally ranged loop partial. -func (observer *RangedLoopObserver) Fork(ctx context.Context) (_ rangedloop.Partial, err error) { +func (observer *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) { defer mon.Task()(&ctx)(&err) - return NewRangedLoopPartial(observer.log, observer.nowFn), nil + return newObserverFork(observer.log, observer.nowFn), nil } // Join joins node tally ranged loop partial to main observer updating main per node usage map. -func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop.Partial) (err error) { +func (observer *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) { defer mon.Task()(&ctx)(&err) - tallyPartial, ok := partial.(*RangedLoopPartial) + tallyPartial, ok := partial.(*observerFork) if !ok { return Error.New("expected partial type %T but got %T", tallyPartial, partial) } @@ -88,7 +95,7 @@ func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop var monRangedTally = monkit.ScopeNamed("storj.io/storj/satellite/accounting/tally") // Finish calculates byte*hours from per node storage usage and save tallies to DB. -func (observer *RangedLoopObserver) Finish(ctx context.Context) (err error) { +func (observer *Observer) Finish(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) finishTime := observer.nowFn() @@ -125,21 +132,21 @@ func (observer *RangedLoopObserver) Finish(ctx context.Context) (err error) { } // SetNow overrides the timestamp used to store the result. -func (observer *RangedLoopObserver) SetNow(nowFn func() time.Time) { +func (observer *Observer) SetNow(nowFn func() time.Time) { observer.nowFn = nowFn } -// RangedLoopPartial implements node tally ranged loop partial. -type RangedLoopPartial struct { +// observerFork implements node tally ranged loop partial. +type observerFork struct { log *zap.Logger nowFn func() time.Time Node map[metabase.NodeAlias]float64 } -// NewRangedLoopPartial creates new node tally ranged loop partial. -func NewRangedLoopPartial(log *zap.Logger, nowFn func() time.Time) *RangedLoopPartial { - return &RangedLoopPartial{ +// newObserverFork creates new node tally ranged loop fork. +func newObserverFork(log *zap.Logger, nowFn func() time.Time) *observerFork { + return &observerFork{ log: log, nowFn: nowFn, Node: map[metabase.NodeAlias]float64{}, @@ -147,7 +154,7 @@ func NewRangedLoopPartial(log *zap.Logger, nowFn func() time.Time) *RangedLoopPa } // Process iterates over segment range updating partial node usage map. -func (partial *RangedLoopPartial) Process(ctx context.Context, segments []segmentloop.Segment) error { +func (partial *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error { now := partial.nowFn() for _, segment := range segments { @@ -157,7 +164,7 @@ func (partial *RangedLoopPartial) Process(ctx context.Context, segments []segmen return nil } -func (partial *RangedLoopPartial) processSegment(now time.Time, segment segmentloop.Segment) { +func (partial *observerFork) processSegment(now time.Time, segment segmentloop.Segment) { if segment.Inline() { return } diff --git a/satellite/accounting/nodetally/ranged_loop_test.go b/satellite/accounting/nodetally/observer_test.go similarity index 82% rename from satellite/accounting/nodetally/ranged_loop_test.go rename to satellite/accounting/nodetally/observer_test.go index f6d6468b3..0e9cb3118 100644 --- a/satellite/accounting/nodetally/ranged_loop_test.go +++ b/satellite/accounting/nodetally/observer_test.go @@ -5,11 +5,13 @@ package nodetally_test import ( "fmt" + "strconv" "testing" "time" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" "storj.io/common/encryption" "storj.io/common/memory" @@ -18,6 +20,8 @@ import ( "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" + "storj.io/storj/satellite/accounting/nodetally" + "storj.io/storj/satellite/metabase/segmentloop" ) func TestSingleObjectNodeTallyRangedLoop(t *testing.T) { @@ -259,3 +263,61 @@ func TestExpiredObjectsNotCountedInNodeTally(t *testing.T) { require.GreaterOrEqual(t, totalBytes, minExpectedBytes) }) } + +func satelliteRS(t *testing.T, satellite *testplanet.Satellite) storj.RedundancyScheme { + rs := satellite.Config.Metainfo.RS + + return storj.RedundancyScheme{ + RequiredShares: int16(rs.Min), + RepairShares: int16(rs.Repair), + OptimalShares: int16(rs.Success), + TotalShares: int16(rs.Total), + ShareSize: rs.ErasureShareSize.Int32(), + } +} + +func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool { + // The shareCount should be a value between RequiredShares and TotalShares where + // RequiredShares is the min number of shares required to recover a segment and + // TotalShares is the number of shares to encode + return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares) +} + +func BenchmarkProcess(b *testing.B) { + testplanet.Bench(b, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + }, func(b *testing.B, ctx *testcontext.Context, planet *testplanet.Planet) { + + for i := 0; i < 10; i++ { + err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object"+strconv.Itoa(i), testrand.Bytes(10*memory.KiB)) + require.NoError(b, err) + } + + observer := nodetally.NewObserver(zaptest.NewLogger(b), nil, planet.Satellites[0].Metabase.DB) + + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(b, err) + + loopSegments := []segmentloop.Segment{} + + for _, segment := range segments { + loopSegments = append(loopSegments, segmentloop.Segment{ + StreamID: segment.StreamID, + Position: segment.Position, + CreatedAt: segment.CreatedAt, + ExpiresAt: segment.ExpiresAt, + Redundancy: segment.Redundancy, + Pieces: segment.Pieces, + }) + } + + fork, err := observer.Fork(ctx) + require.NoError(b, err) + + b.Run("multiple segments", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = fork.Process(ctx, loopSegments) + } + }) + }) +} diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index 7b415d64f..65e7ed267 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -30,7 +30,7 @@ type Config struct { SaveRollupBatchSize int `help:"how large of batches SaveRollup should process at a time" default:"1000"` ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"` UseObjectsLoop bool `help:"flag to switch between calculating bucket tallies using objects loop or custom query" default:"false"` - UseRangedLoop bool `help:"flag whether to use ranged loop instead of segment loop" default:"false"` + UseRangedLoop bool `help:"whether to enable node tally with ranged loop" default:"true"` ListLimit int `help:"how many objects to query in a batch" default:"2500"` AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"` diff --git a/satellite/core.go b/satellite/core.go index fee945502..b8b43f10c 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -24,7 +24,6 @@ import ( "storj.io/storj/private/lifecycle" version_checker "storj.io/storj/private/version/checker" "storj.io/storj/satellite/accounting" - "storj.io/storj/satellite/accounting/nodetally" "storj.io/storj/satellite/accounting/projectbwcleanup" "storj.io/storj/satellite/accounting/rollup" "storj.io/storj/satellite/accounting/rolluparchive" @@ -126,7 +125,6 @@ type Core struct { Accounting struct { Tally *tally.Service - NodeTally *nodetally.Service Rollup *rollup.Service RollupArchiveChore *rolluparchive.Chore ProjectBWCleanupChore *projectbwcleanup.Chore @@ -445,20 +443,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Debug.Server.Panel.Add( debug.Cycle("Accounting Tally", peer.Accounting.Tally.Loop)) - // storage nodes tally - nodeTallyLog := peer.Log.Named("accounting:nodetally") - - if config.Tally.UseRangedLoop { - nodeTallyLog.Info("using ranged loop") - } else { - peer.Accounting.NodeTally = nodetally.New(nodeTallyLog, peer.DB.StoragenodeAccounting(), peer.Metainfo.Metabase, peer.Metainfo.SegmentLoop, config.Tally.Interval) - peer.Services.Add(lifecycle.Item{ - Name: "accounting:nodetally", - Run: peer.Accounting.NodeTally.Run, - Close: peer.Accounting.NodeTally.Close, - }) - } - // Lets add 1 more day so we catch any off by one errors when deleting tallies orderExpirationPlusDay := config.Orders.Expiration + config.Rollup.Interval peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup, orderExpirationPlusDay) diff --git a/satellite/metabase/rangedloop/service_test.go b/satellite/metabase/rangedloop/service_test.go index 4cbeaabf4..613718d67 100644 --- a/satellite/metabase/rangedloop/service_test.go +++ b/satellite/metabase/rangedloop/service_test.go @@ -404,7 +404,7 @@ func TestAllInOne(t *testing.T) { service := rangedloop.NewService(log, config, metabaseProvider, []rangedloop.Observer{ rangedloop.NewLiveCountObserver(satellite.Metabase.DB, config.SuspiciousProcessedRatio, config.AsOfSystemInterval), metrics.NewObserver(), - nodetally.NewRangedLoopObserver(log.Named("accounting:nodetally"), + nodetally.NewObserver(log.Named("accounting:nodetally"), satellite.DB.StoragenodeAccounting(), satellite.Metabase.DB, ), diff --git a/satellite/rangedloop.go b/satellite/rangedloop.go index ec62850da..486d0df69 100644 --- a/satellite/rangedloop.go +++ b/satellite/rangedloop.go @@ -67,7 +67,7 @@ type RangedLoop struct { } Accounting struct { - NodeTallyObserver *nodetally.RangedLoopObserver + NodeTallyObserver *nodetally.Observer } RangedLoop struct { @@ -122,7 +122,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf } { // setup node tally observer - peer.Accounting.NodeTallyObserver = nodetally.NewRangedLoopObserver( + peer.Accounting.NodeTallyObserver = nodetally.NewObserver( log.Named("accounting:nodetally"), db.StoragenodeAccounting(), metabaseDB) diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index ffb3ccdad..2577c58bc 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -1102,8 +1102,8 @@ server.private-address: 127.0.0.1:7778 # flag to switch between calculating bucket tallies using objects loop or custom query # tally.use-objects-loop: false -# flag whether to use ranged loop instead of segment loop -# tally.use-ranged-loop: false +# whether to enable node tally with ranged loop +# tally.use-ranged-loop: true # address for jaeger agent # tracing.agent-addr: agent.tracing.datasci.storj.io:5775