satellite/audit: use db for auditor queue

As part of the effort of splitting out the auditor workers to their own
process, we are transitioning the communication between the auditor
chore and the verification workers to a queue implemented in the
database, rather than the sequence of in-memory queues we used to use.

This logical database is safely partitionable from the rest of
satelliteDB.

Refs: https://github.com/storj/storj/issues/5251

Change-Id: I6cd31ac5265423271fbafe6127a86172c5cb53dc
This commit is contained in:
paul cannon 2022-11-11 17:11:40 -06:00 committed by Storj Robot
parent b574ee5e6d
commit 8b494f3740
12 changed files with 158 additions and 149 deletions

View File

@ -133,11 +133,11 @@ type Satellite struct {
}
Audit struct {
Queues *audit.Queues
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter audit.Reporter
VerifyQueue audit.VerifyQueue
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter audit.Reporter
}
Reputation struct {
@ -608,7 +608,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Repair.Checker = peer.Repair.Checker
system.Repair.Repairer = repairerPeer.Repairer
system.Audit.Queues = peer.Audit.Queues
system.Audit.VerifyQueue = peer.Audit.VerifyQueue
system.Audit.Worker = peer.Audit.Worker
system.Audit.Chore = peer.Audit.Chore
system.Audit.Verifier = peer.Audit.Verifier

View File

@ -47,8 +47,7 @@ func TestAuditOrderLimit(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queueSegment, err := audits.VerifyQueue.Next(ctx)
require.NoError(t, err)
require.False(t, queueSegment.StreamID.IsZero())
@ -110,12 +109,11 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
require.Equal(t, 2, queue.Size())
queue := audits.VerifyQueue
auditSegments := make([]audit.Segment, 0, 2)
for range originalSegments {
auditSegment, err := queue.Next()
auditSegment, err := queue.Next(ctx)
require.NoError(t, err)
auditSegments = append(auditSegments, auditSegment)
}
@ -137,11 +135,10 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
queue = audits.VerifyQueue
// verify that the copy is being audited
require.Equal(t, 1, queue.Size())
remainingSegment, err := queue.Next()
remainingSegment, err := queue.Next(ctx)
require.NoError(t, err)
for _, originalSegment := range originalSegments {
@ -177,8 +174,9 @@ func TestAuditSkipsInlineCopies(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
require.Zero(t, queue.Size())
queue := audits.VerifyQueue
_, err = queue.Next(ctx)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)
// delete originals, keep 1 copy
err = uplink.DeleteObject(ctx, satellite, "testbucket", "testobj1")
@ -187,8 +185,8 @@ func TestAuditSkipsInlineCopies(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
require.Zero(t, queue.Size())
queue = audits.VerifyQueue
_, err = queue.Next(ctx)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)
})
}

View File

