satellite/audit/queue: Separate audit queue into two structs.

* The audit worker wants to get items from the queue and process them.
* The audit chore wants to create new queues and swap them in when the
old queue has been processed.

This change adds a "Queues" struct which handles the concurrency
issues around the worker fetching a queue and the chore swapping a new
queue in. It simplifies the logic of the "Queue" struct to its bare
bones, so that it behaves like a normal queue with no need to understand
the details of swapping and worker/chore interactions.

Change-Id: Ic3689ede97a528e7590e98338cedddfa51794e1b
Moby von Briesen 2020-08-20 09:29:02 -04:00 committed by Maximillian von Briesen
parent ba1a113e2e
commit 5d21e85529
10 changed files with 204 additions and 160 deletions
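For orientation, here is a minimal sketch of the contract this change establishes, written against the API added in satellite/audit below. The single-goroutine driver and the literal paths are illustrative only; in the satellite, the two sides run in separate chore and worker cycles.

package main

import (
	"context"
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/audit"
)

func main() {
	ctx := context.Background()
	queues := audit.NewQueues()

	// Chore side (the only producer): WaitForSwap blocks until any
	// previously pushed queue has been fetched, then Push stages a
	// freshly collected queue.
	if err := queues.WaitForSwap(ctx); err != nil {
		panic(err)
	}
	if err := queues.Push([]storj.Path{"path/1", "path/2"}); err != nil {
		panic(err)
	}

	// Worker side (the only consumer): Fetch returns the active queue,
	// and Next drains it until ErrEmptyQueue.
	queue := queues.Fetch()
	for {
		path, err := queue.Next()
		if err != nil {
			if audit.ErrEmptyQueue.Has(err) {
				break
			}
			panic(err)
		}
		fmt.Println("auditing", path)
	}
}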

View File

@ -121,7 +121,7 @@ type Satellite struct {
Inspector *irreparable.Inspector
}
Audit struct {
Queue *audit.Queue
Queues *audit.Queues
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
@ -712,7 +712,7 @@ func createNewSystem(log *zap.Logger, config satellite.Config, peer *satellite.C
system.Repair.Repairer = repairerPeer.Repairer
system.Repair.Inspector = api.Repair.Inspector
system.Audit.Queue = peer.Audit.Queue
system.Audit.Queues = peer.Audit.Queues
system.Audit.Worker = peer.Audit.Worker
system.Audit.Chore = peer.Audit.Chore
system.Audit.Verifier = peer.Audit.Verifier

View File

@ -19,22 +19,22 @@ import (
//
// architecture: Chore
type Chore struct {
log *zap.Logger
rand *rand.Rand
queue *Queue
Loop *sync2.Cycle
log *zap.Logger
rand *rand.Rand
queues *Queues
Loop *sync2.Cycle
metainfoLoop *metainfo.Loop
config Config
}
// NewChore instantiates Chore.
func NewChore(log *zap.Logger, queue *Queue, metaLoop *metainfo.Loop, config Config) *Chore {
func NewChore(log *zap.Logger, queues *Queues, metaLoop *metainfo.Loop, config Config) *Chore {
return &Chore{
log: log,
rand: rand.New(rand.NewSource(time.Now().Unix())),
queue: queue,
Loop: sync2.NewCycle(config.ChoreInterval),
log: log,
rand: rand.New(rand.NewSource(time.Now().Unix())),
queues: queues,
Loop: sync2.NewCycle(config.ChoreInterval),
metainfoLoop: metaLoop,
config: config,
@ -47,6 +47,12 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// If the previously pushed queue is still waiting to be swapped in, wait.
err = chore.queues.WaitForSwap(ctx)
if err != nil {
return err
}
pathCollector := NewPathCollector(chore.config.Slots, chore.rand)
err = chore.metainfoLoop.Join(ctx, pathCollector)
if err != nil {
@ -75,13 +81,8 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
}
}
// Wait for active queue to be completed, then swap with the new one.
err = chore.queue.WaitForSwap(ctx, newQueue)
if err != nil {
return err
}
return nil
// Push new queue to queues struct so it can be fetched by worker.
return chore.queues.Push(newQueue)
})
}
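The reordering in this hunk gives the chore natural back-pressure: it now blocks at the top of the cycle until the worker has picked up the previously pushed queue, so at most one pending queue exists and no reservoir collection runs for a queue that could not be delivered. A condensed sketch of the cycle's shape (collectPaths is a hypothetical stand-in for the metainfo-loop and path-collector steps elided above):

package sketch

import (
	"context"

	"storj.io/common/storj"
	"storj.io/storj/satellite/audit"
)

// runOnce mirrors the body of the Chore.Run cycle above.
func runOnce(ctx context.Context, queues *audit.Queues,
	collectPaths func(context.Context) ([]storj.Path, error)) error {
	// If the previously pushed queue is still waiting to be swapped
	// in, wait before doing any collection work.
	if err := queues.WaitForSwap(ctx); err != nil {
		return err
	}
	newQueue, err := collectPaths(ctx)
	if err != nil {
		return err
	}
	// Stage the new queue; the worker swaps it in on its next Fetch.
	return queues.Push(newQueue)
}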

View File

@ -37,14 +37,15 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
}
audits.Chore.Loop.TriggerWait()
require.EqualValues(t, 2, audits.Queue.Size(), "audit queue")
queue := audits.Queues.Fetch()
require.EqualValues(t, 2, queue.Size(), "audit queue")
uniquePaths := make(map[storj.Path]struct{})
var err error
var path storj.Path
var pathCount int
for {
path, err = audits.Queue.Next()
path, err = queue.Next()
if err != nil {
break
}
@ -56,14 +57,14 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
}
require.True(t, audit.ErrEmptyQueue.Has(err))
require.Equal(t, 2, pathCount)
require.Equal(t, 0, audits.Queue.Size())
require.Equal(t, 0, queue.Size())
// Repopulate the queue for the worker.
audits.Chore.Loop.TriggerWait()
require.EqualValues(t, 2, audits.Queue.Size(), "audit queue")
// Make sure the worker processes all the items in the audit queue.
// Make sure the worker processes the audit queue.
audits.Worker.Loop.TriggerWait()
require.EqualValues(t, 0, audits.Queue.Size(), "audit queue")
queue = audits.Queues.Fetch()
require.EqualValues(t, 0, queue.Size(), "audit queue")
})
}

View File

@ -16,68 +16,22 @@ import (
var ErrEmptyQueue = errs.Class("empty audit queue")
// Queue is a list of paths to audit, shared between the reservoir chore and audit workers.
// It is not safe for concurrent use.
type Queue struct {
mu sync.Mutex
queue []storj.Path
// onEmpty is a callback used to swap the active and pending queues when the active queue is empty.
onEmpty func()
}
// WaitForSwap waits for the active queue to be empty, then replaces it with a new pending queue.
// DO NOT CALL AGAIN UNTIL PREVIOUS CALL HAS RETURNED - there should only ever be one routine that calls WaitForSwap.
// Otherwise, there is a possibility of one call getting stuck until the context is canceled.
func (q *Queue) WaitForSwap(ctx context.Context, newQueue []storj.Path) error {
q.mu.Lock()
if q.onEmpty != nil {
q.mu.Unlock()
panic("massive internal error, this shouldn't happen")
// NewQueue creates a new audit queue.
func NewQueue(paths []storj.Path) *Queue {
return &Queue{
queue: paths,
}
if len(q.queue) == 0 {
q.queue = newQueue
q.mu.Unlock()
return nil
}
onEmptyCalledChan := make(chan struct{})
cleanup := func() {
q.onEmpty = nil
close(onEmptyCalledChan)
}
// onEmpty assumes the mutex is locked when it is called.
q.onEmpty = func() {
q.queue = newQueue
cleanup()
}
q.mu.Unlock()
select {
case <-onEmptyCalledChan:
case <-ctx.Done():
q.mu.Lock()
defer q.mu.Unlock()
if q.onEmpty != nil {
cleanup()
}
}
return ctx.Err()
}
// Next gets the next item in the queue.
func (q *Queue) Next() (storj.Path, error) {
q.mu.Lock()
defer q.mu.Unlock()
// if the queue is empty, call onEmpty to swap queues (if there is a pending queue)
// otherwise, return empty queue error
if len(q.queue) == 0 {
if q.onEmpty != nil {
q.onEmpty()
}
if len(q.queue) == 0 {
return "", ErrEmptyQueue.New("")
}
return "", ErrEmptyQueue.New("")
}
next := q.queue[0]
@ -88,8 +42,92 @@ func (q *Queue) Next() (storj.Path, error) {
// Size returns the size of the queue.
func (q *Queue) Size() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.queue)
}
// ErrPendingQueueInProgress means that a chore attempted to add a new pending queue while one was already waiting to be swapped in.
var ErrPendingQueueInProgress = errs.Class("pending queue already in progress")
// Queues is a shared resource that keeps track of the next queue to be fetched
// and swaps in a new queue when one is ready.
type Queues struct {
mu sync.Mutex
nextQueue *Queue
swapQueue func()
queueSwapped chan struct{}
}
// NewQueues creates a new Queues object.
func NewQueues() *Queues {
queues := &Queues{
nextQueue: NewQueue([]storj.Path{}),
}
return queues
}
// Fetch gets the active queue, clears it, and swaps a pending queue in as the new active queue if available.
func (queues *Queues) Fetch() *Queue {
queues.mu.Lock()
defer queues.mu.Unlock()
if queues.nextQueue.Size() == 0 && queues.swapQueue != nil {
queues.swapQueue()
}
active := queues.nextQueue
if queues.swapQueue != nil {
queues.swapQueue()
} else {
queues.nextQueue = NewQueue([]storj.Path{})
}
return active
}
// Push adds a pending queue to be swapped in when ready.
// If nextQueue is empty, it immediately replaces the queue. Otherwise it creates a swapQueue callback to be called when nextQueue is fetched.
// Only one call to Push is permitted at a time, otherwise it will return ErrPendingQueueInProgress.
func (queues *Queues) Push(pendingQueue []storj.Path) error {
queues.mu.Lock()
defer queues.mu.Unlock()
// do not allow multiple concurrent calls to Push().
// only one audit chore should exist.
if queues.swapQueue != nil {
return ErrPendingQueueInProgress.New("")
}
if queues.nextQueue.Size() == 0 {
queues.nextQueue = NewQueue(pendingQueue)
return nil
}
queues.queueSwapped = make(chan struct{})
queues.swapQueue = func() {
queues.nextQueue = NewQueue(pendingQueue)
queues.swapQueue = nil
close(queues.queueSwapped)
}
return nil
}
// WaitForSwap blocks until the swapQueue callback is called or context is canceled.
// If there is no pending swap, it returns immediately.
func (queues *Queues) WaitForSwap(ctx context.Context) error {
queues.mu.Lock()
if queues.swapQueue == nil {
queues.mu.Unlock()
return nil
}
queues.mu.Unlock()
// wait for swapQueue to be called or for the context to be canceled
select {
case <-queues.queueSwapped:
case <-ctx.Done():
}
return ctx.Err()
}
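The two swapQueue calls in Fetch cover two distinct cases: the first swaps a pending queue in when the active queue is already empty, and the second performs the deferred swap after the active queue has been handed out. A small runnable walkthrough of the lifecycle (the paths are illustrative, and Push errors are ignored for brevity):

package main

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/audit"
)

func main() {
	queues := audit.NewQueues()

	// nextQueue is empty, so the first push is swapped in immediately.
	_ = queues.Push([]storj.Path{"a", "b"})

	// nextQueue is now non-empty, so this push is staged behind the
	// swapQueue callback. Until the next Fetch, any further Push
	// returns ErrPendingQueueInProgress.
	_ = queues.Push([]storj.Path{"c"})

	first := queues.Fetch()  // returns {"a", "b"} and swaps {"c"} in
	second := queues.Fetch() // returns {"c"}; an empty queue replaces it

	fmt.Println(first.Size(), second.Size()) // prints: 2 1
}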

View File

@ -6,7 +6,6 @@ package audit
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
@ -16,19 +15,23 @@ import (
"storj.io/common/testcontext"
)
func TestQueue(t *testing.T) {
func TestQueues(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
q := &Queue{}
queues := NewQueues()
q := queues.Fetch()
_, err := q.Next()
require.True(t, ErrEmptyQueue.Has(err), "required ErrEmptyQueue error")
testQueue1 := []storj.Path{"a", "b", "c"}
err = q.WaitForSwap(ctx, testQueue1)
err = queues.Push(testQueue1)
require.NoError(t, err)
err = queues.WaitForSwap(ctx)
require.NoError(t, err)
q = queues.Fetch()
for _, expected := range testQueue1 {
actual, err := q.Next()
require.NoError(t, err)
@ -38,41 +41,36 @@ func TestQueue(t *testing.T) {
require.Zero(t, q.Size())
}
func TestQueueWaitForSwap(t *testing.T) {
func TestQueuesPush(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
q := &Queue{}
// when queue is empty, WaitForSwap should return immediately
queues := NewQueues()
// when next queue is empty, WaitForSwap should return immediately
testQueue1 := []storj.Path{"a", "b", "c"}
err := q.WaitForSwap(ctx, testQueue1)
err := queues.Push(testQueue1)
require.NoError(t, err)
err = queues.WaitForSwap(ctx)
require.NoError(t, err)
// second call to WaitForSwap should block until Fetch is called the first time
testQueue2 := []storj.Path{"d", "e"}
err = queues.Push(testQueue2)
require.NoError(t, err)
var group errgroup.Group
group.Go(func() error {
return q.WaitForSwap(ctx, testQueue2)
return queues.WaitForSwap(ctx)
})
// wait for WaitForSwap to set onEmpty callback so we can test that consuming the queue frees it.
ticker := time.NewTicker(100 * time.Millisecond)
for range ticker.C {
q.mu.Lock()
if q.onEmpty != nil {
q.mu.Unlock()
break
}
q.mu.Unlock()
}
ticker.Stop()
q := queues.Fetch()
for _, expected := range testQueue1 {
actual, err := q.Next()
require.NoError(t, err)
require.EqualValues(t, expected, actual)
}
// next call to Next() should swap queues and free WaitForSwap
// second call to Fetch should return testQueue2
q = queues.Fetch()
item, err := q.Next()
require.NoError(t, err)
require.EqualValues(t, testQueue2[0], item)
@ -82,36 +80,32 @@ func TestQueueWaitForSwap(t *testing.T) {
require.NoError(t, err)
}
func TestQueueWaitForSwapCancel(t *testing.T) {
func TestQueuesPushCancel(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
q := &Queue{}
queues := NewQueues()
// when queue is empty, WaitForSwap should return immediately
testQueue1 := []storj.Path{"a", "b", "c"}
err := q.WaitForSwap(ctx, testQueue1)
err := queues.Push(testQueue1)
require.NoError(t, err)
err = queues.WaitForSwap(ctx)
require.NoError(t, err)
ctxWithCancel, cancel := context.WithCancel(ctx)
testQueue2 := []storj.Path{"d", "e"}
err = queues.Push(testQueue2)
require.NoError(t, err)
var group errgroup.Group
group.Go(func() error {
err = q.WaitForSwap(ctxWithCancel, testQueue2)
err = queues.WaitForSwap(ctxWithCancel)
require.True(t, errs2.IsCanceled(err))
return nil
})
// wait for WaitForSwap to set onEmpty callback so we can test that canceling the context frees it.
ticker := time.NewTicker(100 * time.Millisecond)
for range ticker.C {
q.mu.Lock()
if q.onEmpty != nil {
q.mu.Unlock()
break
}
q.mu.Unlock()
}
ticker.Stop()
// make sure a concurrent call to Push fails
err = queues.Push(testQueue2)
require.True(t, ErrPendingQueueInProgress.Has(err))
cancel()

View File

@ -43,7 +43,6 @@ func TestReverifySuccess(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -55,6 +54,7 @@ func TestReverifySuccess(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -124,8 +124,6 @@ func TestReverifyFailMissingShare(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -136,6 +134,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -207,8 +206,6 @@ func TestReverifyFailMissingShareNotVerified(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -219,6 +216,7 @@ func TestReverifyFailMissingShareNotVerified(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -297,7 +295,6 @@ func TestReverifyFailBadData(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -309,6 +306,7 @@ func TestReverifyFailBadData(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -369,7 +367,6 @@ func TestReverifyOffline(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -381,6 +378,7 @@ func TestReverifyOffline(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -439,7 +437,6 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -451,6 +448,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -536,7 +534,6 @@ func TestReverifyDeletedSegment(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -547,6 +544,7 @@ func TestReverifyDeletedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -590,6 +588,7 @@ func TestReverifyDeletedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
path, err = queue.Next()
require.NoError(t, err)
@ -621,7 +620,6 @@ func TestReverifyModifiedSegment(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
@ -633,6 +631,7 @@ func TestReverifyModifiedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
pendingPath, err := queue.Next()
require.NoError(t, err)
@ -672,6 +671,7 @@ func TestReverifyModifiedSegment(t *testing.T) {
// select the encrypted path that was not used for the pending audit
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
path1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
@ -709,7 +709,6 @@ func TestReverifyReplacedSegment(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -720,6 +719,7 @@ func TestReverifyReplacedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
pendingPath, err := queue.Next()
require.NoError(t, err)
@ -756,6 +756,7 @@ func TestReverifyReplacedSegment(t *testing.T) {
// select the encrypted path that was not used for the pending audit
audits.Chore.Loop.TriggerWait()
queue = audits.Queues.Fetch()
path1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
@ -797,7 +798,6 @@ func TestReverifyDifferentShare(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -813,6 +813,7 @@ func TestReverifyDifferentShare(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
@ -905,7 +906,6 @@ func TestReverifyExpired1(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -917,6 +917,7 @@ func TestReverifyExpired1(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -962,7 +963,6 @@ func TestReverifyExpired2(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -978,6 +978,7 @@ func TestReverifyExpired2(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
@ -1094,7 +1095,6 @@ func TestReverifySlowDownload(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -1105,6 +1105,7 @@ func TestReverifySlowDownload(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -1178,7 +1179,6 @@ func TestReverifyUnknownError(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -1189,6 +1189,7 @@ func TestReverifyUnknownError(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)

View File

@ -40,7 +40,6 @@ func TestDownloadSharesHappyPath(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -54,6 +53,7 @@ func TestDownloadSharesHappyPath(t *testing.T) {
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -91,7 +91,6 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -105,6 +104,7 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -150,7 +150,6 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -162,6 +161,7 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -205,7 +205,6 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -217,6 +216,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -286,7 +286,6 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -300,6 +299,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
bucket := metabase.BucketLocation{ProjectID: upl.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -348,7 +348,6 @@ func TestVerifierHappyPath(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -360,6 +359,7 @@ func TestVerifierHappyPath(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -382,7 +382,6 @@ func TestVerifierExpired(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -394,6 +393,7 @@ func TestVerifierExpired(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -434,7 +434,6 @@ func TestVerifierOfflineNode(t *testing.T) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -446,6 +445,7 @@ func TestVerifierOfflineNode(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -473,7 +473,6 @@ func TestVerifierMissingPiece(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -485,6 +484,7 @@ func TestVerifierMissingPiece(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -518,7 +518,6 @@ func TestVerifierMissingPieceHashesNotVerified(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -530,6 +529,7 @@ func TestVerifierMissingPieceHashesNotVerified(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -568,7 +568,6 @@ func TestVerifierDialTimeout(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -580,6 +579,7 @@ func TestVerifierDialTimeout(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -625,7 +625,6 @@ func TestVerifierDeletedSegment(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -637,6 +636,7 @@ func TestVerifierDeletedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -657,7 +657,6 @@ func TestVerifierModifiedSegment(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
@ -670,6 +669,7 @@ func TestVerifierModifiedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -695,7 +695,6 @@ func TestVerifierReplacedSegment(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -707,6 +706,7 @@ func TestVerifierReplacedSegment(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -729,7 +729,6 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -741,6 +740,7 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -789,7 +789,6 @@ func TestVerifierSlowDownload(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -801,6 +800,7 @@ func TestVerifierSlowDownload(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
@ -839,7 +839,6 @@ func TestVerifierUnknownError(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
@ -851,6 +850,7 @@ func TestVerifierUnknownError(t *testing.T) {
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)

View File

@ -34,7 +34,7 @@ type Config struct {
// Worker contains information for populating audit queue and processing audits.
type Worker struct {
log *zap.Logger
queue *Queue
queues *Queues
verifier *Verifier
reporter *Reporter
Loop *sync2.Cycle
@ -42,11 +42,11 @@ type Worker struct {
}
// NewWorker instantiates Worker.
func NewWorker(log *zap.Logger, queue *Queue, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error) {
func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error) {
return &Worker{
log: log,
queue: queue,
queues: queues,
verifier: verifier,
reporter: reporter,
Loop: sync2.NewCycle(config.QueueInterval),
@ -81,12 +81,20 @@ func (worker *Worker) Close() error {
func (worker *Worker) process(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// get the current queue
queue := worker.queues.Fetch()
worker.limiter.Wait()
for {
path, err := worker.queue.Next()
path, err := queue.Next()
if err != nil {
if ErrEmptyQueue.Has(err) {
return nil
// get a new queue and return if empty; otherwise continue working.
queue = worker.queues.Fetch()
if queue.Size() == 0 {
return nil
}
continue
}
return err
}
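With this change the worker no longer stops at the first empty queue: when Next reports ErrEmptyQueue, it re-fetches, picking up any queue the chore staged while the previous one was being drained. A sketch of that consumption pattern (processPath is a hypothetical stand-in for the verification step):

package sketch

import (
	"context"

	"storj.io/common/storj"
	"storj.io/storj/satellite/audit"
)

// drain mirrors the shape of Worker.process above.
func drain(ctx context.Context, queues *audit.Queues,
	processPath func(context.Context, storj.Path) error) error {
	queue := queues.Fetch()
	for {
		path, err := queue.Next()
		if err != nil {
			if audit.ErrEmptyQueue.Has(err) {
				// A pending queue may have been swapped in while this
				// one drained; grab it before giving up.
				queue = queues.Fetch()
				if queue.Size() == 0 {
					return nil
				}
				continue
			}
			return err
		}
		if err := processPath(ctx, path); err != nil {
			return err
		}
	}
}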

View File

@ -95,7 +95,7 @@ type Core struct {
Checker *checker.Checker
}
Audit struct {
Queue *audit.Queue
Queues *audit.Queues
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
@ -315,7 +315,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup audit
config := config.Audit
peer.Audit.Queue = &audit.Queue{}
peer.Audit.Queues = audit.NewQueues()
peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
peer.Metainfo.Service,
@ -336,7 +336,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
)
peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit:worker"),
peer.Audit.Queue,
peer.Audit.Queues,
peer.Audit.Verifier,
peer.Audit.Reporter,
config,
@ -354,7 +354,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}
peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
peer.Audit.Queue,
peer.Audit.Queues,
peer.Metainfo.Loop,
config,
)

View File

@ -462,8 +462,9 @@ func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
// get encrypted path of segment with audit service
satellite.Audit.Chore.Loop.TriggerWait()
require.EqualValues(t, satellite.Audit.Queue.Size(), 1)
encryptedPath, err := satellite.Audit.Queue.Next()
queue := satellite.Audit.Queues.Fetch()
require.EqualValues(t, queue.Size(), 1)
encryptedPath, err := queue.Next()
require.NoError(t, err)
// replace pointer with one that is already expired
pointer.ExpirationDate = time.Now().Add(-time.Hour)