satellite/audit/{queue,chore}: Wait for audit queue to be finished before swapping

* Do not swap the active audit queue with the pending audit queue until
the active audit queue is empty.
* Do not begin creating a new pending audit queue until the existing
pending audit queue has been swapped to the active queue.

Change-Id: I81db5bfa01458edb8cdbe71f5baeebdcb1b94317
This commit is contained in:
Moby von Briesen 2020-07-13 18:24:15 -04:00 committed by Maximillian von Briesen
parent c89746a383
commit 76030a8237
6 changed files with 183 additions and 26 deletions

View File

@ -74,7 +74,12 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
}
}
}
chore.queue.Swap(newQueue)
// Wait for active queue to be completed, then swap with the new one.
err = chore.queue.WaitForSwap(ctx, newQueue)
if err != nil {
return err
}
return nil
})

View File

@ -4,6 +4,7 @@
package audit
import (
"context"
"sync"
"github.com/zeebo/errs"
@ -18,14 +19,49 @@ var ErrEmptyQueue = errs.Class("empty audit queue")
type Queue struct {
mu sync.Mutex
queue []storj.Path
// onEmpty is a callback used to swap the active and pending queues when the active queue is empty.
onEmpty func()
}
// Swap switches the backing queue slice with a new queue slice.
func (q *Queue) Swap(newQueue []storj.Path) {
// WaitForSwap waits for the active queue to be empty, then replaces it with a new pending queue.
// DO NOT CALL AGAIN UNTIL PREVIOUS CALL HAS RETURNED - there should only ever be one routine that calls WaitForSwap.
// Otherwise, there is a possibility of one call getting stuck until the context is canceled.
//
// It panics if an onEmpty callback is already registered, and otherwise returns nil
// (empty active queue) or ctx.Err() (after the wait has ended).
func (q *Queue) WaitForSwap(ctx context.Context, newQueue []storj.Path) error {
q.mu.Lock()
// NOTE(review): this deferred Unlock overlaps with the explicit q.mu.Unlock() calls below;
// it looks like a leftover line from the replaced Swap body - confirm against the actual file.
defer q.mu.Unlock()
// A registered callback means another WaitForSwap call is still pending.
if q.onEmpty != nil {
q.mu.Unlock()
panic("massive internal error, this shouldn't happen")
}
// NOTE(review): this unconditional assignment replaces the active queue before any waiting happens;
// it also looks like a leftover line from the replaced Swap body - confirm against the actual file.
q.queue = newQueue
// Nothing left in the active queue: install the new queue and return without waiting.
if len(q.queue) == 0 {
q.queue = newQueue
q.mu.Unlock()
return nil
}
// onEmptyCalledChan is closed by cleanup to signal that the wait below is over.
onEmptyCalledChan := make(chan struct{})
// cleanup unregisters the callback and closes the channel; both call sites hold the mutex.
cleanup := func() {
q.onEmpty = nil
close(onEmptyCalledChan)
}
// onEmpty assumes the mutex is locked when it is called.
q.onEmpty = func() {
q.queue = newQueue
cleanup()
}
q.mu.Unlock()
// Block until the onEmpty callback has run or the context is done.
select {
case <-onEmptyCalledChan:
case <-ctx.Done():
q.mu.Lock()
defer q.mu.Unlock()
// Skip cleanup if onEmpty already ran in the meantime; cleanup closes the channel and must not run twice.
if q.onEmpty != nil {
cleanup()
}
}
return ctx.Err()
}
// Next gets the next item in the queue.
@ -33,9 +69,15 @@ func (q *Queue) Next() (storj.Path, error) {
q.mu.Lock()
defer q.mu.Unlock()
// return error if queue is empty
// if the queue is empty, call onEmpty to swap queues (if there is a pending queue)
// otherwise, return empty queue error
if len(q.queue) == 0 {
return "", ErrEmptyQueue.New("")
if q.onEmpty != nil {
q.onEmpty()
}
if len(q.queue) == 0 {
return "", ErrEmptyQueue.New("")
}
}
next := q.queue[0]

View File

@ -1,42 +1,120 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit_test
package audit
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/storj"
"storj.io/storj/satellite/audit"
"storj.io/common/testcontext"
)
func TestQueue(t *testing.T) {
q := &audit.Queue{}
ctx := testcontext.New(t)
defer ctx.Cleanup()
q := &Queue{}
_, err := q.Next()
require.True(t, audit.ErrEmptyQueue.Has(err), "required ErrEmptyQueue error")
require.True(t, ErrEmptyQueue.Has(err), "required ErrEmptyQueue error")
testQueue1 := []storj.Path{"a", "b", "c"}
q.Swap(testQueue1)
path, err := q.Next()
err = q.WaitForSwap(ctx, testQueue1)
require.NoError(t, err)
require.EqualValues(t, testQueue1[0], path)
path, err = q.Next()
require.NoError(t, err)
require.EqualValues(t, testQueue1[1], path)
testQueue2 := []storj.Path{"0", "1", "2"}
q.Swap(testQueue2)
for _, expected := range testQueue2 {
path, err := q.Next()
for _, expected := range testQueue1 {
actual, err := q.Next()
require.NoError(t, err)
require.EqualValues(t, expected, path)
require.EqualValues(t, expected, actual)
}
_, err = q.Next()
require.True(t, audit.ErrEmptyQueue.Has(err), "required ErrEmptyQueue error")
require.Zero(t, q.Size())
}
// TestQueueWaitForSwap checks that a WaitForSwap call made while the active
// queue still holds items is released once Next has consumed those items, and
// that Next then serves the pending queue.
func TestQueueWaitForSwap(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	q := &Queue{}

	// The queue starts out empty, so the first swap must return right away.
	active := []storj.Path{"a", "b", "c"}
	require.NoError(t, q.WaitForSwap(ctx, active))

	pending := []storj.Path{"d", "e"}
	var group errgroup.Group
	group.Go(func() error { return q.WaitForSwap(ctx, pending) })

	// Poll until the background WaitForSwap has registered its onEmpty
	// callback; only then does draining the queue exercise the swap path.
	ticker := time.NewTicker(100 * time.Millisecond)
	for range ticker.C {
		q.mu.Lock()
		registered := q.onEmpty != nil
		q.mu.Unlock()
		if registered {
			break
		}
	}
	ticker.Stop()

	// Drain the active queue in order.
	for i := range active {
		got, err := q.Next()
		require.NoError(t, err)
		require.EqualValues(t, active[i], got)
	}

	// The following Next call finds the queue empty, triggers the swap, and
	// hands out the first pending item.
	first, err := q.Next()
	require.NoError(t, err)
	require.EqualValues(t, pending[0], first)
	require.Equal(t, len(pending)-1, q.Size())

	require.NoError(t, group.Wait())
}
// TestQueueWaitForSwapCancel checks that a WaitForSwap call blocked on a
// non-empty active queue returns a cancellation error once its context is
// canceled.
func TestQueueWaitForSwapCancel(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	q := &Queue{}
	// when queue is empty, WaitForSwap should return immediately
	testQueue1 := []storj.Path{"a", "b", "c"}
	err := q.WaitForSwap(ctx, testQueue1)
	require.NoError(t, err)

	ctxWithCancel, cancel := context.WithCancel(ctx)
	// Release the context on every exit path, including early test failures.
	defer cancel()

	testQueue2 := []storj.Path{"d", "e"}

	// swapErr is written only by the goroutine below and read only after
	// group.Wait has returned, so it needs no additional synchronization.
	var swapErr error
	var group errgroup.Group
	group.Go(func() error {
		// Assertions that may call t.FailNow have to run on the test
		// goroutine, so the result is only recorded here.
		swapErr = q.WaitForSwap(ctxWithCancel, testQueue2)
		return nil
	})

	// wait for WaitForSwap to set onEmpty callback so we can test that canceling the context frees it.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		q.mu.Lock()
		registered := q.onEmpty != nil
		q.mu.Unlock()
		if registered {
			break
		}
	}

	cancel()

	require.NoError(t, group.Wait())
	require.True(t, errs2.IsCanceled(swapErr))
}

View File

@ -45,6 +45,7 @@ func TestReverifySuccess(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -128,6 +129,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -213,6 +215,7 @@ func TestReverifyFailMissingShareNotVerified(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -305,6 +308,7 @@ func TestReverifyFailBadData(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -376,6 +380,7 @@ func TestReverifyOffline(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -445,6 +450,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -541,6 +547,7 @@ func TestReverifyDeletedSegment(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
@ -626,6 +633,7 @@ func TestReverifyModifiedSegment(t *testing.T) {
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
@ -712,6 +720,7 @@ func TestReverifyReplacedSegment(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
@ -799,6 +808,7 @@ func TestReverifyDifferentShare(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
@ -909,6 +919,7 @@ func TestReverifyExpired1(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -965,6 +976,7 @@ func TestReverifyExpired2(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
@ -1099,6 +1111,7 @@ func TestReverifySlowDownload(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -1185,6 +1198,7 @@ func TestReverifyUnknownError(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)

View File

@ -42,6 +42,7 @@ func TestDownloadSharesHappyPath(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -95,6 +96,7 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -156,6 +158,7 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -213,6 +216,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
upl := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -296,6 +300,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
upl := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -360,6 +365,7 @@ func TestVerifierHappyPath(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -393,6 +399,7 @@ func TestVerifierExpired(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -444,6 +451,7 @@ func TestVerifierOfflineNode(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -482,6 +490,7 @@ func TestVerifierMissingPiece(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -526,6 +535,7 @@ func TestVerifierMissingPieceHashesNotVerified(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -575,6 +585,7 @@ func TestVerifierDialTimeout(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -631,6 +642,7 @@ func TestVerifierDeletedSegment(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -662,6 +674,7 @@ func TestVerifierModifiedSegment(t *testing.T) {
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -697,6 +710,7 @@ func TestVerifierReplacedSegment(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -729,6 +743,7 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -788,6 +803,7 @@ func TestVerifierSlowDownload(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
@ -837,6 +853,7 @@ func TestVerifierUnknownError(t *testing.T) {
queue := audits.Queue
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)

View File

@ -422,6 +422,7 @@ func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
satellite := planet.Satellites[0]
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Stop()
satellite.Audit.Chore.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()