@ -19,22 +19,22 @@ import (
//
// architecture: Chore
type Chore struct {
log *zap.Logger
rand *rand.Rand
queues *Queues
Loop *sync2.Cycle
log *zap.Logger
rand *rand.Rand
queue VerifyQueue
Loop *sync2.Cycle
segmentLoop *segmentloop.Service
config Config
}
// NewChore instantiates Chore.
func NewChore(log *zap.Logger, queues *Queues, loop *segmentloop.Service, config Config) *Chore {
func NewChore(log *zap.Logger, queue VerifyQueue, loop *segmentloop.Service, config Config) *Chore {
return &Chore{
log: log,
rand: rand.New(rand.NewSource(time.Now().Unix())),
queues: queues,
Loop: sync2.NewCycle(config.ChoreInterval),
log: log,
rand: rand.New(rand.NewSource(time.Now().Unix())),
queue: queue,
Loop: sync2.NewCycle(config.ChoreInterval),
segmentLoop: loop,
config: config,
@ -47,12 +47,6 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// If the previously pushed queue is still waiting to be swapped in, wait.
err = chore.queues.WaitForSwap(ctx)
if err != nil {
return err
}
collector := NewCollector(chore.config.Slots, chore.rand)
err = chore.segmentLoop.Join(ctx, collector)
if err != nil {
@ -92,7 +86,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
}
// Push new queue to queues struct so it can be fetched by worker.
return chore.queues.Push(newQueue)
return chore.queue.Push(ctx, newQueue)
})
}

View File

@ -88,8 +88,8 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := testSatellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -142,8 +142,8 @@ func TestGetSharePrefers(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := testSatellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{

View File

@ -4,6 +4,7 @@
package audit_test
import (
"context"
"strconv"
"testing"
@ -36,15 +37,14 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
require.EqualValues(t, 2, queue.Size(), "audit queue")
queue := audits.VerifyQueue
uniqueSegments := make(map[audit.Segment]struct{})
var err error
var segment audit.Segment
var segmentCount int
for {
segment, err = queue.Next()
segment, err = queue.Next(ctx)
if err != nil {
break
}
@ -54,16 +54,22 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
uniqueSegments[segment] = struct{}{}
}
require.True(t, audit.ErrEmptyQueue.Has(err))
require.True(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)
require.Equal(t, 2, segmentCount)
require.Equal(t, 0, queue.Size())
requireAuditQueueEmpty(ctx, t, audits.VerifyQueue)
// Repopulate the queue for the worker.
audits.Chore.Loop.TriggerWait()
// Make sure the worker processes the audit queue.
audits.Worker.Loop.TriggerWait()
queue = audits.Queues.Fetch()
require.EqualValues(t, 0, queue.Size(), "audit queue")
requireAuditQueueEmpty(ctx, t, audits.VerifyQueue)
})
}
// requireAuditQueueEmpty asserts that verifyQueue has no remaining entries:
// Next must fail with audit.ErrEmptyQueue and return an empty entry.
func requireAuditQueueEmpty(ctx context.Context, t *testing.T, verifyQueue audit.VerifyQueue) {
entry, err := verifyQueue.Next(ctx)
// Next on an empty queue must return an error, not a zero-valued entry with nil error.
require.NotNilf(t, err, "expected empty audit queue, but got entry %+v", entry)
// The error must specifically be the empty-queue sentinel, not some other failure.
require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty audit queue error, but unexpectedly got error %v", err)
require.Empty(t, entry)
}

View File

@ -52,8 +52,8 @@ func TestReverifySuccess(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -131,8 +131,8 @@ func TestReverifyFailMissingShare(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -217,8 +217,8 @@ func TestReverifyFailBadData(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -289,8 +289,8 @@ func TestReverifyOffline(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -363,8 +363,8 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -465,8 +465,8 @@ func TestReverifyDeletedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -513,8 +513,8 @@ func TestReverifyDeletedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
queueSegment, err = queue.Next()
queue = audits.VerifyQueue
queueSegment, err = queue.Next(ctx)
require.NoError(t, err)
// reverify the new segment
@ -555,8 +555,8 @@ func TestReverifyModifiedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -604,10 +604,10 @@ func TestReverifyModifiedSegment(t *testing.T) {
// select the segment that was not used for the pending audit
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
queueSegment1, err := queue.Next()
queue = audits.VerifyQueue
queueSegment1, err := queue.Next(ctx)
require.NoError(t, err)
queueSegment2, err := queue.Next()
queueSegment2, err := queue.Next(ctx)
require.NoError(t, err)
reverifySegment := queueSegment1
if queueSegment1 == queueSegment {
@ -652,8 +652,8 @@ func TestReverifyReplacedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -693,10 +693,10 @@ func TestReverifyReplacedSegment(t *testing.T) {
// select the segment that was not used for the pending audit
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
queueSegment1, err := queue.Next()
queue = audits.VerifyQueue
queueSegment1, err := queue.Next(ctx)
require.NoError(t, err)
queueSegment2, err := queue.Next()
queueSegment2, err := queue.Next(ctx)
require.NoError(t, err)
reverifySegment := queueSegment1
if queueSegment1 == queueSegment {
@ -750,10 +750,10 @@ func TestReverifyDifferentShare(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment1, err := queue.Next()
queue := audits.VerifyQueue
queueSegment1, err := queue.Next(ctx)
require.NoError(t, err)
queueSegment2, err := queue.Next()
queueSegment2, err := queue.Next(ctx)
require.NoError(t, err)
require.NotEqual(t, queueSegment1, queueSegment2)
@ -857,8 +857,8 @@ func TestReverifyExpired1(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
// move time into the future so the segment is expired
@ -904,10 +904,10 @@ func TestReverifyExpired2(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment1, err := queue.Next()
queue := audits.VerifyQueue
queueSegment1, err := queue.Next(ctx)
require.NoError(t, err)
queueSegment2, err := queue.Next()
queueSegment2, err := queue.Next(ctx)
require.NoError(t, err)
require.NotEqual(t, queueSegment1, queueSegment2)
@ -1030,8 +1030,8 @@ func TestReverifySlowDownload(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -1119,8 +1119,8 @@ func TestReverifyUnknownError(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -1212,8 +1212,8 @@ func TestMaxReverifyCount(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{

View File

@ -53,8 +53,8 @@ func TestDownloadSharesHappyPath(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -106,8 +106,8 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -167,8 +167,8 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -224,8 +224,8 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -310,8 +310,8 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -374,8 +374,8 @@ func TestVerifierHappyPath(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -411,8 +411,8 @@ func TestVerifierExpired(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
// move time into the future so the segment is expired
@ -449,8 +449,8 @@ func TestVerifierOfflineNode(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -491,8 +491,8 @@ func TestVerifierMissingPiece(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -542,8 +542,8 @@ func TestVerifierNotEnoughPieces(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -603,8 +603,8 @@ func TestVerifierDialTimeout(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -666,8 +666,8 @@ func TestVerifierDeletedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
segment, err := queue.Next()
queue := audits.VerifyQueue
segment, err := queue.Next(ctx)
require.NoError(t, err)
// delete the file
@ -702,8 +702,8 @@ func TestVerifierModifiedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
var segment metabase.Segment
@ -753,8 +753,8 @@ func TestVerifierReplacedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
segment, err := queue.Next()
queue := audits.VerifyQueue
segment, err := queue.Next(ctx)
require.NoError(t, err)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
@ -791,8 +791,8 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -852,8 +852,8 @@ func TestVerifierSlowDownload(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -904,8 +904,8 @@ func TestVerifierUnknownError(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{

View File

@ -13,6 +13,7 @@ import (
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/metabase"
)
// Error is the default audit errs class.
@ -29,12 +30,15 @@ type Config struct {
QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`
ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"`
ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"`
}
// Worker contains information for populating audit queue and processing audits.
type Worker struct {
log *zap.Logger
queues *Queues
queue VerifyQueue
verifier *Verifier
reporter Reporter
Loop *sync2.Cycle
@ -42,16 +46,16 @@ type Worker struct {
}
// NewWorker instantiates Worker.
func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Reporter, config Config) (*Worker, error) {
func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reporter Reporter, config Config) *Worker {
return &Worker{
log: log,
queues: queues,
queue: queue,
verifier: verifier,
reporter: reporter,
Loop: sync2.NewCycle(config.QueueInterval),
concurrency: config.WorkerConcurrency,
}, nil
}
}
// Run runs audit service 2.0.
@ -78,22 +82,14 @@ func (worker *Worker) Close() error {
func (worker *Worker) process(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// get the current queue
queue := worker.queues.Fetch()
limiter := sync2.NewLimiter(worker.concurrency)
defer limiter.Wait()
for {
segment, err := queue.Next()
segment, err := worker.queue.Next(ctx)
if err != nil {
if ErrEmptyQueue.Has(err) {
// get a new queue and return if empty; otherwise continue working.
queue = worker.queues.Fetch()
if queue.Size() == 0 {
return nil
}
continue
return nil
}
return err
}
@ -130,6 +126,16 @@ func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
errlist.Add(err)
}
if err != nil {
if metabase.ErrSegmentNotFound.Has(err) {
// no need to add this error; Verify() will encounter it again
// and will handle the verification job as appropriate.
err = nil
} else {
errlist.Add(err)
}
}
// Skip all reverified nodes in the next Verify step.
skip := make(map[storj.NodeID]bool)
for _, nodeID := range report.Successes {
@ -148,7 +154,7 @@ func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
skip[nodeID] = true
}
// Next, audit the the remaining nodes that are not in containment mode.
// Next, audit the remaining nodes that are not in containment mode.
report, err = worker.verifier.Verify(ctx, segment, skip)
if err != nil {
errlist.Add(err)

View File

@ -116,11 +116,11 @@ type Core struct {
}
Audit struct {
Queues *audit.Queues
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter audit.Reporter
VerifyQueue audit.VerifyQueue
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter audit.Reporter
}
ExpiredDeletion struct {
@ -396,7 +396,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
config := config.Audit
peer.Audit.Queues = audit.NewQueues()
peer.Audit.VerifyQueue = db.VerifyQueue()
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
peer.Metainfo.Metabase,
@ -416,8 +416,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
int32(config.MaxReverifyCount),
)
peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit:worker"),
peer.Audit.Queues,
peer.Audit.Worker = audit.NewWorker(peer.Log.Named("audit:worker"),
peer.Audit.VerifyQueue,
peer.Audit.Verifier,
peer.Audit.Reporter,
config,
@ -435,7 +435,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
peer.Audit.Queues,
peer.Audit.VerifyQueue,
peer.Metainfo.SegmentLoop,
config,
)

View File

@ -1347,8 +1347,6 @@ func TestRepairExpiredSegment(t *testing.T) {
// get encrypted path of segment with audit service
satellite.Audit.Chore.Loop.TriggerWait()
queue := satellite.Audit.Queues.Fetch()
require.EqualValues(t, queue.Size(), 1)
// Verify that the segment is on the repair queue
count, err := satellite.DB.RepairQueue().Count(ctx)
@ -1365,7 +1363,7 @@ func TestRepairExpiredSegment(t *testing.T) {
satellite.Repair.Repairer.Loop.Pause()
satellite.Repair.Repairer.WaitForPendingRepairs()
// Verify that the segment is still in the queue
// Verify that the segment is not still in the queue
count, err = satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
require.Equal(t, 0, count)
@ -2863,8 +2861,8 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := testSatellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
@ -2935,8 +2933,8 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
segment, err := testSatellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{

View File

@ -84,6 +84,7 @@ var safelyPartitionableDBs = map[string]bool{
// cross-db queries.
"repairqueue": true,
"nodeevents": true,
"verifyqueue": true,
}
// Open creates instance of satellite.DB.

View File

@ -49,6 +49,12 @@
# how often to recheck an empty audit queue
# audit.queue-interval: 1h0m0s
# how long a single reverification job can take before it may be taken over by another worker
# audit.reverification-retry-interval: 6h0m0s
# number of workers to run reverify audits on pieces
# audit.reverify-worker-concurrency: 2
# number of reservoir slots allotted for nodes, currently capped at 3
# audit.slots: 3