From ab2e79355560c6ae452c2934e243a059bdfd2400 Mon Sep 17 00:00:00 2001 From: paul cannon Date: Thu, 9 Feb 2023 17:03:20 -0600 Subject: [PATCH] satellite/audit: test delay before Reverify We are supposed to wait for some amount of time after a timed-out audit before retrying the audit on the contained node. We are also supposed to wait for some amount of time before subsequent retries, if they are necessary. The test added here tries to assure that those delays happen, as far as it is possible to assure that a delay will happen in computer code. The previous behavior of the system was, in fact, to carry out Reverifies as soon as a worker could retrieve the job from the reverification queue. That's not a very major problem, as subsequent retries do have a delay and the node does get several retries. Still, it was not ideal, and this test exposed that mismatch with expectations, so this commit includes a minor change to effect that pause between verify and the first reverify. Refs: https://github.com/storj/storj/issues/5499 Change-Id: I83bb79c166a458ba59a2db2d17c85eca43ca90f0 --- satellite/audit/containment_test.go | 3 +- satellite/audit/reverify_test.go | 156 +++++++++++++++++++- satellite/audit/verifier_test.go | 15 +- satellite/satellitedb/reverifyqueue.go | 2 +- satellite/satellitedb/reverifyqueue_test.go | 13 +- 5 files changed, 180 insertions(+), 9 deletions(-) diff --git a/satellite/audit/containment_test.go b/satellite/audit/containment_test.go index 8feaf3dd5..2b3b99503 100644 --- a/satellite/audit/containment_test.go +++ b/satellite/audit/containment_test.go @@ -6,7 +6,6 @@ package audit_test import ( "math/rand" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -75,7 +74,7 @@ func TestContainIncrementPendingEntryExists(t *testing.T) { assert.EqualValues(t, 0, pending.ReverifyCount) // after the job is selected for work, its ReverifyCount should be increased to 1 - job, err := planet.Satellites[0].DB.ReverifyQueue().GetNextJob(ctx, 10*time.Minute) + job, err := planet.Satellites[0].DB.ReverifyQueue().GetNextJob(ctx, 0) require.NoError(t, err) require.Equal(t, pending.Locator, job.Locator) assert.EqualValues(t, 1, job.ReverifyCount) diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index e6087857b..89cb9c64f 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -16,6 +16,7 @@ import ( "storj.io/common/memory" "storj.io/common/peertls/tlsopts" "storj.io/common/rpc" + "storj.io/common/sync2" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testblobs" @@ -820,7 +821,7 @@ func TestMaxReverifyCount(t *testing.T) { }) // give node enough timeouts to reach max - for i := 0; i < satellite.Config.Audit.MaxReverifyCount-1; i++ { + for i := 0; i < satellite.Config.Audit.MaxReverifyCount; i++ { // run the reverify worker; each loop should complete once there are // no more reverifications to do in the queue audits.ReverifyWorker.Loop.TriggerWait() @@ -844,3 +845,156 @@ func TestMaxReverifyCount(t *testing.T) { require.Less(t, oldRep.AuditReputationBeta, newRep.AuditReputationBeta) }) } + +func TestTimeDelayBeforeReverifies(t *testing.T) { + const ( + auditTimeout = time.Second + reverifyInterval = time.Second / 4 + ) + testWithChoreAndObserver(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { + return testblobs.NewSlowDB(log.Named("slowdb"), db), nil + }, + Satellite: testplanet.Combine( + func(log *zap.Logger, index int, config *satellite.Config) { + // These config values are chosen to force the slow node to time out without timing out on the three normal nodes + config.Audit.MinBytesPerSecond = 100 * memory.KiB + config.Audit.MinDownloadTimeout = auditTimeout + // disable reputation write cache so changes are immediate + config.Reputation.FlushInterval = 0 + }, + testplanet.ReconfigureRS(2, 2, 4, 4), + ), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { + satellite := planet.Satellites[0] + audits := satellite.Audit + + audits.Worker.Loop.Pause() + pauseQueueing(satellite) + + ul := planet.Uplinks[0] + testData := testrand.Bytes(8 * memory.KiB) + + err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData) + require.NoError(t, err) + + err = runQueueingOnce(ctx, satellite) + require.NoError(t, err) + + queue := audits.VerifyQueue + queueSegment, err := queue.Next(ctx) + require.NoError(t, err) + + segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) + require.NoError(t, err) + + slowPiece := segment.Pieces[0] + slowNode := planet.FindNode(slowPiece.StorageNode) + slowNode.DB.(*testblobs.SlowDB).SetLatency(10 * auditTimeout) + + report, err := audits.Verifier.Verify(ctx, audit.Segment{ + StreamID: segment.StreamID, + Position: segment.Position, + }, nil) + require.NoError(t, err) + + approximateQueueTime := time.Now() + audits.Reporter.RecordAudits(ctx, report) + node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID()) + require.NoError(t, err) + require.True(t, node.Contained) + pendingJob, err := satellite.DB.Containment().Get(ctx, slowNode.ID()) + require.NoError(t, err) + require.NotNil(t, pendingJob) + dbQueueTime := pendingJob.InsertedAt // note this is not necessarily comparable with times from time.Now() + + reverifyQueue := satellite.Audit.ReverifyQueue + + // To demonstrate that a Reverify won't happen until reverifyInterval has elapsed, we will + // call reverifyQueue.GetNextJob up to 10 times, evenly spaced within reverifyInterval, + // asserting that the reverification job is still there, unchanged, and that the node + // is still contained, until after reverifyInterval. + // + // Yes, this is unfortunately dependent on the system clock and on sleep()s. But I've tried + // to make it as independent of actual timing as I can. + const ( + numCallsTarget = 10 + callInterval = reverifyInterval / numCallsTarget + ) + + for { + // reverify queue won't let us get the job yet + nextJob, err := reverifyQueue.GetNextJob(ctx, reverifyInterval) + if err == nil { + // unless reverifyInterval has elapsed + if time.Since(approximateQueueTime) >= reverifyInterval { + // in which case, it's good to get this + require.Equal(t, slowNode.ID(), nextJob.Locator.NodeID) + require.True(t, dbQueueTime.Equal(nextJob.InsertedAt), nextJob) + break + } + require.Failf(t, "Got no error", "only %s has elapsed. nextJob=%+v", time.Since(approximateQueueTime), nextJob) + } + require.Error(t, err) + require.True(t, audit.ErrEmptyQueue.Has(err), err) + require.Nil(t, nextJob) + + // reverification job is still in the queue, though + pendingJob, err := reverifyQueue.GetByNodeID(ctx, slowNode.ID()) + require.NoError(t, err) + require.Equal(t, slowNode.ID(), pendingJob.Locator.NodeID) + require.True(t, dbQueueTime.Equal(pendingJob.InsertedAt), pendingJob) + + // and the node is still contained + node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID()) + require.NoError(t, err) + require.True(t, node.Contained) + + // wait a bit + sync2.Sleep(ctx, callInterval) + require.NoError(t, ctx.Err()) + } + + // Now we need to demonstrate that a second Reverify won't happen until reverifyInterval + // has elapsed again. This code will be largely the same as the first time around. + + for { + // reverify queue won't let us get the job yet + nextJob, err := reverifyQueue.GetNextJob(ctx, reverifyInterval) + if err == nil { + // unless 2*reverifyInterval has elapsed + if time.Since(approximateQueueTime) >= 2*reverifyInterval { + // in which case, it's good to get this + require.Equal(t, slowNode.ID(), nextJob.Locator.NodeID) + require.True(t, dbQueueTime.Equal(nextJob.InsertedAt), nextJob) + break + } + } + require.Error(t, err) + require.True(t, audit.ErrEmptyQueue.Has(err), err) + require.Nil(t, nextJob) + + // reverification job is still in the queue, though + pendingJob, err := reverifyQueue.GetByNodeID(ctx, slowNode.ID()) + require.NoError(t, err) + require.Equal(t, slowNode.ID(), pendingJob.Locator.NodeID) + require.True(t, dbQueueTime.Equal(pendingJob.InsertedAt), pendingJob) + require.True(t, pendingJob.LastAttempt.After(dbQueueTime), pendingJob) + + // and the node is still contained + node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID()) + require.NoError(t, err) + require.True(t, node.Contained) + + // wait a bit + sync2.Sleep(ctx, callInterval) + require.NoError(t, ctx.Err()) + } + }) +} diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index 02aef4726..f81aca6ca 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -1440,6 +1440,7 @@ func TestConcurrentAuditsTimeout(t *testing.T) { numConcurrentAudits = 10 minPieces = 5 slowNodes = minPieces / 2 + retryInterval = 5 * time.Minute ) testWithChoreAndObserver(t, testplanet.Config{ @@ -1508,6 +1509,11 @@ func TestConcurrentAuditsTimeout(t *testing.T) { err = group.Wait() require.NoError(t, err) + rq := audits.ReverifyQueue.(interface { + audit.ReverifyQueue + TestingFudgeUpdateTime(ctx context.Context, pendingAudit *audit.PieceLocator, updateTime time.Time) error + }) + for _, report := range reports { require.Len(t, report.Fails, 0) require.Len(t, report.Unknown, 0) @@ -1517,14 +1523,19 @@ func TestConcurrentAuditsTimeout(t *testing.T) { // apply the audit results, as the audit worker would have done audits.Reporter.RecordAudits(ctx, report) + + // fudge the insert time backward by retryInterval so the jobs will be available to GetNextJob + for _, pending := range report.PendingAudits { + err := rq.TestingFudgeUpdateTime(ctx, &pending.Locator, time.Now().Add(-retryInterval)) + require.NoError(t, err) + } } // the slow nodes should have been added to the reverify queue multiple times; // once for each timed-out piece fetch queuedReverifies := make([]*audit.ReverificationJob, 0, numConcurrentAudits*slowNodes) - dummyRetryInterval := 5 * time.Minute for { - job, err := audits.ReverifyQueue.GetNextJob(ctx, dummyRetryInterval) + job, err := audits.ReverifyQueue.GetNextJob(ctx, retryInterval) if err != nil { if audit.ErrEmptyQueue.Has(err) { break diff --git a/satellite/satellitedb/reverifyqueue.go b/satellite/satellitedb/reverifyqueue.go index 1551aa58d..ac66f62a3 100644 --- a/satellite/satellitedb/reverifyqueue.go +++ b/satellite/satellitedb/reverifyqueue.go @@ -56,7 +56,7 @@ func (rq *reverifyQueue) GetNextJob(ctx context.Context, retryInterval time.Dura WITH next_entry AS ( SELECT * FROM reverification_audits - WHERE last_attempt IS NULL OR last_attempt < (now() - '1 microsecond'::interval * $1::bigint) + WHERE COALESCE(last_attempt, inserted_at) < (now() - '1 microsecond'::interval * $1::bigint) ORDER BY inserted_at LIMIT 1 ) diff --git a/satellite/satellitedb/reverifyqueue_test.go b/satellite/satellitedb/reverifyqueue_test.go index 8b25376da..6bd48a857 100644 --- a/satellite/satellitedb/reverifyqueue_test.go +++ b/satellite/satellitedb/reverifyqueue_test.go @@ -40,6 +40,9 @@ const ( func TestReverifyQueue(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { reverifyQueue := db.ReverifyQueue() + reverifyQueueTest := reverifyQueue.(interface { + TestingFudgeUpdateTime(ctx context.Context, piece *audit.PieceLocator, updateTime time.Time) error + }) locator1 := randomLocator() locator2 := randomLocator() @@ -56,6 +59,13 @@ func TestReverifyQueue(t *testing.T) { checkGetAllContainedNodes(ctx, t, reverifyQueue, locator1.NodeID, locator2.NodeID) + // pretend that retryInterval has elapsed for both jobs + err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator1, time.Now().Add(-retryInterval)) + require.NoError(t, err) + err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator2, time.Now().Add(-retryInterval)) + require.NoError(t, err) + + // fetch both jobs from the queue and expect the right contents job1, err := reverifyQueue.GetNextJob(ctx, retryInterval) require.NoError(t, err) require.Equal(t, *locator1, job1.Locator) @@ -74,9 +84,6 @@ func TestReverifyQueue(t *testing.T) { require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err) // pretend that ReverifyRetryInterval has elapsed - reverifyQueueTest := reverifyQueue.(interface { - TestingFudgeUpdateTime(ctx context.Context, piece *audit.PieceLocator, updateTime time.Time) error - }) err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator1, time.Now().Add(-retryInterval)) require.NoError(t, err)