From 1aa24b9f0db2b88cd2272b13b3e75bfce2ec6a27 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 24 Apr 2023 12:07:16 +0200 Subject: [PATCH] satellite/audit: remove segments loop parts We are switching completely to ranged loop. https://github.com/storj/storj/issues/5368 Change-Id: I9cec0ac454f40f19d52c078a8b1870c4d192bd7a --- private/testplanet/satellite.go | 2 - satellite/accounting/billing_test.go | 6 +- satellite/audit/audit_test.go | 6 +- satellite/audit/chore.go | 101 ------------------ satellite/audit/collector.go | 71 ------------ satellite/audit/common_test.go | 18 +--- satellite/audit/getshare_test.go | 4 +- satellite/audit/integration_test.go | 2 +- satellite/audit/observer.go | 58 ++++++++-- .../{collector_test.go => observer_test.go} | 31 +++--- satellite/audit/reservoir.go | 10 +- satellite/audit/reservoir_test.go | 18 ++-- satellite/audit/reverifier_test.go | 10 +- satellite/audit/reverify_test.go | 24 ++--- satellite/audit/verifier_test.go | 44 ++++---- satellite/audit/worker.go | 2 +- satellite/core.go | 40 ++----- satellite/overlay/selection_test.go | 2 +- satellite/overlay/service_test.go | 2 +- satellite/repair/repair_test.go | 19 ++-- satellite/reputation/service_test.go | 3 +- scripts/testdata/satellite-config.yaml.lock | 4 +- 22 files changed, 157 insertions(+), 320 deletions(-) delete mode 100644 satellite/audit/chore.go delete mode 100644 satellite/audit/collector.go rename satellite/audit/{collector_test.go => observer_test.go} (81%) diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index f810d45d1..3d02827ff 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -143,7 +143,6 @@ type Satellite struct { ReverifyQueue audit.ReverifyQueue Worker *audit.Worker ReverifyWorker *audit.ReverifyWorker - Chore *audit.Chore Verifier *audit.Verifier Reverifier *audit.Reverifier Reporter audit.Reporter @@ -637,7 +636,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer system.Audit.ReverifyQueue = auditorPeer.Audit.ReverifyQueue system.Audit.Worker = auditorPeer.Audit.Worker system.Audit.ReverifyWorker = auditorPeer.Audit.ReverifyWorker - system.Audit.Chore = peer.Audit.Chore system.Audit.Verifier = auditorPeer.Audit.Verifier system.Audit.Reverifier = auditorPeer.Audit.Reverifier system.Audit.Reporter = auditorPeer.Audit.Reporter diff --git a/satellite/accounting/billing_test.go b/satellite/accounting/billing_test.go index 1b5e462b9..18fe3b227 100644 --- a/satellite/accounting/billing_test.go +++ b/satellite/accounting/billing_test.go @@ -298,7 +298,7 @@ func TestBilling_UploadNoEgress(t *testing.T) { // Make sure that we don't have interference with billed repair traffic // in case of a bug. There is a specific test to verify that the repair // traffic isn't billed. - satelliteSys.Audit.Chore.Loop.Stop() + satelliteSys.RangedLoop.RangedLoop.Service.Loop.Stop() satelliteSys.Repair.Repairer.Loop.Stop() // stop any async flushes because we want to be sure when some values are // written to avoid races @@ -340,7 +340,7 @@ func TestBilling_DownloadTraffic(t *testing.T) { // Make sure that we don't have interference with billed repair traffic // in case of a bug. There is a specific test to verify that the repair // traffic isn't billed. 
- satelliteSys.Audit.Chore.Loop.Stop() + satelliteSys.RangedLoop.RangedLoop.Service.Loop.Stop() satelliteSys.Repair.Repairer.Loop.Stop() // stop any async flushes because we want to be sure when some values are // written to avoid races @@ -376,7 +376,7 @@ func TestBilling_ExpiredFiles(t *testing.T) { ) satelliteSys := planet.Satellites[0] - satelliteSys.Audit.Chore.Loop.Stop() + satelliteSys.RangedLoop.RangedLoop.Service.Loop.Stop() satelliteSys.Repair.Repairer.Loop.Stop() satelliteSys.Accounting.Tally.Loop.Pause() diff --git a/satellite/audit/audit_test.go b/satellite/audit/audit_test.go index b42d5f0e5..e20d61e51 100644 --- a/satellite/audit/audit_test.go +++ b/satellite/audit/audit_test.go @@ -25,7 +25,7 @@ import ( // specified bucket are counted correctly for storage node audit bandwidth // usage and the storage nodes will be paid for that. func TestAuditOrderLimit(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -79,7 +79,7 @@ func TestAuditOrderLimit(t *testing.T) { // Minimal test to verify that copies aren't audited. func TestAuditSkipsRemoteCopies(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -152,7 +152,7 @@ func TestAuditSkipsRemoteCopies(t *testing.T) { // Minimal test to verify that inline objects are not audited even if they are copies. func TestAuditSkipsInlineCopies(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] diff --git a/satellite/audit/chore.go b/satellite/audit/chore.go deleted file mode 100644 index 4c8f82ce7..000000000 --- a/satellite/audit/chore.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package audit - -import ( - "context" - "math/rand" - "time" - - "go.uber.org/zap" - - "storj.io/common/sync2" - "storj.io/common/uuid" - "storj.io/storj/satellite/metabase/segmentloop" -) - -// Chore populates reservoirs and the audit queue. -// -// architecture: Chore -type Chore struct { - log *zap.Logger - rand *rand.Rand - queue VerifyQueue - Loop *sync2.Cycle - - segmentLoop *segmentloop.Service - config Config -} - -// NewChore instantiates Chore. -func NewChore(log *zap.Logger, queue VerifyQueue, loop *segmentloop.Service, config Config) *Chore { - if config.VerificationPushBatchSize < 1 { - config.VerificationPushBatchSize = 1 - } - return &Chore{ - log: log, - rand: rand.New(rand.NewSource(time.Now().Unix())), - queue: queue, - Loop: sync2.NewCycle(config.ChoreInterval), - - segmentLoop: loop, - config: config, - } -} - -// Run starts the chore. 
-func (chore *Chore) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - return chore.Loop.Run(ctx, func(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - collector := NewCollector(chore.config.Slots, chore.rand) - err = chore.segmentLoop.Join(ctx, collector) - if err != nil { - chore.log.Error("error joining segmentloop", zap.Error(err)) - return nil - } - - type SegmentKey struct { - StreamID uuid.UUID - Position uint64 - } - - var newQueue []Segment - queueSegments := make(map[SegmentKey]struct{}) - - // Add reservoir segments to queue in pseudorandom order. - for i := 0; i < chore.config.Slots; i++ { - for _, res := range collector.Reservoirs { - segments := res.Segments() - // Skip reservoir if no segment at this index. - if len(segments) <= i { - continue - } - segment := segments[i] - segmentKey := SegmentKey{ - StreamID: segment.StreamID, - Position: segment.Position.Encode(), - } - if segmentKey == (SegmentKey{}) { - continue - } - - if _, ok := queueSegments[segmentKey]; !ok { - newQueue = append(newQueue, NewSegment(segment)) - queueSegments[segmentKey] = struct{}{} - } - } - } - - // Push new queue to queues struct so it can be fetched by worker. - return chore.queue.Push(ctx, newQueue, chore.config.VerificationPushBatchSize) - }) -} - -// Close closes chore. -func (chore *Chore) Close() error { - chore.Loop.Close() - return nil -} diff --git a/satellite/audit/collector.go b/satellite/audit/collector.go deleted file mode 100644 index 2c5614d2f..000000000 --- a/satellite/audit/collector.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package audit - -import ( - "context" - "math/rand" - - "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" -) - -var _ segmentloop.Observer = (*Collector)(nil) - -// Collector uses the segment loop to add segments to node reservoirs. -type Collector struct { - Reservoirs map[metabase.NodeAlias]*Reservoir - slotCount int - rand *rand.Rand -} - -// NewCollector instantiates a segment collector. -func NewCollector(reservoirSlots int, r *rand.Rand) *Collector { - return &Collector{ - Reservoirs: make(map[metabase.NodeAlias]*Reservoir), - slotCount: reservoirSlots, - rand: r, - } -} - -// LoopStarted is called at each start of a loop. -func (collector *Collector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) { - return nil -} - -// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already. -func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error { - // we are expliticy not adding monitoring here as we are tracking loop observers separately - - for _, piece := range segment.AliasPieces { - res, ok := collector.Reservoirs[piece.Alias] - if !ok { - res = NewReservoir(collector.slotCount) - collector.Reservoirs[piece.Alias] = res - } - res.Sample(collector.rand, segment) - } - return nil -} - -// InlineSegment returns nil because we're only auditing for storage nodes for now. -func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) { - return nil -} - -// Process performs per-node reservoir sampling on remote segments for addition into the audit queue. 
-func (collector *Collector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) { - for _, segment := range segments { - // The reservoir ends up deferencing and copying the segment internally - // but that's not obvious, so alias the loop variable. - segment := segment - if segment.Inline() { - continue - } - if err := collector.RemoteSegment(ctx, &segment); err != nil { - return err - } - } - return nil -} diff --git a/satellite/audit/common_test.go b/satellite/audit/common_test.go index 17183702c..9e788e8cd 100644 --- a/satellite/audit/common_test.go +++ b/satellite/audit/common_test.go @@ -20,23 +20,7 @@ type runQueueingOnceFunc = func(ctx context.Context, satellite *testplanet.Satel // testWithChoreAndObserver runs an audit test for both the chore and observer. // It provides functions that the test can use to pause and run the queueing // done by the chore or observer. -func testWithChoreAndObserver(t *testing.T, planetConfig testplanet.Config, run func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc)) { - t.Run("Chore", func(t *testing.T) { - planetConfig := planetConfig - testplanet.Run(t, planetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - t.Helper() - run(t, ctx, planet, - func(satellite *testplanet.Satellite) { - satellite.Audit.Chore.Loop.Pause() - }, - func(ctx context.Context, satellite *testplanet.Satellite) error { - satellite.Audit.Chore.Loop.TriggerWait() - return nil - }, - ) - }) - }) - +func testWithRangedLoop(t *testing.T, planetConfig testplanet.Config, run func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc)) { t.Run("Observer", func(t *testing.T) { planetConfig := planetConfig reconfigureSatellite := planetConfig.Reconfigure.Satellite diff --git a/satellite/audit/getshare_test.go b/satellite/audit/getshare_test.go index 4a296a802..a7e0cedf3 100644 --- a/satellite/audit/getshare_test.go +++ b/satellite/audit/getshare_test.go @@ -72,7 +72,7 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo } func TestGetShareDoesNameLookupIfNecessary(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { testSatellite := planet.Satellites[0] @@ -128,7 +128,7 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) { } func TestGetSharePrefers(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { testSatellite := planet.Satellites[0] diff --git a/satellite/audit/integration_test.go b/satellite/audit/integration_test.go index e01814e77..97de5b77e 100644 --- a/satellite/audit/integration_test.go +++ b/satellite/audit/integration_test.go @@ -20,7 +20,7 @@ import ( ) func TestChoreAndWorkerIntegration(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: 
func(log *zap.Logger, index int, config *satellite.Config) { diff --git a/satellite/audit/observer.go b/satellite/audit/observer.go index e81f5c2bc..02b109005 100644 --- a/satellite/audit/observer.go +++ b/satellite/audit/observer.go @@ -14,6 +14,7 @@ import ( "storj.io/common/uuid" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" + "storj.io/storj/satellite/metabase/segmentloop" ) // Observer populates reservoirs and the audit queue. @@ -26,9 +27,12 @@ type Observer struct { seedRand *rand.Rand // The follow fields are reset on each segment loop cycle. - reservoirs map[metabase.NodeAlias]*Reservoir + Reservoirs map[metabase.NodeAlias]*Reservoir } +var _ rangedloop.Observer = (*Observer)(nil) +var _ rangedloop.Partial = (*observerFork)(nil) + // NewObserver instantiates Observer. func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer { if config.VerificationPushBatchSize < 1 { @@ -46,7 +50,7 @@ func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer { func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) { defer mon.Task()(&ctx)(&err) - obs.reservoirs = make(map[metabase.NodeAlias]*Reservoir) + obs.Reservoirs = make(map[metabase.NodeAlias]*Reservoir) return nil } @@ -60,22 +64,22 @@ func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) // for two or more RNGs. To prevent that, the observer itself uses an RNG // to seed the per-collector RNGs. rnd := rand.New(rand.NewSource(obs.seedRand.Int63())) - return NewCollector(obs.config.Slots, rnd), nil + return newObserverFork(obs.config.Slots, rnd), nil } // Join merges the audit reservoir collector into the per-node reservoirs. func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) { defer mon.Task()(&ctx)(&err) - collector, ok := partial.(*Collector) + fork, ok := partial.(*observerFork) if !ok { - return errs.New("expected partial type %T but got %T", collector, partial) + return errs.New("expected partial type %T but got %T", fork, partial) } - for nodeAlias, reservoir := range collector.Reservoirs { - existing, ok := obs.reservoirs[nodeAlias] + for nodeAlias, reservoir := range fork.reservoirs { + existing, ok := obs.Reservoirs[nodeAlias] if !ok { - obs.reservoirs[nodeAlias] = reservoir + obs.Reservoirs[nodeAlias] = reservoir continue } if err := existing.Merge(reservoir); err != nil { @@ -99,7 +103,7 @@ func (obs *Observer) Finish(ctx context.Context) (err error) { // Add reservoir segments to queue in pseudorandom order. for i := 0; i < obs.config.Slots; i++ { - for _, res := range obs.reservoirs { + for _, res := range obs.Reservoirs { segments := res.Segments() // Skip reservoir if no segment at this index. if len(segments) <= i { @@ -120,3 +124,39 @@ func (obs *Observer) Finish(ctx context.Context) (err error) { // Push new queue to queues struct so it can be fetched by worker. return obs.queue.Push(ctx, newQueue, obs.config.VerificationPushBatchSize) } + +type observerFork struct { + reservoirs map[metabase.NodeAlias]*Reservoir + slotCount int + rand *rand.Rand +} + +func newObserverFork(reservoirSlots int, r *rand.Rand) *observerFork { + return &observerFork{ + reservoirs: make(map[metabase.NodeAlias]*Reservoir), + slotCount: reservoirSlots, + rand: r, + } +} + +// Process performs per-node reservoir sampling on remote segments for addition into the audit queue. 
+func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) { + for _, segment := range segments { + // The reservoir ends up deferencing and copying the segment internally + // but that's not obvious, so alias the loop variable. + segment := segment + if segment.Inline() { + continue + } + + for _, piece := range segment.AliasPieces { + res, ok := fork.reservoirs[piece.Alias] + if !ok { + res = NewReservoir(fork.slotCount) + fork.reservoirs[piece.Alias] = res + } + res.Sample(fork.rand, segment) + } + } + return nil +} diff --git a/satellite/audit/collector_test.go b/satellite/audit/observer_test.go similarity index 81% rename from satellite/audit/collector_test.go rename to satellite/audit/observer_test.go index fcd4161c9..154ebfe49 100644 --- a/satellite/audit/collector_test.go +++ b/satellite/audit/observer_test.go @@ -4,19 +4,20 @@ package audit_test import ( - "math/rand" "strconv" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" "storj.io/common/memory" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite/audit" + "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/segmentloop" ) @@ -38,7 +39,7 @@ func TestAuditCollector(t *testing.T) { }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] - satellite.Audit.Worker.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() ul := planet.Uplinks[0] @@ -50,9 +51,11 @@ func TestAuditCollector(t *testing.T) { require.NoError(t, err) } - r := rand.New(rand.NewSource(time.Now().Unix())) - observer := audit.NewCollector(4, r) - err := satellite.Metabase.SegmentLoop.Join(ctx, observer) + observer := audit.NewObserver(zaptest.NewLogger(t), satellite.Audit.VerifyQueue, satellite.Config.Audit) + + ranges := rangedloop.NewMetabaseRangeSplitter(satellite.Metabase.DB, 0, 100) + loop := rangedloop.NewService(zaptest.NewLogger(t), satellite.Config.RangedLoop, ranges, []rangedloop.Observer{observer}) + _, err := loop.RunOnce(ctx) require.NoError(t, err) aliases, err := planet.Satellites[0].Metabase.DB.LatestNodesAliasMap(ctx) @@ -90,15 +93,15 @@ func BenchmarkRemoteSegment(b *testing.B) { require.NoError(b, err) } - observer := audit.NewCollector(3, rand.New(rand.NewSource(time.Now().Unix()))) + observer := audit.NewObserver(zap.NewNop(), nil, planet.Satellites[0].Config.Audit) segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(b, err) - loopSegments := []*segmentloop.Segment{} + loopSegments := []segmentloop.Segment{} for _, segment := range segments { - loopSegments = append(loopSegments, &segmentloop.Segment{ + loopSegments = append(loopSegments, segmentloop.Segment{ StreamID: segment.StreamID, Position: segment.Position, CreatedAt: segment.CreatedAt, @@ -108,14 +111,12 @@ func BenchmarkRemoteSegment(b *testing.B) { }) } + fork, err := observer.Fork(ctx) + require.NoError(b, err) + b.Run("multiple segments", func(b *testing.B) { for i := 0; i < b.N; i++ { - for _, loopSegment := range loopSegments { - err := observer.RemoteSegment(ctx, loopSegment) - if err != nil { - b.FailNow() - } - } + _ = fork.Process(ctx, loopSegments) } }) }) diff --git a/satellite/audit/reservoir.go b/satellite/audit/reservoir.go index 042f23f7c..000b0f220 100644 --- a/satellite/audit/reservoir.go +++ b/satellite/audit/reservoir.go @@ -55,14 
+55,14 @@ func (reservoir *Reservoir) Keys() []float64 { // be passed in. The way this is accomplished is known as _Reservoir Sampling_. // The specific algorithm we are using here is called A-Res on the Wikipedia // article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res -func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment) { +func (reservoir *Reservoir) Sample(r *rand.Rand, segment segmentloop.Segment) { k := -math.Log(r.Float64()) / float64(segment.EncryptedSize) reservoir.sample(k, segment) } -func (reservoir *Reservoir) sample(k float64, segment *segmentloop.Segment) { +func (reservoir *Reservoir) sample(k float64, segment segmentloop.Segment) { if reservoir.index < reservoir.size { - reservoir.segments[reservoir.index] = *segment + reservoir.segments[reservoir.index] = segment reservoir.keys[reservoir.index] = k reservoir.index++ } else { @@ -73,7 +73,7 @@ func (reservoir *Reservoir) sample(k float64, segment *segmentloop.Segment) { } } if k < reservoir.keys[max] { - reservoir.segments[max] = *segment + reservoir.segments[max] = segment reservoir.keys[max] = k } } @@ -85,7 +85,7 @@ func (reservoir *Reservoir) Merge(operand *Reservoir) error { return errs.New("cannot merge: mismatched size: expected %d but got %d", reservoir.size, operand.size) } for i := int8(0); i < operand.index; i++ { - reservoir.sample(operand.keys[i], &operand.segments[i]) + reservoir.sample(operand.keys[i], operand.segments[i]) } return nil } diff --git a/satellite/audit/reservoir_test.go b/satellite/audit/reservoir_test.go index 7e6967fef..305185958 100644 --- a/satellite/audit/reservoir_test.go +++ b/satellite/audit/reservoir_test.go @@ -33,7 +33,7 @@ func TestReservoir(t *testing.T) { // If we sample N segments, less than the max, we should record all N r := NewReservoir(size) for _, sample := range samples { - r.Sample(rng, &sample) + r.Sample(rng, sample) } require.Equal(t, samples, r.Segments()) require.Len(t, r.Keys(), len(samples)) @@ -50,14 +50,14 @@ func TestReservoirMerge(t *testing.T) { } rng := rand.New(rand.NewSource(999)) r1 := NewReservoir(3) - r1.Sample(rng, &segments[0]) - r1.Sample(rng, &segments[1]) - r1.Sample(rng, &segments[2]) + r1.Sample(rng, segments[0]) + r1.Sample(rng, segments[1]) + r1.Sample(rng, segments[2]) r2 := NewReservoir(3) - r2.Sample(rng, &segments[3]) - r2.Sample(rng, &segments[4]) - r2.Sample(rng, &segments[5]) + r2.Sample(rng, segments[3]) + r2.Sample(rng, segments[4]) + r2.Sample(rng, segments[5]) err := r1.Merge(r2) require.NoError(t, err) @@ -93,7 +93,7 @@ func TestReservoirWeights(t *testing.T) { weight1StreamID: 0, } - segments := []*segmentloop.Segment{ + segments := []segmentloop.Segment{ { StreamID: weight10StreamID, Position: metabase.SegmentPosition{}, @@ -167,7 +167,7 @@ func TestReservoirBias(t *testing.T) { EncryptedSize: weight, } binary.BigEndian.PutUint64(seg.StreamID[0:8], uint64(n)<<(64-useBits)) - res.Sample(rng, &seg) + res.Sample(rng, seg) } for i, seg := range res.Segments() { num := binary.BigEndian.Uint64(seg.StreamID[0:8]) >> (64 - useBits) diff --git a/satellite/audit/reverifier_test.go b/satellite/audit/reverifier_test.go index 5e19534e6..312e4d52f 100644 --- a/satellite/audit/reverifier_test.go +++ b/satellite/audit/reverifier_test.go @@ -32,7 +32,7 @@ func TestReverifyPiece(t *testing.T) { audits := satellite.Audit audits.Worker.Loop.Pause() - audits.Chore.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() segment := uploadSomeData(t, ctx, planet) @@ -62,7 +62,7 @@ func 
TestReverifyPieceSucceeds(t *testing.T) { audits := satellite.Audit audits.Worker.Loop.Pause() - audits.Chore.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() segment := uploadSomeData(t, ctx, planet) @@ -92,7 +92,7 @@ func TestReverifyPieceWithNodeOffline(t *testing.T) { audits := satellite.Audit audits.Worker.Loop.Pause() - audits.Chore.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() segment := uploadSomeData(t, ctx, planet) @@ -125,7 +125,7 @@ func TestReverifyPieceWithPieceMissing(t *testing.T) { audits := satellite.Audit audits.Worker.Loop.Pause() - audits.Chore.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() segment := uploadSomeData(t, ctx, planet) @@ -161,7 +161,7 @@ func testReverifyRewrittenPiece(t *testing.T, mutator func(content []byte, heade audits := satellite.Audit audits.Worker.Loop.Pause() - audits.Chore.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() segment := uploadSomeData(t, ctx, planet) diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index 9b4dad61f..87311d4f3 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -28,7 +28,7 @@ import ( ) func TestReverifySuccess(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { // This is a bulky test but all it's doing is: @@ -90,7 +90,7 @@ func TestReverifySuccess(t *testing.T) { } func TestReverifyFailMissingShare(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { // - uploads random data @@ -160,7 +160,7 @@ func TestReverifyFailMissingShare(t *testing.T) { } func TestReverifyOffline(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { // - uploads random data @@ -226,7 +226,7 @@ func TestReverifyOffline(t *testing.T) { } func TestReverifyOfflineDialTimeout(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { // - uploads random data @@ -320,7 +320,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) { } func TestReverifyDeletedSegment(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: testplanet.ReconfigureRS(1, 2, 4, 4), @@ -401,7 +401,7 @@ func cloneAndDropPiece(ctx context.Context, metabaseDB *metabase.DB, segment *me } func TestReverifyModifiedSegment(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 
1, Reconfigure: testplanet.Reconfigure{ Satellite: testplanet.ReconfigureRS(1, 2, 4, 4), @@ -479,7 +479,7 @@ func TestReverifyModifiedSegment(t *testing.T) { } func TestReverifyReplacedSegment(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: testplanet.ReconfigureRS(1, 2, 4, 4), @@ -548,7 +548,7 @@ func TestReverifyReplacedSegment(t *testing.T) { // TestReverifyExpired tests the case where the segment passed into Reverify is expired. func TestReverifyExpired(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -609,7 +609,7 @@ func TestReverifyExpired(t *testing.T) { // audit service gets put into containment mode. func TestReverifySlowDownload(t *testing.T) { const auditTimeout = time.Second - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { @@ -684,7 +684,7 @@ func TestReverifySlowDownload(t *testing.T) { // TestReverifyUnknownError checks that a node that returns an unknown error during an audit does not get marked as successful, failed, or contained. func TestReverifyUnknownError(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { @@ -750,7 +750,7 @@ func TestReverifyUnknownError(t *testing.T) { func TestMaxReverifyCount(t *testing.T) { const auditTimeout = time.Second - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { @@ -851,7 +851,7 @@ func TestTimeDelayBeforeReverifies(t *testing.T) { auditTimeout = time.Second reverifyInterval = time.Second / 4 ) - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index e0039dbcc..ddef48fc1 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -39,7 +39,7 @@ import ( // returned by the DownloadShares method contain no error if all shares were // downloaded successfully. 
func TestDownloadSharesHappyPath(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -94,7 +94,7 @@ func TestDownloadSharesHappyPath(t *testing.T) { // If this test fails, this most probably means we made a backward-incompatible // change that affects the audit service. func TestDownloadSharesOfflineNode(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -157,7 +157,7 @@ func TestDownloadSharesOfflineNode(t *testing.T) { // If this test fails, this most probably means we made a backward-incompatible // change that affects the audit service. func TestDownloadSharesMissingPiece(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -216,7 +216,7 @@ func TestDownloadSharesMissingPiece(t *testing.T) { // If this test fails, this most probably means we made a backward-incompatible // change that affects the audit service. func TestDownloadSharesDialTimeout(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -297,7 +297,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) { // If this test fails, this most probably means we made a backward-incompatible // change that affects the audit service. 
func TestDownloadSharesDownloadTimeout(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { @@ -370,7 +370,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { } func TestVerifierHappyPath(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -409,7 +409,7 @@ func TestVerifierHappyPath(t *testing.T) { } func TestVerifierExpired(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -448,7 +448,7 @@ func TestVerifierExpired(t *testing.T) { } func TestVerifierOfflineNode(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { @@ -493,7 +493,7 @@ func TestVerifierOfflineNode(t *testing.T) { } func TestVerifierMissingPiece(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -540,7 +540,7 @@ func TestVerifierMissingPiece(t *testing.T) { } func TestVerifierNotEnoughPieces(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { @@ -609,7 +609,7 @@ func TestVerifierNotEnoughPieces(t *testing.T) { } func TestVerifierDialTimeout(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -674,7 +674,7 @@ func TestVerifierDialTimeout(t *testing.T) { } func TestVerifierDeletedSegment(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -712,7 +712,7 @@ func TestVerifierDeletedSegment(t *testing.T) { } func TestVerifierModifiedSegment(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, 
testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -765,7 +765,7 @@ func TestVerifierModifiedSegment(t *testing.T) { } func TestVerifierReplacedSegment(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -805,7 +805,7 @@ func TestVerifierReplacedSegment(t *testing.T) { } func TestVerifierModifiedSegmentFailsOnce(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -855,7 +855,7 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) { // TestVerifierSlowDownload checks that a node that times out while sending data to the // audit service gets put into containment mode. func TestVerifierSlowDownload(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { @@ -916,7 +916,7 @@ func TestVerifierSlowDownload(t *testing.T) { // TestVerifierUnknownError checks that a node that returns an unknown error in response to an audit request // does not get marked as successful, failed, or contained. 
func TestVerifierUnknownError(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { @@ -1144,7 +1144,7 @@ func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Plan } func TestIdentifyContainedNodes(t *testing.T) { - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { satellite := planet.Satellites[0] @@ -1198,7 +1198,7 @@ func TestConcurrentAuditsSuccess(t *testing.T) { minPieces = 5 ) - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ // every segment gets a piece on every node, so that every segment audit @@ -1273,7 +1273,7 @@ func TestConcurrentAuditsUnknownError(t *testing.T) { badNodes = minPieces / 2 ) - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ // every segment gets a piece on every node, so that every segment audit @@ -1356,7 +1356,7 @@ func TestConcurrentAuditsFailure(t *testing.T) { badNodes = minPieces / 2 ) - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ // every segment gets a piece on every node, so that every segment audit @@ -1443,7 +1443,7 @@ func TestConcurrentAuditsTimeout(t *testing.T) { retryInterval = 5 * time.Minute ) - testWithChoreAndObserver(t, testplanet.Config{ + testWithRangedLoop(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ // every segment should get a piece on every node, so that every segment audit diff --git a/satellite/audit/worker.go b/satellite/audit/worker.go index 9cedf4ffc..f4ea48d1f 100644 --- a/satellite/audit/worker.go +++ b/satellite/audit/worker.go @@ -30,7 +30,7 @@ type Config struct { Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"` VerificationPushBatchSize int `help:"number of audit jobs to push at once to the verification queue" devDefault:"10" releaseDefault:"4096"` WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"` - UseRangedLoop bool `help:"whether or not to use the ranged loop observer instead of the chore." default:"false" testDefault:"false"` + UseRangedLoop bool `help:"whether use Audit observer with ranged loop." 
default:"true"` ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"` ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"` diff --git a/satellite/core.go b/satellite/core.go index 53a8bef46..76aa3974a 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -110,7 +110,6 @@ type Core struct { Audit struct { VerifyQueue audit.VerifyQueue ReverifyQueue audit.ReverifyQueue - Chore *audit.Chore ContainmentSyncChore *audit.ContainmentSyncChore } @@ -368,34 +367,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Audit.VerifyQueue = db.VerifyQueue() peer.Audit.ReverifyQueue = db.ReverifyQueue() - if config.UseRangedLoop { - peer.Log.Named("audit:chore").Info("using ranged loop") - } else { - peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"), - peer.Audit.VerifyQueue, - peer.Metainfo.SegmentLoop, - config, - ) - peer.Services.Add(lifecycle.Item{ - Name: "audit:chore", - Run: peer.Audit.Chore.Run, - Close: peer.Audit.Chore.Close, - }) - peer.Debug.Server.Panel.Add( - debug.Cycle("Audit Chore", peer.Audit.Chore.Loop)) - - peer.Audit.ContainmentSyncChore = audit.NewContainmentSyncChore(peer.Log.Named("audit:containment-sync-chore"), - peer.Audit.ReverifyQueue, - peer.Overlay.DB, - config.ContainmentSyncChoreInterval, - ) - peer.Services.Add(lifecycle.Item{ - Name: "audit:containment-sync-chore", - Run: peer.Audit.ContainmentSyncChore.Run, - }) - peer.Debug.Server.Panel.Add( - debug.Cycle("Audit Containment Sync Chore", peer.Audit.ContainmentSyncChore.Loop)) - } + peer.Audit.ContainmentSyncChore = audit.NewContainmentSyncChore(peer.Log.Named("audit:containment-sync-chore"), + peer.Audit.ReverifyQueue, + peer.Overlay.DB, + config.ContainmentSyncChoreInterval, + ) + peer.Services.Add(lifecycle.Item{ + Name: "audit:containment-sync-chore", + Run: peer.Audit.ContainmentSyncChore.Run, + }) + peer.Debug.Server.Panel.Add( + debug.Cycle("Audit Containment Sync Chore", peer.Audit.ContainmentSyncChore.Loop)) } { // setup expired segment cleanup diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go index 951a5d2e9..64c13840b 100644 --- a/satellite/overlay/selection_test.go +++ b/satellite/overlay/selection_test.go @@ -171,7 +171,7 @@ func TestEnsureMinimumRequested(t *testing.T) { satellite := planet.Satellites[0] // pause chores that might update node data - satellite.Audit.Chore.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() satellite.Repair.Checker.Loop.Pause() satellite.Repair.Repairer.Loop.Pause() for _, node := range planet.StorageNodes { diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index e68a52200..6d59caf6e 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -392,7 +392,7 @@ func TestGetOnlineNodesForGetDelete(t *testing.T) { SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { // pause chores that might update node data - planet.Satellites[0].Audit.Chore.Loop.Pause() + planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.Stop() planet.Satellites[0].Repair.Checker.Loop.Pause() planet.Satellites[0].Repair.Repairer.Loop.Pause() for _, node := range planet.StorageNodes { diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index 58a164101..229a2dacd 100644 --- 
a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -1323,7 +1323,7 @@ func TestRepairExpiredSegment(t *testing.T) { satellite := planet.Satellites[0] // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Stop() - satellite.Audit.Chore.Loop.Pause() + satellite.RangedLoop.RangedLoop.Service.Loop.Stop() satellite.Repair.Checker.Loop.Pause() satellite.Repair.Repairer.Loop.Pause() @@ -1361,9 +1361,6 @@ func TestRepairExpiredSegment(t *testing.T) { satellite.Repair.Checker.Loop.TriggerWait() satellite.Repair.Checker.Loop.Pause() - // get encrypted path of segment with audit service - satellite.Audit.Chore.Loop.TriggerWait() - // Verify that the segment is on the repair queue count, err := satellite.DB.RepairQueue().Count(ctx) require.NoError(t, err) @@ -2873,7 +2870,7 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) { audits := testSatellite.Audit audits.Worker.Loop.Pause() - audits.Chore.Loop.Pause() + testSatellite.RangedLoop.RangedLoop.Service.Loop.Stop() ul := planet.Uplinks[0] testData := testrand.Bytes(8 * memory.KiB) @@ -2881,7 +2878,10 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) { err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData) require.NoError(t, err) - audits.Chore.Loop.TriggerWait() + // trigger audit + _, err = testSatellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) + queue := audits.VerifyQueue queueSegment, err := queue.Next(ctx) require.NoError(t, err) @@ -2945,7 +2945,7 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) { audits := testSatellite.Audit audits.Worker.Loop.Pause() - audits.Chore.Loop.Pause() + testSatellite.RangedLoop.RangedLoop.Service.Loop.Stop() ul := planet.Uplinks[0] testData := testrand.Bytes(8 * memory.KiB) @@ -2953,7 +2953,10 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) { err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData) require.NoError(t, err) - audits.Chore.Loop.TriggerWait() + // trigger audit + _, err = testSatellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) + queue := audits.VerifyQueue queueSegment, err := queue.Next(ctx) require.NoError(t, err) diff --git a/satellite/reputation/service_test.go b/satellite/reputation/service_test.go index ed29d3b8b..93c6b5df6 100644 --- a/satellite/reputation/service_test.go +++ b/satellite/reputation/service_test.go @@ -25,7 +25,8 @@ func TestConcurrentAudit(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - planet.Satellites[0].Audit.Chore.Loop.Stop() + planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.Stop() + data := testrand.Bytes(10 * memory.MB) err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "testpath", data) require.NoError(t, err) diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index c8a55e6ba..c8f006894 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -79,8 +79,8 @@ # number of reservoir slots allotted for nodes, currently capped at 3 # audit.slots: 3 -# whether or not to use the ranged loop observer instead of the chore. -# audit.use-ranged-loop: false +# whether use Audit observer with ranged loop. 
+# audit.use-ranged-loop: true # number of audit jobs to push at once to the verification queue # audit.verification-push-batch-size: 4096
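
Editorial sketch, not part of the patch: roughly how audit queueing is driven after this change, with the ranged-loop Observer replacing Audit.Chore and the Collector. It mirrors the observer_test.go and repair_test.go hunks above; the test name and the uploaded object are illustrative, and the usual testplanet environment is assumed.

package audit_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"

	"storj.io/common/memory"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/metabase/rangedloop"
)

// TestAuditQueueingViaRangedLoop is an illustrative name; the body follows the
// patterns introduced in this patch.
func TestAuditQueueingViaRangedLoop(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]
		// Stop the background ranged loop so only the explicit pass below queues audits.
		satellite.RangedLoop.RangedLoop.Service.Loop.Stop()

		err := planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "test/path", testrand.Bytes(8*memory.KiB))
		require.NoError(t, err)

		// The audit Observer replaces Chore + Collector: Fork/Process sample remote
		// segments into per-node reservoirs, and Finish pushes them to the verify queue.
		observer := audit.NewObserver(zaptest.NewLogger(t), satellite.Audit.VerifyQueue, satellite.Config.Audit)

		// One pass over the metabase with the observer attached, instead of
		// triggering satellite.Audit.Chore.Loop.
		ranges := rangedloop.NewMetabaseRangeSplitter(satellite.Metabase.DB, 0, 100)
		loop := rangedloop.NewService(zaptest.NewLogger(t), satellite.Config.RangedLoop, ranges, []rangedloop.Observer{observer})
		_, err = loop.RunOnce(ctx)
		require.NoError(t, err)

		// The audit worker consumes whatever the observer queued.
		_, err = satellite.Audit.VerifyQueue.Next(ctx)
		require.NoError(t, err)
	})
}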
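
A second editorial sketch, also not part of the patch: the weighted A-Res reservoir sampling that Reservoir.Sample performs (key = -ln(u)/EncryptedSize, keep the slots with the smallest keys), shown standalone. The item type and function names here are made up for illustration; only the sampling rule comes from reservoir.go.

package main

import (
	"fmt"
	"math"
	"math/rand"
)

// item stands in for a segment: only its weight matters for sampling.
type item struct {
	name   string
	weight float64 // plays the role of EncryptedSize
}

// sampleARes keeps a weighted random sample of size k: each item gets
// key = -ln(u)/weight and the k items with the smallest keys are kept,
// so heavier items are proportionally more likely to be selected.
func sampleARes(rng *rand.Rand, items []item, k int) []item {
	kept := make([]item, 0, k)
	keys := make([]float64, 0, k)
	for _, it := range items {
		key := -math.Log(rng.Float64()) / it.weight
		if len(kept) < k {
			kept = append(kept, it)
			keys = append(keys, key)
			continue
		}
		// Replace the current largest key if the new key is smaller.
		max := 0
		for i := 1; i < len(keys); i++ {
			if keys[i] > keys[max] {
				max = i
			}
		}
		if key < keys[max] {
			kept[max] = it
			keys[max] = key
		}
	}
	return kept
}

func main() {
	rng := rand.New(rand.NewSource(1))
	items := []item{{"a", 1}, {"b", 10}, {"c", 100}, {"d", 1000}}
	// Heavier items dominate the sample across runs.
	fmt.Println(sampleARes(rng, items, 2))
}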