satellite/audit: implement rangedloop observer

This change implements the ranged loop observer to replace the audit
chore that builds the audit queue.

The strategy employed by this change is to use a collector for each
segment range to build separate per-node segment reservoirs, which are
then merged during the join step.
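
For context, the ranged loop drives each observer through a fork/join
contract along roughly these lines (a sketch of the assumed
rangedloop.Observer and rangedloop.Partial interfaces, inferred from the
methods implemented in this change rather than quoted from the package):

type Observer interface {
	// Start runs once per loop iteration, before any ranges are processed.
	Start(ctx context.Context, startTime time.Time) error
	// Fork creates a Partial that will process a single segment range.
	Fork(ctx context.Context) (Partial, error)
	// Join merges a completed Partial back into the observer.
	Join(ctx context.Context, partial Partial) error
	// Finish runs once after every range has been joined.
	Finish(ctx context.Context) error
}

type Partial interface {
	// Process handles one batch of segments from the assigned range.
	Process(ctx context.Context, segments []segmentloop.Segment) error
}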

In previous observer migrations, there were only a handful of tests, so
the strategy was to duplicate them. In this package, dozens of tests
utilize the chore. To reduce code churn and maintenance burden until the
chore is removed, this change introduces a helper that runs each test
under both the chore and the observer, providing a pair of functions the
test can use to pause queueing or run it once.
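
Concretely, a migrated test keeps its body intact and only swaps the
wrapper and the queue-triggering calls; the pattern (shown here with a
hypothetical test name) looks like this:

func TestSomething(t *testing.T) {
	testWithChoreAndObserver(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
		satellite := planet.Satellites[0]
		satellite.Audit.Worker.Loop.Pause()
		// Previously: satellite.Audit.Chore.Loop.Pause()
		pauseQueueing(satellite)

		// ... upload test data ...

		// Previously: satellite.Audit.Chore.Loop.TriggerWait()
		err := runQueueingOnce(ctx, satellite)
		require.NoError(t, err)

		// ... pull segments from satellite.Audit.VerifyQueue and assert ...
	})
}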

https://github.com/storj/storj/issues/5232

Change-Id: I8bb4b4e55cf98b1aac9f26307e3a9a355cb3f506
Andrew Harding 2022-12-14 17:37:37 -07:00
parent 9544a670d7
commit 590d44301c
12 changed files with 441 additions and 156 deletions

View File

@ -25,14 +25,14 @@ import (
// specified bucket are counted correctly for storage node audit bandwidth
// usage and the storage nodes will be paid for that.
func TestAuditOrderLimit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
now := time.Now()
@ -46,7 +46,9 @@ func TestAuditOrderLimit(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queueSegment, err := audits.VerifyQueue.Next(ctx)
require.NoError(t, err)
require.False(t, queueSegment.StreamID.IsZero())
@ -77,14 +79,14 @@ func TestAuditOrderLimit(t *testing.T) {
// Minimal test to verify that copies aren't audited.
func TestAuditSkipsRemoteCopies(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -105,7 +107,9 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
_, err = project.CopyObject(ctx, "testbucket", "testobj1", "testbucket", "copy", nil)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
auditSegments := make([]audit.Segment, 0, 2)
@ -131,7 +135,9 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
err = uplink.DeleteObject(ctx, satellite, "testbucket", "testobj2")
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue = audits.VerifyQueue
// verify that the copy is being audited
@ -146,14 +152,14 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
// Minimal test to verify that inline objects are not audited even if they are copies.
func TestAuditSkipsInlineCopies(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
uplink := planet.Uplinks[0]
testData := testrand.Bytes(1 * memory.KiB)
@ -170,7 +176,9 @@ func TestAuditSkipsInlineCopies(t *testing.T) {
_, err = project.CopyObject(ctx, "testbucket", "testobj1", "testbucket", "copy", nil)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
_, err = queue.Next(ctx)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)
@ -181,7 +189,9 @@ func TestAuditSkipsInlineCopies(t *testing.T) {
err = uplink.DeleteObject(ctx, satellite, "testbucket", "testobj2")
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue = audits.VerifyQueue
_, err = queue.Next(ctx)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)

View File

@ -53,3 +53,19 @@ func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentl
func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
return nil
}
// Process performs per-node reservoir sampling on remote segments for addition into the audit queue.
func (collector *Collector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
for _, segment := range segments {
// The reservoir ends up dereferencing and copying the segment internally
// but that's not obvious, so alias the loop variable.
segment := segment
if segment.Inline() {
continue
}
if err := collector.RemoteSegment(ctx, &segment); err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,60 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package audit_test
import (
"context"
"testing"
"go.uber.org/zap"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
)
type pauseQueueingFunc = func(satellite *testplanet.Satellite)
type runQueueingOnceFunc = func(ctx context.Context, satellite *testplanet.Satellite) error
// testWithChoreAndObserver runs an audit test for both the chore and observer.
// It provides functions that the test can use to pause and run the queueing
// done by the chore or observer.
func testWithChoreAndObserver(t *testing.T, planetConfig testplanet.Config, run func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc)) {
t.Run("Chore", func(t *testing.T) {
planetConfig := planetConfig
testplanet.Run(t, planetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
t.Helper()
run(t, ctx, planet,
func(satellite *testplanet.Satellite) {
satellite.Audit.Chore.Loop.Pause()
},
func(ctx context.Context, satellite *testplanet.Satellite) error {
satellite.Audit.Chore.Loop.TriggerWait()
return nil
},
)
})
})
t.Run("Observer", func(t *testing.T) {
planetConfig := planetConfig
reconfigureSatellite := planetConfig.Reconfigure.Satellite
planetConfig.Reconfigure.Satellite = func(log *zap.Logger, index int, config *satellite.Config) {
if reconfigureSatellite != nil {
reconfigureSatellite(log, index, config)
}
config.Audit.UseRangedLoop = true
}
testplanet.Run(t, planetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
t.Helper()
run(t, ctx, planet,
func(satellite *testplanet.Satellite) {},
func(ctx context.Context, satellite *testplanet.Satellite) error {
_, err := satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
return err
},
)
})
})
}

View File

@ -72,14 +72,14 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo
}
func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
testSatellite := planet.Satellites[0]
audits := testSatellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(testSatellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -87,7 +87,9 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, testSatellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -126,14 +128,14 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
}
func TestGetSharePrefers(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
testSatellite := planet.Satellites[0]
audits := testSatellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(testSatellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -141,7 +143,9 @@ func TestGetSharePrefers(t *testing.T) {
err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, testSatellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)

View File

@ -20,7 +20,7 @@ import (
)
func TestChoreAndWorkerIntegration(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
@ -28,11 +28,11 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
@ -44,11 +44,12 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
require.NoError(t, err)
}
audits.Chore.Loop.TriggerWait()
err := runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
uniqueSegments := make(map[audit.Segment]struct{})
var err error
var segment audit.Segment
var segmentCount int
for {
@ -67,7 +68,8 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
requireAuditQueueEmpty(ctx, t, audits.VerifyQueue)
// Repopulate the queue for the worker.
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
// Make sure the worker processes the audit queue.
audits.Worker.Loop.TriggerWait()

satellite/audit/observer.go (new file, 114 lines)
View File

@ -0,0 +1,114 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit
import (
"context"
"math/rand"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase/rangedloop"
)
// Observer populates reservoirs and the audit queue.
//
// architecture: Observer
type Observer struct {
log *zap.Logger
queue VerifyQueue
config Config
seedRand *rand.Rand
// The following fields are reset on each segment loop cycle.
reservoirs map[storj.NodeID]*Reservoir
}
// NewObserver instantiates Observer.
func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer {
if config.VerificationPushBatchSize < 1 {
config.VerificationPushBatchSize = 1
}
return &Observer{
log: log,
queue: queue,
config: config,
seedRand: rand.New(rand.NewSource(time.Now().Unix())),
}
}
// Start prepares the observer for audit segment collection.
func (obs *Observer) Start(ctx context.Context, startTime time.Time) error {
obs.reservoirs = make(map[storj.NodeID]*Reservoir)
return nil
}
// Fork returns a new audit reservoir collector for the range.
func (obs *Observer) Fork(ctx context.Context) (rangedloop.Partial, error) {
// Each collector needs an RNG for sampling. On systems where time
// resolution is low (e.g. windows is 15ms), seeding an RNG using the
// current time (even with nanosecond precision) may end up reusing a seed
// for two or more RNGs. To prevent that, the observer itself uses an RNG
// to seed the per-collector RNGs.
rnd := rand.New(rand.NewSource(obs.seedRand.Int63()))
return NewCollector(obs.config.Slots, rnd), nil
}
// Join merges the audit reservoir collector into the per-node reservoirs.
func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) error {
collector, ok := partial.(*Collector)
if !ok {
return errs.New("expected partial type %T but got %T", collector, partial)
}
for nodeID, reservoir := range collector.Reservoirs {
existing, ok := obs.reservoirs[nodeID]
if !ok {
obs.reservoirs[nodeID] = reservoir
continue
}
if err := existing.Merge(reservoir); err != nil {
return err
}
}
return nil
}
// Finish builds and dedups an audit queue from the merged per-node reservoirs.
func (obs *Observer) Finish(ctx context.Context) error {
type SegmentKey struct {
StreamID uuid.UUID
Position uint64
}
var newQueue []Segment
queueSegments := make(map[SegmentKey]struct{})
// Add reservoir segments to queue in pseudorandom order.
for i := 0; i < obs.config.Slots; i++ {
for _, res := range obs.reservoirs {
segments := res.Segments()
// Skip reservoir if no segment at this index.
if len(segments) <= i {
continue
}
segment := segments[i]
segmentKey := SegmentKey{
StreamID: segment.StreamID,
Position: segment.Position.Encode(),
}
if _, ok := queueSegments[segmentKey]; !ok {
newQueue = append(newQueue, NewSegment(segment))
queueSegments[segmentKey] = struct{}{}
}
}
}
// Push the new queue to the verification queue so it can be fetched by the worker.
return obs.queue.Push(ctx, newQueue, obs.config.VerificationPushBatchSize)
}

View File

@ -27,9 +27,9 @@ import (
)
func TestReverifySuccess(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
// This is a bulky test but all it's doing is:
// - uploads random data
// - uses the cursor to get a stripe
@ -40,7 +40,7 @@ func TestReverifySuccess(t *testing.T) {
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -48,7 +48,9 @@ func TestReverifySuccess(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -87,9 +89,9 @@ func TestReverifySuccess(t *testing.T) {
}
func TestReverifyFailMissingShare(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
// - uploads random data
// - uses the cursor to get a stripe
// - creates one pending audit for a node holding a piece for that stripe
@ -101,7 +103,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
audits := satellite.Audit
reporter := audits.Reporter
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -109,7 +111,9 @@ func TestReverifyFailMissingShare(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -155,9 +159,9 @@ func TestReverifyFailMissingShare(t *testing.T) {
}
func TestReverifyOffline(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
// - uploads random data
// - uses the cursor to get a stripe
// - creates pending audits for one node holding a piece for that stripe
@ -170,7 +174,7 @@ func TestReverifyOffline(t *testing.T) {
reporter := audits.Reporter
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -178,7 +182,9 @@ func TestReverifyOffline(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -219,9 +225,9 @@ func TestReverifyOffline(t *testing.T) {
}
func TestReverifyOfflineDialTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
// - uploads random data
// - uses the cursor to get a stripe
// - creates pending audit for one node holding a piece for that stripe
@ -233,7 +239,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -241,7 +247,9 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -311,12 +319,12 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
}
func TestReverifyDeletedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
// - uploads random data to all nodes
// - gets a segment from the audit queue
// - creates one pending audit for a node holding a piece for that segment
@ -328,14 +336,16 @@ func TestReverifyDeletedSegment(t *testing.T) {
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -390,12 +400,12 @@ func cloneAndDropPiece(ctx context.Context, metabaseDB *metabase.DB, segment *me
}
func TestReverifyModifiedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
// - uploads random data to an object on all nodes
// - creates a pending audit for a particular piece of that object
// - removes a (different) piece from the object so that the segment is modified
@ -405,14 +415,16 @@ func TestReverifyModifiedSegment(t *testing.T) {
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -466,12 +478,12 @@ func TestReverifyModifiedSegment(t *testing.T) {
}
func TestReverifyReplacedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
// - uploads random data to an object on all nodes
// - creates a pending audit for a particular piece of that object
// - re-uploads the object (with the same contents) so that the segment is modified
@ -481,14 +493,16 @@ func TestReverifyReplacedSegment(t *testing.T) {
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -533,14 +547,14 @@ func TestReverifyReplacedSegment(t *testing.T) {
// TestReverifyExpired tests the case where the segment passed into Reverify is expired.
func TestReverifyExpired(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -548,7 +562,9 @@ func TestReverifyExpired(t *testing.T) {
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -592,7 +608,7 @@ func TestReverifyExpired(t *testing.T) {
// audit service gets put into containment mode.
func TestReverifySlowDownload(t *testing.T) {
const auditTimeout = time.Second
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
@ -607,19 +623,21 @@ func TestReverifySlowDownload(t *testing.T) {
testplanet.ReconfigureRS(2, 2, 4, 4),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -665,7 +683,7 @@ func TestReverifySlowDownload(t *testing.T) {
// TestReverifyUnknownError checks that a node that returns an unknown error during an audit does not get marked as successful, failed, or contained.
func TestReverifyUnknownError(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
@ -673,19 +691,21 @@ func TestReverifyUnknownError(t *testing.T) {
},
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -729,7 +749,7 @@ func TestReverifyUnknownError(t *testing.T) {
func TestMaxReverifyCount(t *testing.T) {
const auditTimeout = time.Second
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
@ -746,20 +766,22 @@ func TestMaxReverifyCount(t *testing.T) {
testplanet.ReconfigureRS(2, 2, 4, 4),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.ReverifyWorker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)

View File

@ -37,14 +37,14 @@ import (
// returned by the DownloadShares method contain no error if all shares were
// downloaded successfully.
func TestDownloadSharesHappyPath(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -52,7 +52,9 @@ func TestDownloadSharesHappyPath(t *testing.T) {
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -90,14 +92,14 @@ func TestDownloadSharesHappyPath(t *testing.T) {
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesOfflineNode(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -105,7 +107,9 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -151,14 +155,14 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesMissingPiece(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -166,7 +170,9 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -208,14 +214,14 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesDialTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
upl := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -223,7 +229,9 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -287,21 +295,21 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesDownloadTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
storageNodeDB := planet.StorageNodes[0].DB.(*testblobs.SlowDB)
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
upl := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -309,7 +317,9 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -358,14 +368,14 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
}
func TestVerifierHappyPath(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -373,7 +383,9 @@ func TestVerifierHappyPath(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -395,14 +407,14 @@ func TestVerifierHappyPath(t *testing.T) {
}
func TestVerifierExpired(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -410,7 +422,9 @@ func TestVerifierExpired(t *testing.T) {
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -432,15 +446,15 @@ func TestVerifierExpired(t *testing.T) {
}
func TestVerifierOfflineNode(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -448,7 +462,9 @@ func TestVerifierOfflineNode(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -475,14 +491,14 @@ func TestVerifierOfflineNode(t *testing.T) {
}
func TestVerifierMissingPiece(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -490,7 +506,9 @@ func TestVerifierMissingPiece(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -520,7 +538,7 @@ func TestVerifierMissingPiece(t *testing.T) {
}
func TestVerifierNotEnoughPieces(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
@ -528,12 +546,12 @@ func TestVerifierNotEnoughPieces(t *testing.T) {
},
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -541,7 +559,9 @@ func TestVerifierNotEnoughPieces(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -587,14 +607,14 @@ func TestVerifierNotEnoughPieces(t *testing.T) {
}
func TestVerifierDialTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -602,7 +622,9 @@ func TestVerifierDialTimeout(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -650,14 +672,14 @@ func TestVerifierDialTimeout(t *testing.T) {
}
func TestVerifierDeletedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -665,7 +687,9 @@ func TestVerifierDeletedSegment(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
segment, err := queue.Next(ctx)
require.NoError(t, err)
@ -686,14 +710,14 @@ func TestVerifierDeletedSegment(t *testing.T) {
}
func TestVerifierModifiedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -701,7 +725,9 @@ func TestVerifierModifiedSegment(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -737,14 +763,14 @@ func TestVerifierModifiedSegment(t *testing.T) {
}
func TestVerifierReplacedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -752,7 +778,9 @@ func TestVerifierReplacedSegment(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
segment, err := queue.Next(ctx)
require.NoError(t, err)
@ -775,14 +803,14 @@ func TestVerifierReplacedSegment(t *testing.T) {
}
func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -790,7 +818,9 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -823,7 +853,7 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
// TestVerifierSlowDownload checks that a node that times out while sending data to the
// audit service gets put into containment mode.
func TestVerifierSlowDownload(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
@ -838,12 +868,12 @@ func TestVerifierSlowDownload(t *testing.T) {
testplanet.ReconfigureRS(2, 2, 4, 4),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -851,7 +881,9 @@ func TestVerifierSlowDownload(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -882,7 +914,7 @@ func TestVerifierSlowDownload(t *testing.T) {
// TestVerifierUnknownError checks that a node that returns an unknown error in response to an audit request
// does not get marked as successful, failed, or contained.
func TestVerifierUnknownError(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
@ -890,12 +922,12 @@ func TestVerifierUnknownError(t *testing.T) {
},
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -903,7 +935,9 @@ func TestVerifierUnknownError(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
@ -1108,14 +1142,14 @@ func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Plan
}
func TestIdentifyContainedNodes(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -1123,7 +1157,9 @@ func TestIdentifyContainedNodes(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)

View File

@ -30,6 +30,7 @@ type Config struct {
Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
VerificationPushBatchSize int `help:"number of audit jobs to push at once to the verification queue" devDefault:"10" releaseDefault:"4096"`
WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`
UseRangedLoop bool `help:"whether or not to use the ranged loop observer instead of the chore." default:"false" testDefault:"false"`
ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"`
ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"`

View File

@ -397,18 +397,22 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Audit.VerifyQueue = db.VerifyQueue()
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
peer.Audit.VerifyQueue,
peer.Metainfo.SegmentLoop,
config,
)
peer.Services.Add(lifecycle.Item{
Name: "audit:chore",
Run: peer.Audit.Chore.Run,
Close: peer.Audit.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
if config.UseRangedLoop {
peer.Log.Named("audit:chore").Info("using ranged loop")
} else {
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
peer.Audit.VerifyQueue,
peer.Metainfo.SegmentLoop,
config,
)
peer.Services.Add(lifecycle.Item{
Name: "audit:chore",
Run: peer.Audit.Chore.Run,
Close: peer.Audit.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
}
}
{ // setup expired segment cleanup

View File

@ -18,6 +18,7 @@ import (
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/storj/private/lifecycle"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
@ -35,6 +36,10 @@ type RangedLoop struct {
Servers *lifecycle.Group
Services *lifecycle.Group
Audit struct {
Observer rangedloop.Observer
}
Debug struct {
Listener net.Listener
Server *debug.Server
@ -83,6 +88,10 @@ func NewRangedLoop(log *zap.Logger, full *identity.FullIdentity, db DB, metabase
})
}
{ // setup audit observer
peer.Audit.Observer = audit.NewObserver(log.Named("audit"), db.VerifyQueue(), config.Audit)
}
{ // setup metrics observer
peer.Metrics.Observer = metrics.NewObserver()
}
@ -99,6 +108,10 @@ func NewRangedLoop(log *zap.Logger, full *identity.FullIdentity, db DB, metabase
{ // setup ranged loop
var observers []rangedloop.Observer
if config.Audit.UseRangedLoop {
observers = append(observers, peer.Audit.Observer)
}
if config.Metrics.UseRangedLoop {
observers = append(observers, peer.Metrics.Observer)
}

View File

@ -61,6 +61,9 @@
# number of reservoir slots allotted for nodes, currently capped at 3
# audit.slots: 3
# whether or not to use the ranged loop observer instead of the chore.
# audit.use-ranged-loop: false
# number of audit jobs to push at once to the verification queue
# audit.verification-push-batch-size: 4096