satellite/audit: remove segments loop parts
We are switching completely to ranged loop. https://github.com/storj/storj/issues/5368 Change-Id: I9cec0ac454f40f19d52c078a8b1870c4d192bd7a
This commit is contained in:
parent
6ac5bf0d7c
commit
1aa24b9f0d
@ -143,7 +143,6 @@ type Satellite struct {
|
||||
ReverifyQueue audit.ReverifyQueue
|
||||
Worker *audit.Worker
|
||||
ReverifyWorker *audit.ReverifyWorker
|
||||
Chore *audit.Chore
|
||||
Verifier *audit.Verifier
|
||||
Reverifier *audit.Reverifier
|
||||
Reporter audit.Reporter
|
||||
@ -637,7 +636,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
|
||||
system.Audit.ReverifyQueue = auditorPeer.Audit.ReverifyQueue
|
||||
system.Audit.Worker = auditorPeer.Audit.Worker
|
||||
system.Audit.ReverifyWorker = auditorPeer.Audit.ReverifyWorker
|
||||
system.Audit.Chore = peer.Audit.Chore
|
||||
system.Audit.Verifier = auditorPeer.Audit.Verifier
|
||||
system.Audit.Reverifier = auditorPeer.Audit.Reverifier
|
||||
system.Audit.Reporter = auditorPeer.Audit.Reporter
|
||||
|
@ -298,7 +298,7 @@ func TestBilling_UploadNoEgress(t *testing.T) {
|
||||
// Make sure that we don't have interference with billed repair traffic
|
||||
// in case of a bug. There is a specific test to verify that the repair
|
||||
// traffic isn't billed.
|
||||
satelliteSys.Audit.Chore.Loop.Stop()
|
||||
satelliteSys.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
satelliteSys.Repair.Repairer.Loop.Stop()
|
||||
// stop any async flushes because we want to be sure when some values are
|
||||
// written to avoid races
|
||||
@ -340,7 +340,7 @@ func TestBilling_DownloadTraffic(t *testing.T) {
|
||||
// Make sure that we don't have interference with billed repair traffic
|
||||
// in case of a bug. There is a specific test to verify that the repair
|
||||
// traffic isn't billed.
|
||||
satelliteSys.Audit.Chore.Loop.Stop()
|
||||
satelliteSys.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
satelliteSys.Repair.Repairer.Loop.Stop()
|
||||
// stop any async flushes because we want to be sure when some values are
|
||||
// written to avoid races
|
||||
@ -376,7 +376,7 @@ func TestBilling_ExpiredFiles(t *testing.T) {
|
||||
)
|
||||
|
||||
satelliteSys := planet.Satellites[0]
|
||||
satelliteSys.Audit.Chore.Loop.Stop()
|
||||
satelliteSys.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
satelliteSys.Repair.Repairer.Loop.Stop()
|
||||
|
||||
satelliteSys.Accounting.Tally.Loop.Pause()
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
// specified bucket are counted correctly for storage node audit bandwidth
|
||||
// usage and the storage nodes will be paid for that.
|
||||
func TestAuditOrderLimit(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -79,7 +79,7 @@ func TestAuditOrderLimit(t *testing.T) {
|
||||
|
||||
// Minimal test to verify that copies aren't audited.
|
||||
func TestAuditSkipsRemoteCopies(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -152,7 +152,7 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
|
||||
|
||||
// Minimal test to verify that inline objects are not audited even if they are copies.
|
||||
func TestAuditSkipsInlineCopies(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
|
@ -1,101 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package audit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// Chore populates reservoirs and the audit queue.
|
||||
//
|
||||
// architecture: Chore
|
||||
type Chore struct {
|
||||
log *zap.Logger
|
||||
rand *rand.Rand
|
||||
queue VerifyQueue
|
||||
Loop *sync2.Cycle
|
||||
|
||||
segmentLoop *segmentloop.Service
|
||||
config Config
|
||||
}
|
||||
|
||||
// NewChore instantiates Chore.
|
||||
func NewChore(log *zap.Logger, queue VerifyQueue, loop *segmentloop.Service, config Config) *Chore {
|
||||
if config.VerificationPushBatchSize < 1 {
|
||||
config.VerificationPushBatchSize = 1
|
||||
}
|
||||
return &Chore{
|
||||
log: log,
|
||||
rand: rand.New(rand.NewSource(time.Now().Unix())),
|
||||
queue: queue,
|
||||
Loop: sync2.NewCycle(config.ChoreInterval),
|
||||
|
||||
segmentLoop: loop,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the chore.
|
||||
func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
collector := NewCollector(chore.config.Slots, chore.rand)
|
||||
err = chore.segmentLoop.Join(ctx, collector)
|
||||
if err != nil {
|
||||
chore.log.Error("error joining segmentloop", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
type SegmentKey struct {
|
||||
StreamID uuid.UUID
|
||||
Position uint64
|
||||
}
|
||||
|
||||
var newQueue []Segment
|
||||
queueSegments := make(map[SegmentKey]struct{})
|
||||
|
||||
// Add reservoir segments to queue in pseudorandom order.
|
||||
for i := 0; i < chore.config.Slots; i++ {
|
||||
for _, res := range collector.Reservoirs {
|
||||
segments := res.Segments()
|
||||
// Skip reservoir if no segment at this index.
|
||||
if len(segments) <= i {
|
||||
continue
|
||||
}
|
||||
segment := segments[i]
|
||||
segmentKey := SegmentKey{
|
||||
StreamID: segment.StreamID,
|
||||
Position: segment.Position.Encode(),
|
||||
}
|
||||
if segmentKey == (SegmentKey{}) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := queueSegments[segmentKey]; !ok {
|
||||
newQueue = append(newQueue, NewSegment(segment))
|
||||
queueSegments[segmentKey] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Push new queue to queues struct so it can be fetched by worker.
|
||||
return chore.queue.Push(ctx, newQueue, chore.config.VerificationPushBatchSize)
|
||||
})
|
||||
}
|
||||
|
||||
// Close closes chore.
|
||||
func (chore *Chore) Close() error {
|
||||
chore.Loop.Close()
|
||||
return nil
|
||||
}
|
@ -1,71 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package audit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ segmentloop.Observer = (*Collector)(nil)
|
||||
|
||||
// Collector uses the segment loop to add segments to node reservoirs.
|
||||
type Collector struct {
|
||||
Reservoirs map[metabase.NodeAlias]*Reservoir
|
||||
slotCount int
|
||||
rand *rand.Rand
|
||||
}
|
||||
|
||||
// NewCollector instantiates a segment collector.
|
||||
func NewCollector(reservoirSlots int, r *rand.Rand) *Collector {
|
||||
return &Collector{
|
||||
Reservoirs: make(map[metabase.NodeAlias]*Reservoir),
|
||||
slotCount: reservoirSlots,
|
||||
rand: r,
|
||||
}
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (collector *Collector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.
|
||||
func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
|
||||
// we are expliticy not adding monitoring here as we are tracking loop observers separately
|
||||
|
||||
for _, piece := range segment.AliasPieces {
|
||||
res, ok := collector.Reservoirs[piece.Alias]
|
||||
if !ok {
|
||||
res = NewReservoir(collector.slotCount)
|
||||
collector.Reservoirs[piece.Alias] = res
|
||||
}
|
||||
res.Sample(collector.rand, segment)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// InlineSegment returns nil because we're only auditing for storage nodes for now.
|
||||
func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Process performs per-node reservoir sampling on remote segments for addition into the audit queue.
|
||||
func (collector *Collector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
|
||||
for _, segment := range segments {
|
||||
// The reservoir ends up deferencing and copying the segment internally
|
||||
// but that's not obvious, so alias the loop variable.
|
||||
segment := segment
|
||||
if segment.Inline() {
|
||||
continue
|
||||
}
|
||||
if err := collector.RemoteSegment(ctx, &segment); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -20,23 +20,7 @@ type runQueueingOnceFunc = func(ctx context.Context, satellite *testplanet.Satel
|
||||
// testWithChoreAndObserver runs an audit test for both the chore and observer.
|
||||
// It provides functions that the test can use to pause and run the queueing
|
||||
// done by the chore or observer.
|
||||
func testWithChoreAndObserver(t *testing.T, planetConfig testplanet.Config, run func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc)) {
|
||||
t.Run("Chore", func(t *testing.T) {
|
||||
planetConfig := planetConfig
|
||||
testplanet.Run(t, planetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
t.Helper()
|
||||
run(t, ctx, planet,
|
||||
func(satellite *testplanet.Satellite) {
|
||||
satellite.Audit.Chore.Loop.Pause()
|
||||
},
|
||||
func(ctx context.Context, satellite *testplanet.Satellite) error {
|
||||
satellite.Audit.Chore.Loop.TriggerWait()
|
||||
return nil
|
||||
},
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
func testWithRangedLoop(t *testing.T, planetConfig testplanet.Config, run func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc)) {
|
||||
t.Run("Observer", func(t *testing.T) {
|
||||
planetConfig := planetConfig
|
||||
reconfigureSatellite := planetConfig.Reconfigure.Satellite
|
||||
|
@ -72,7 +72,7 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo
|
||||
}
|
||||
|
||||
func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
testSatellite := planet.Satellites[0]
|
||||
@ -128,7 +128,7 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetSharePrefers(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
testSatellite := planet.Satellites[0]
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
)
|
||||
|
||||
func TestChoreAndWorkerIntegration(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
// Observer populates reservoirs and the audit queue.
|
||||
@ -26,9 +27,12 @@ type Observer struct {
|
||||
seedRand *rand.Rand
|
||||
|
||||
// The follow fields are reset on each segment loop cycle.
|
||||
reservoirs map[metabase.NodeAlias]*Reservoir
|
||||
Reservoirs map[metabase.NodeAlias]*Reservoir
|
||||
}
|
||||
|
||||
var _ rangedloop.Observer = (*Observer)(nil)
|
||||
var _ rangedloop.Partial = (*observerFork)(nil)
|
||||
|
||||
// NewObserver instantiates Observer.
|
||||
func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer {
|
||||
if config.VerificationPushBatchSize < 1 {
|
||||
@ -46,7 +50,7 @@ func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer {
|
||||
func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
obs.reservoirs = make(map[metabase.NodeAlias]*Reservoir)
|
||||
obs.Reservoirs = make(map[metabase.NodeAlias]*Reservoir)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -60,22 +64,22 @@ func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error)
|
||||
// for two or more RNGs. To prevent that, the observer itself uses an RNG
|
||||
// to seed the per-collector RNGs.
|
||||
rnd := rand.New(rand.NewSource(obs.seedRand.Int63()))
|
||||
return NewCollector(obs.config.Slots, rnd), nil
|
||||
return newObserverFork(obs.config.Slots, rnd), nil
|
||||
}
|
||||
|
||||
// Join merges the audit reservoir collector into the per-node reservoirs.
|
||||
func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
collector, ok := partial.(*Collector)
|
||||
fork, ok := partial.(*observerFork)
|
||||
if !ok {
|
||||
return errs.New("expected partial type %T but got %T", collector, partial)
|
||||
return errs.New("expected partial type %T but got %T", fork, partial)
|
||||
}
|
||||
|
||||
for nodeAlias, reservoir := range collector.Reservoirs {
|
||||
existing, ok := obs.reservoirs[nodeAlias]
|
||||
for nodeAlias, reservoir := range fork.reservoirs {
|
||||
existing, ok := obs.Reservoirs[nodeAlias]
|
||||
if !ok {
|
||||
obs.reservoirs[nodeAlias] = reservoir
|
||||
obs.Reservoirs[nodeAlias] = reservoir
|
||||
continue
|
||||
}
|
||||
if err := existing.Merge(reservoir); err != nil {
|
||||
@ -99,7 +103,7 @@ func (obs *Observer) Finish(ctx context.Context) (err error) {
|
||||
|
||||
// Add reservoir segments to queue in pseudorandom order.
|
||||
for i := 0; i < obs.config.Slots; i++ {
|
||||
for _, res := range obs.reservoirs {
|
||||
for _, res := range obs.Reservoirs {
|
||||
segments := res.Segments()
|
||||
// Skip reservoir if no segment at this index.
|
||||
if len(segments) <= i {
|
||||
@ -120,3 +124,39 @@ func (obs *Observer) Finish(ctx context.Context) (err error) {
|
||||
// Push new queue to queues struct so it can be fetched by worker.
|
||||
return obs.queue.Push(ctx, newQueue, obs.config.VerificationPushBatchSize)
|
||||
}
|
||||
|
||||
type observerFork struct {
|
||||
reservoirs map[metabase.NodeAlias]*Reservoir
|
||||
slotCount int
|
||||
rand *rand.Rand
|
||||
}
|
||||
|
||||
func newObserverFork(reservoirSlots int, r *rand.Rand) *observerFork {
|
||||
return &observerFork{
|
||||
reservoirs: make(map[metabase.NodeAlias]*Reservoir),
|
||||
slotCount: reservoirSlots,
|
||||
rand: r,
|
||||
}
|
||||
}
|
||||
|
||||
// Process performs per-node reservoir sampling on remote segments for addition into the audit queue.
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
|
||||
for _, segment := range segments {
|
||||
// The reservoir ends up deferencing and copying the segment internally
|
||||
// but that's not obvious, so alias the loop variable.
|
||||
segment := segment
|
||||
if segment.Inline() {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, piece := range segment.AliasPieces {
|
||||
res, ok := fork.reservoirs[piece.Alias]
|
||||
if !ok {
|
||||
res = NewReservoir(fork.slotCount)
|
||||
fork.reservoirs[piece.Alias] = res
|
||||
}
|
||||
res.Sample(fork.rand, segment)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -4,19 +4,20 @@
|
||||
package audit_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
@ -38,7 +39,7 @@ func TestAuditCollector(t *testing.T) {
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
satellite.Audit.Worker.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
ul := planet.Uplinks[0]
|
||||
|
||||
@ -50,9 +51,11 @@ func TestAuditCollector(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().Unix()))
|
||||
observer := audit.NewCollector(4, r)
|
||||
err := satellite.Metabase.SegmentLoop.Join(ctx, observer)
|
||||
observer := audit.NewObserver(zaptest.NewLogger(t), satellite.Audit.VerifyQueue, satellite.Config.Audit)
|
||||
|
||||
ranges := rangedloop.NewMetabaseRangeSplitter(satellite.Metabase.DB, 0, 100)
|
||||
loop := rangedloop.NewService(zaptest.NewLogger(t), satellite.Config.RangedLoop, ranges, []rangedloop.Observer{observer})
|
||||
_, err := loop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
aliases, err := planet.Satellites[0].Metabase.DB.LatestNodesAliasMap(ctx)
|
||||
@ -90,15 +93,15 @@ func BenchmarkRemoteSegment(b *testing.B) {
|
||||
require.NoError(b, err)
|
||||
}
|
||||
|
||||
observer := audit.NewCollector(3, rand.New(rand.NewSource(time.Now().Unix())))
|
||||
observer := audit.NewObserver(zap.NewNop(), nil, planet.Satellites[0].Config.Audit)
|
||||
|
||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||
require.NoError(b, err)
|
||||
|
||||
loopSegments := []*segmentloop.Segment{}
|
||||
loopSegments := []segmentloop.Segment{}
|
||||
|
||||
for _, segment := range segments {
|
||||
loopSegments = append(loopSegments, &segmentloop.Segment{
|
||||
loopSegments = append(loopSegments, segmentloop.Segment{
|
||||
StreamID: segment.StreamID,
|
||||
Position: segment.Position,
|
||||
CreatedAt: segment.CreatedAt,
|
||||
@ -108,14 +111,12 @@ func BenchmarkRemoteSegment(b *testing.B) {
|
||||
})
|
||||
}
|
||||
|
||||
fork, err := observer.Fork(ctx)
|
||||
require.NoError(b, err)
|
||||
|
||||
b.Run("multiple segments", func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, loopSegment := range loopSegments {
|
||||
err := observer.RemoteSegment(ctx, loopSegment)
|
||||
if err != nil {
|
||||
b.FailNow()
|
||||
}
|
||||
}
|
||||
_ = fork.Process(ctx, loopSegments)
|
||||
}
|
||||
})
|
||||
})
|
@ -55,14 +55,14 @@ func (reservoir *Reservoir) Keys() []float64 {
|
||||
// be passed in. The way this is accomplished is known as _Reservoir Sampling_.
|
||||
// The specific algorithm we are using here is called A-Res on the Wikipedia
|
||||
// article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res
|
||||
func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment) {
|
||||
func (reservoir *Reservoir) Sample(r *rand.Rand, segment segmentloop.Segment) {
|
||||
k := -math.Log(r.Float64()) / float64(segment.EncryptedSize)
|
||||
reservoir.sample(k, segment)
|
||||
}
|
||||
|
||||
func (reservoir *Reservoir) sample(k float64, segment *segmentloop.Segment) {
|
||||
func (reservoir *Reservoir) sample(k float64, segment segmentloop.Segment) {
|
||||
if reservoir.index < reservoir.size {
|
||||
reservoir.segments[reservoir.index] = *segment
|
||||
reservoir.segments[reservoir.index] = segment
|
||||
reservoir.keys[reservoir.index] = k
|
||||
reservoir.index++
|
||||
} else {
|
||||
@ -73,7 +73,7 @@ func (reservoir *Reservoir) sample(k float64, segment *segmentloop.Segment) {
|
||||
}
|
||||
}
|
||||
if k < reservoir.keys[max] {
|
||||
reservoir.segments[max] = *segment
|
||||
reservoir.segments[max] = segment
|
||||
reservoir.keys[max] = k
|
||||
}
|
||||
}
|
||||
@ -85,7 +85,7 @@ func (reservoir *Reservoir) Merge(operand *Reservoir) error {
|
||||
return errs.New("cannot merge: mismatched size: expected %d but got %d", reservoir.size, operand.size)
|
||||
}
|
||||
for i := int8(0); i < operand.index; i++ {
|
||||
reservoir.sample(operand.keys[i], &operand.segments[i])
|
||||
reservoir.sample(operand.keys[i], operand.segments[i])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ func TestReservoir(t *testing.T) {
|
||||
// If we sample N segments, less than the max, we should record all N
|
||||
r := NewReservoir(size)
|
||||
for _, sample := range samples {
|
||||
r.Sample(rng, &sample)
|
||||
r.Sample(rng, sample)
|
||||
}
|
||||
require.Equal(t, samples, r.Segments())
|
||||
require.Len(t, r.Keys(), len(samples))
|
||||
@ -50,14 +50,14 @@ func TestReservoirMerge(t *testing.T) {
|
||||
}
|
||||
rng := rand.New(rand.NewSource(999))
|
||||
r1 := NewReservoir(3)
|
||||
r1.Sample(rng, &segments[0])
|
||||
r1.Sample(rng, &segments[1])
|
||||
r1.Sample(rng, &segments[2])
|
||||
r1.Sample(rng, segments[0])
|
||||
r1.Sample(rng, segments[1])
|
||||
r1.Sample(rng, segments[2])
|
||||
|
||||
r2 := NewReservoir(3)
|
||||
r2.Sample(rng, &segments[3])
|
||||
r2.Sample(rng, &segments[4])
|
||||
r2.Sample(rng, &segments[5])
|
||||
r2.Sample(rng, segments[3])
|
||||
r2.Sample(rng, segments[4])
|
||||
r2.Sample(rng, segments[5])
|
||||
|
||||
err := r1.Merge(r2)
|
||||
require.NoError(t, err)
|
||||
@ -93,7 +93,7 @@ func TestReservoirWeights(t *testing.T) {
|
||||
weight1StreamID: 0,
|
||||
}
|
||||
|
||||
segments := []*segmentloop.Segment{
|
||||
segments := []segmentloop.Segment{
|
||||
{
|
||||
StreamID: weight10StreamID,
|
||||
Position: metabase.SegmentPosition{},
|
||||
@ -167,7 +167,7 @@ func TestReservoirBias(t *testing.T) {
|
||||
EncryptedSize: weight,
|
||||
}
|
||||
binary.BigEndian.PutUint64(seg.StreamID[0:8], uint64(n)<<(64-useBits))
|
||||
res.Sample(rng, &seg)
|
||||
res.Sample(rng, seg)
|
||||
}
|
||||
for i, seg := range res.Segments() {
|
||||
num := binary.BigEndian.Uint64(seg.StreamID[0:8]) >> (64 - useBits)
|
||||
|
@ -32,7 +32,7 @@ func TestReverifyPiece(t *testing.T) {
|
||||
audits := satellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
segment := uploadSomeData(t, ctx, planet)
|
||||
|
||||
@ -62,7 +62,7 @@ func TestReverifyPieceSucceeds(t *testing.T) {
|
||||
audits := satellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
segment := uploadSomeData(t, ctx, planet)
|
||||
|
||||
@ -92,7 +92,7 @@ func TestReverifyPieceWithNodeOffline(t *testing.T) {
|
||||
audits := satellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
segment := uploadSomeData(t, ctx, planet)
|
||||
|
||||
@ -125,7 +125,7 @@ func TestReverifyPieceWithPieceMissing(t *testing.T) {
|
||||
audits := satellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
segment := uploadSomeData(t, ctx, planet)
|
||||
|
||||
@ -161,7 +161,7 @@ func testReverifyRewrittenPiece(t *testing.T, mutator func(content []byte, heade
|
||||
audits := satellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
segment := uploadSomeData(t, ctx, planet)
|
||||
|
||||
|
@ -28,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
func TestReverifySuccess(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
// This is a bulky test but all it's doing is:
|
||||
@ -90,7 +90,7 @@ func TestReverifySuccess(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReverifyFailMissingShare(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
// - uploads random data
|
||||
@ -160,7 +160,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReverifyOffline(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
// - uploads random data
|
||||
@ -226,7 +226,7 @@ func TestReverifyOffline(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReverifyOfflineDialTimeout(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
// - uploads random data
|
||||
@ -320,7 +320,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReverifyDeletedSegment(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
||||
@ -401,7 +401,7 @@ func cloneAndDropPiece(ctx context.Context, metabaseDB *metabase.DB, segment *me
|
||||
}
|
||||
|
||||
func TestReverifyModifiedSegment(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
||||
@ -479,7 +479,7 @@ func TestReverifyModifiedSegment(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReverifyReplacedSegment(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
||||
@ -548,7 +548,7 @@ func TestReverifyReplacedSegment(t *testing.T) {
|
||||
|
||||
// TestReverifyExpired tests the case where the segment passed into Reverify is expired.
|
||||
func TestReverifyExpired(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -609,7 +609,7 @@ func TestReverifyExpired(t *testing.T) {
|
||||
// audit service gets put into containment mode.
|
||||
func TestReverifySlowDownload(t *testing.T) {
|
||||
const auditTimeout = time.Second
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
@ -684,7 +684,7 @@ func TestReverifySlowDownload(t *testing.T) {
|
||||
|
||||
// TestReverifyUnknownError checks that a node that returns an unknown error during an audit does not get marked as successful, failed, or contained.
|
||||
func TestReverifyUnknownError(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
@ -750,7 +750,7 @@ func TestReverifyUnknownError(t *testing.T) {
|
||||
|
||||
func TestMaxReverifyCount(t *testing.T) {
|
||||
const auditTimeout = time.Second
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
@ -851,7 +851,7 @@ func TestTimeDelayBeforeReverifies(t *testing.T) {
|
||||
auditTimeout = time.Second
|
||||
reverifyInterval = time.Second / 4
|
||||
)
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
|
@ -39,7 +39,7 @@ import (
|
||||
// returned by the DownloadShares method contain no error if all shares were
|
||||
// downloaded successfully.
|
||||
func TestDownloadSharesHappyPath(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -94,7 +94,7 @@ func TestDownloadSharesHappyPath(t *testing.T) {
|
||||
// If this test fails, this most probably means we made a backward-incompatible
|
||||
// change that affects the audit service.
|
||||
func TestDownloadSharesOfflineNode(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -157,7 +157,7 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
|
||||
// If this test fails, this most probably means we made a backward-incompatible
|
||||
// change that affects the audit service.
|
||||
func TestDownloadSharesMissingPiece(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -216,7 +216,7 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
|
||||
// If this test fails, this most probably means we made a backward-incompatible
|
||||
// change that affects the audit service.
|
||||
func TestDownloadSharesDialTimeout(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -297,7 +297,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
|
||||
// If this test fails, this most probably means we made a backward-incompatible
|
||||
// change that affects the audit service.
|
||||
func TestDownloadSharesDownloadTimeout(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
@ -370,7 +370,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierHappyPath(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -409,7 +409,7 @@ func TestVerifierHappyPath(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierExpired(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -448,7 +448,7 @@ func TestVerifierExpired(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierOfflineNode(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
|
||||
@ -493,7 +493,7 @@ func TestVerifierOfflineNode(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierMissingPiece(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -540,7 +540,7 @@ func TestVerifierMissingPiece(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierNotEnoughPieces(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
@ -609,7 +609,7 @@ func TestVerifierNotEnoughPieces(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierDialTimeout(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -674,7 +674,7 @@ func TestVerifierDialTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierDeletedSegment(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -712,7 +712,7 @@ func TestVerifierDeletedSegment(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierModifiedSegment(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -765,7 +765,7 @@ func TestVerifierModifiedSegment(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierReplacedSegment(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -805,7 +805,7 @@ func TestVerifierReplacedSegment(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -855,7 +855,7 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
|
||||
// TestVerifierSlowDownload checks that a node that times out while sending data to the
|
||||
// audit service gets put into containment mode.
|
||||
func TestVerifierSlowDownload(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
@ -916,7 +916,7 @@ func TestVerifierSlowDownload(t *testing.T) {
|
||||
// TestVerifierUnknownError checks that a node that returns an unknown error in response to an audit request
|
||||
// does not get marked as successful, failed, or contained.
|
||||
func TestVerifierUnknownError(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
@ -1144,7 +1144,7 @@ func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Plan
|
||||
}
|
||||
|
||||
func TestIdentifyContainedNodes(t *testing.T) {
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -1198,7 +1198,7 @@ func TestConcurrentAuditsSuccess(t *testing.T) {
|
||||
minPieces = 5
|
||||
)
|
||||
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
// every segment gets a piece on every node, so that every segment audit
|
||||
@ -1273,7 +1273,7 @@ func TestConcurrentAuditsUnknownError(t *testing.T) {
|
||||
badNodes = minPieces / 2
|
||||
)
|
||||
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
// every segment gets a piece on every node, so that every segment audit
|
||||
@ -1356,7 +1356,7 @@ func TestConcurrentAuditsFailure(t *testing.T) {
|
||||
badNodes = minPieces / 2
|
||||
)
|
||||
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
// every segment gets a piece on every node, so that every segment audit
|
||||
@ -1443,7 +1443,7 @@ func TestConcurrentAuditsTimeout(t *testing.T) {
|
||||
retryInterval = 5 * time.Minute
|
||||
)
|
||||
|
||||
testWithChoreAndObserver(t, testplanet.Config{
|
||||
testWithRangedLoop(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
// every segment should get a piece on every node, so that every segment audit
|
||||
|
@ -30,7 +30,7 @@ type Config struct {
|
||||
Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
|
||||
VerificationPushBatchSize int `help:"number of audit jobs to push at once to the verification queue" devDefault:"10" releaseDefault:"4096"`
|
||||
WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`
|
||||
UseRangedLoop bool `help:"whether or not to use the ranged loop observer instead of the chore." default:"false" testDefault:"false"`
|
||||
UseRangedLoop bool `help:"whether use Audit observer with ranged loop." default:"true"`
|
||||
|
||||
ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"`
|
||||
ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"`
|
||||
|
@ -110,7 +110,6 @@ type Core struct {
|
||||
Audit struct {
|
||||
VerifyQueue audit.VerifyQueue
|
||||
ReverifyQueue audit.ReverifyQueue
|
||||
Chore *audit.Chore
|
||||
ContainmentSyncChore *audit.ContainmentSyncChore
|
||||
}
|
||||
|
||||
@ -368,34 +367,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
peer.Audit.VerifyQueue = db.VerifyQueue()
|
||||
peer.Audit.ReverifyQueue = db.ReverifyQueue()
|
||||
|
||||
if config.UseRangedLoop {
|
||||
peer.Log.Named("audit:chore").Info("using ranged loop")
|
||||
} else {
|
||||
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
|
||||
peer.Audit.VerifyQueue,
|
||||
peer.Metainfo.SegmentLoop,
|
||||
config,
|
||||
)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "audit:chore",
|
||||
Run: peer.Audit.Chore.Run,
|
||||
Close: peer.Audit.Chore.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
|
||||
|
||||
peer.Audit.ContainmentSyncChore = audit.NewContainmentSyncChore(peer.Log.Named("audit:containment-sync-chore"),
|
||||
peer.Audit.ReverifyQueue,
|
||||
peer.Overlay.DB,
|
||||
config.ContainmentSyncChoreInterval,
|
||||
)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "audit:containment-sync-chore",
|
||||
Run: peer.Audit.ContainmentSyncChore.Run,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Audit Containment Sync Chore", peer.Audit.ContainmentSyncChore.Loop))
|
||||
}
|
||||
peer.Audit.ContainmentSyncChore = audit.NewContainmentSyncChore(peer.Log.Named("audit:containment-sync-chore"),
|
||||
peer.Audit.ReverifyQueue,
|
||||
peer.Overlay.DB,
|
||||
config.ContainmentSyncChoreInterval,
|
||||
)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "audit:containment-sync-chore",
|
||||
Run: peer.Audit.ContainmentSyncChore.Run,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Audit Containment Sync Chore", peer.Audit.ContainmentSyncChore.Loop))
|
||||
}
|
||||
|
||||
{ // setup expired segment cleanup
|
||||
|
@ -171,7 +171,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
|
||||
satellite := planet.Satellites[0]
|
||||
|
||||
// pause chores that might update node data
|
||||
satellite.Audit.Chore.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
satellite.Repair.Checker.Loop.Pause()
|
||||
satellite.Repair.Repairer.Loop.Pause()
|
||||
for _, node := range planet.StorageNodes {
|
||||
|
@ -392,7 +392,7 @@ func TestGetOnlineNodesForGetDelete(t *testing.T) {
|
||||
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
// pause chores that might update node data
|
||||
planet.Satellites[0].Audit.Chore.Loop.Pause()
|
||||
planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
planet.Satellites[0].Repair.Checker.Loop.Pause()
|
||||
planet.Satellites[0].Repair.Repairer.Loop.Pause()
|
||||
for _, node := range planet.StorageNodes {
|
||||
|
@ -1323,7 +1323,7 @@ func TestRepairExpiredSegment(t *testing.T) {
|
||||
satellite := planet.Satellites[0]
|
||||
// stop audit to prevent possible interactions i.e. repair timeout problems
|
||||
satellite.Audit.Worker.Loop.Stop()
|
||||
satellite.Audit.Chore.Loop.Pause()
|
||||
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
satellite.Repair.Checker.Loop.Pause()
|
||||
satellite.Repair.Repairer.Loop.Pause()
|
||||
@ -1361,9 +1361,6 @@ func TestRepairExpiredSegment(t *testing.T) {
|
||||
satellite.Repair.Checker.Loop.TriggerWait()
|
||||
satellite.Repair.Checker.Loop.Pause()
|
||||
|
||||
// get encrypted path of segment with audit service
|
||||
satellite.Audit.Chore.Loop.TriggerWait()
|
||||
|
||||
// Verify that the segment is on the repair queue
|
||||
count, err := satellite.DB.RepairQueue().Count(ctx)
|
||||
require.NoError(t, err)
|
||||
@ -2873,7 +2870,7 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
|
||||
audits := testSatellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
testSatellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
ul := planet.Uplinks[0]
|
||||
testData := testrand.Bytes(8 * memory.KiB)
|
||||
@ -2881,7 +2878,10 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
|
||||
err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
|
||||
require.NoError(t, err)
|
||||
|
||||
audits.Chore.Loop.TriggerWait()
|
||||
// trigger audit
|
||||
_, err = testSatellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
queue := audits.VerifyQueue
|
||||
queueSegment, err := queue.Next(ctx)
|
||||
require.NoError(t, err)
|
||||
@ -2945,7 +2945,7 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
|
||||
audits := testSatellite.Audit
|
||||
|
||||
audits.Worker.Loop.Pause()
|
||||
audits.Chore.Loop.Pause()
|
||||
testSatellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
ul := planet.Uplinks[0]
|
||||
testData := testrand.Bytes(8 * memory.KiB)
|
||||
@ -2953,7 +2953,10 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
|
||||
err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
|
||||
require.NoError(t, err)
|
||||
|
||||
audits.Chore.Loop.TriggerWait()
|
||||
// trigger audit
|
||||
_, err = testSatellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
queue := audits.VerifyQueue
|
||||
queueSegment, err := queue.Next(ctx)
|
||||
require.NoError(t, err)
|
||||
|
@ -25,7 +25,8 @@ func TestConcurrentAudit(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
planet.Satellites[0].Audit.Chore.Loop.Stop()
|
||||
planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.Stop()
|
||||
|
||||
data := testrand.Bytes(10 * memory.MB)
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "testpath", data)
|
||||
require.NoError(t, err)
|
||||
|
4
scripts/testdata/satellite-config.yaml.lock
vendored
4
scripts/testdata/satellite-config.yaml.lock
vendored
@ -79,8 +79,8 @@
|
||||
# number of reservoir slots allotted for nodes, currently capped at 3
|
||||
# audit.slots: 3
|
||||
|
||||
# whether or not to use the ranged loop observer instead of the chore.
|
||||
# audit.use-ranged-loop: false
|
||||
# whether use Audit observer with ranged loop.
|
||||
# audit.use-ranged-loop: true
|
||||
|
||||
# number of audit jobs to push at once to the verification queue
|
||||
# audit.verification-push-batch-size: 4096
|
||||
|
Loading…
Reference in New Issue
Block a user