diff --git a/satellite/audit/containment_test.go b/satellite/audit/containment_test.go
index 8feaf3dd5..2b3b99503 100644
--- a/satellite/audit/containment_test.go
+++ b/satellite/audit/containment_test.go
@@ -6,7 +6,6 @@ package audit_test
 import (
 	"math/rand"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -75,7 +74,7 @@ func TestContainIncrementPendingEntryExists(t *testing.T) {
 		assert.EqualValues(t, 0, pending.ReverifyCount)
 
 		// after the job is selected for work, its ReverifyCount should be increased to 1
-		job, err := planet.Satellites[0].DB.ReverifyQueue().GetNextJob(ctx, 10*time.Minute)
+		job, err := planet.Satellites[0].DB.ReverifyQueue().GetNextJob(ctx, 0)
 		require.NoError(t, err)
 		require.Equal(t, pending.Locator, job.Locator)
 		assert.EqualValues(t, 1, job.ReverifyCount)
diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go
index e6087857b..89cb9c64f 100644
--- a/satellite/audit/reverify_test.go
+++ b/satellite/audit/reverify_test.go
@@ -16,6 +16,7 @@ import (
 	"storj.io/common/memory"
 	"storj.io/common/peertls/tlsopts"
 	"storj.io/common/rpc"
+	"storj.io/common/sync2"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
 	"storj.io/storj/private/testblobs"
@@ -820,7 +821,7 @@ func TestMaxReverifyCount(t *testing.T) {
 		})
 
 		// give node enough timeouts to reach max
-		for i := 0; i < satellite.Config.Audit.MaxReverifyCount-1; i++ {
+		for i := 0; i < satellite.Config.Audit.MaxReverifyCount; i++ {
 			// run the reverify worker; each loop should complete once there are
 			// no more reverifications to do in the queue
 			audits.ReverifyWorker.Loop.TriggerWait()
@@ -844,3 +845,156 @@ func TestMaxReverifyCount(t *testing.T) {
 		require.Less(t, oldRep.AuditReputationBeta, newRep.AuditReputationBeta)
 	})
 }
+
+func TestTimeDelayBeforeReverifies(t *testing.T) {
+	const (
+		auditTimeout     = time.Second
+		reverifyInterval = time.Second / 4
+	)
+	testWithChoreAndObserver(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
+				return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
+			},
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
+					config.Audit.MinBytesPerSecond = 100 * memory.KiB
+					config.Audit.MinDownloadTimeout = auditTimeout
+					// disable reputation write cache so changes are immediate
+					config.Reputation.FlushInterval = 0
+				},
+				testplanet.ReconfigureRS(2, 2, 4, 4),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
+		satellite := planet.Satellites[0]
+		audits := satellite.Audit
+
+		audits.Worker.Loop.Pause()
+		pauseQueueing(satellite)
+
+		ul := planet.Uplinks[0]
+		testData := testrand.Bytes(8 * memory.KiB)
+
+		err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		err = runQueueingOnce(ctx, satellite)
+		require.NoError(t, err)
+
+		queue := audits.VerifyQueue
+		queueSegment, err := queue.Next(ctx)
+		require.NoError(t, err)
+
+		segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
+			StreamID: queueSegment.StreamID,
+			Position: queueSegment.Position,
+		})
+		require.NoError(t, err)
+
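+		// make the node holding the first piece artificially slow, so that its
+		// piece fetch times out during the audit and the node lands in the
+		// reverification (containment) queue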
+		slowPiece := segment.Pieces[0]
+		slowNode := planet.FindNode(slowPiece.StorageNode)
+		slowNode.DB.(*testblobs.SlowDB).SetLatency(10 * auditTimeout)
+
+		report, err := audits.Verifier.Verify(ctx, audit.Segment{
+			StreamID: segment.StreamID,
+			Position: segment.Position,
+		}, nil)
+		require.NoError(t, err)
+
+		approximateQueueTime := time.Now()
+		audits.Reporter.RecordAudits(ctx, report)
+		node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID())
+		require.NoError(t, err)
+		require.True(t, node.Contained)
+		pendingJob, err := satellite.DB.Containment().Get(ctx, slowNode.ID())
+		require.NoError(t, err)
+		require.NotNil(t, pendingJob)
+		dbQueueTime := pendingJob.InsertedAt // note this is not necessarily comparable with times from time.Now()
+
+		reverifyQueue := satellite.Audit.ReverifyQueue
+
+		// To demonstrate that a Reverify won't happen until reverifyInterval has elapsed, we will
+		// call reverifyQueue.GetNextJob up to 10 times, evenly spaced within reverifyInterval,
+		// asserting that the reverification job is still there, unchanged, and that the node
+		// is still contained, until after reverifyInterval.
+		//
+		// Yes, this is unfortunately dependent on the system clock and on sleep()s. But I've tried
+		// to make it as independent of actual timing as I can.
+		const (
+			numCallsTarget = 10
+			callInterval   = reverifyInterval / numCallsTarget
+		)
+
+		for {
+			// reverify queue won't let us get the job yet
+			nextJob, err := reverifyQueue.GetNextJob(ctx, reverifyInterval)
+			if err == nil {
+				// unless reverifyInterval has elapsed
+				if time.Since(approximateQueueTime) >= reverifyInterval {
+					// in which case, it's good to get this
+					require.Equal(t, slowNode.ID(), nextJob.Locator.NodeID)
+					require.True(t, dbQueueTime.Equal(nextJob.InsertedAt), nextJob)
+					break
+				}
+				require.Failf(t, "Got no error", "only %s has elapsed. nextJob=%+v", time.Since(approximateQueueTime), nextJob)
+			}
+			require.Error(t, err)
+			require.True(t, audit.ErrEmptyQueue.Has(err), err)
+			require.Nil(t, nextJob)
+
+			// reverification job is still in the queue, though
+			pendingJob, err := reverifyQueue.GetByNodeID(ctx, slowNode.ID())
+			require.NoError(t, err)
+			require.Equal(t, slowNode.ID(), pendingJob.Locator.NodeID)
+			require.True(t, dbQueueTime.Equal(pendingJob.InsertedAt), pendingJob)
+
+			// and the node is still contained
+			node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID())
+			require.NoError(t, err)
+			require.True(t, node.Contained)
+
+			// wait a bit
+			sync2.Sleep(ctx, callInterval)
+			require.NoError(t, ctx.Err())
+		}
+
+		// Now we need to demonstrate that a second Reverify won't happen until reverifyInterval
+		// has elapsed again. This code will be largely the same as the first time around.
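+		// (note the retry clock now runs from last_attempt, which the successful
+		// GetNextJob call above has just set, rather than from inserted_at)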
+
+		for {
+			// reverify queue won't let us get the job yet
+			nextJob, err := reverifyQueue.GetNextJob(ctx, reverifyInterval)
+			if err == nil {
+				// unless 2*reverifyInterval has elapsed
+				if time.Since(approximateQueueTime) >= 2*reverifyInterval {
+					// in which case, it's good to get this
+					require.Equal(t, slowNode.ID(), nextJob.Locator.NodeID)
+					require.True(t, dbQueueTime.Equal(nextJob.InsertedAt), nextJob)
+					break
+				}
+				require.Failf(t, "Got no error", "only %s has elapsed. nextJob=%+v", time.Since(approximateQueueTime), nextJob)
+			}
+			require.Error(t, err)
+			require.True(t, audit.ErrEmptyQueue.Has(err), err)
+			require.Nil(t, nextJob)
+
+			// reverification job is still in the queue, though
+			pendingJob, err := reverifyQueue.GetByNodeID(ctx, slowNode.ID())
+			require.NoError(t, err)
+			require.Equal(t, slowNode.ID(), pendingJob.Locator.NodeID)
+			require.True(t, dbQueueTime.Equal(pendingJob.InsertedAt), pendingJob)
+			require.True(t, pendingJob.LastAttempt.After(dbQueueTime), pendingJob)
+
+			// and the node is still contained
+			node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID())
+			require.NoError(t, err)
+			require.True(t, node.Contained)
+
+			// wait a bit
+			sync2.Sleep(ctx, callInterval)
+			require.NoError(t, ctx.Err())
+		}
+	})
+}
diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go
index 02aef4726..f81aca6ca 100644
--- a/satellite/audit/verifier_test.go
+++ b/satellite/audit/verifier_test.go
@@ -1440,6 +1440,7 @@ func TestConcurrentAuditsTimeout(t *testing.T) {
 		numConcurrentAudits = 10
 		minPieces           = 5
 		slowNodes           = minPieces / 2
+		retryInterval       = 5 * time.Minute
 	)
 
 	testWithChoreAndObserver(t, testplanet.Config{
@@ -1508,6 +1509,11 @@ func TestConcurrentAuditsTimeout(t *testing.T) {
 		err = group.Wait()
 		require.NoError(t, err)
 
+		rq := audits.ReverifyQueue.(interface {
+			audit.ReverifyQueue
+			TestingFudgeUpdateTime(ctx context.Context, pendingAudit *audit.PieceLocator, updateTime time.Time) error
+		})
+
 		for _, report := range reports {
 			require.Len(t, report.Fails, 0)
 			require.Len(t, report.Unknown, 0)
@@ -1517,14 +1523,19 @@ func TestConcurrentAuditsTimeout(t *testing.T) {
 
 			// apply the audit results, as the audit worker would have done
 			audits.Reporter.RecordAudits(ctx, report)
+
+			// fudge the insert time backward by retryInterval so the jobs will be available to GetNextJob
+			for _, pending := range report.PendingAudits {
+				err := rq.TestingFudgeUpdateTime(ctx, &pending.Locator, time.Now().Add(-retryInterval))
+				require.NoError(t, err)
+			}
 		}
 
 		// the slow nodes should have been added to the reverify queue multiple times;
 		// once for each timed-out piece fetch
 		queuedReverifies := make([]*audit.ReverificationJob, 0, numConcurrentAudits*slowNodes)
-		dummyRetryInterval := 5 * time.Minute
 		for {
-			job, err := audits.ReverifyQueue.GetNextJob(ctx, dummyRetryInterval)
+			job, err := audits.ReverifyQueue.GetNextJob(ctx, retryInterval)
 			if err != nil {
 				if audit.ErrEmptyQueue.Has(err) {
 					break
diff --git a/satellite/satellitedb/reverifyqueue.go b/satellite/satellitedb/reverifyqueue.go
index 1551aa58d..ac66f62a3 100644
--- a/satellite/satellitedb/reverifyqueue.go
+++ b/satellite/satellitedb/reverifyqueue.go
@@ -56,7 +56,7 @@ func (rq *reverifyQueue) GetNextJob(ctx context.Context, retryInterval time.Dura
 		WITH next_entry AS (
 			SELECT *
 			FROM reverification_audits
-			WHERE last_attempt IS NULL OR last_attempt < (now() - '1 microsecond'::interval * $1::bigint)
+			WHERE COALESCE(last_attempt, inserted_at) < (now() - '1 microsecond'::interval * $1::bigint)
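+			-- a job becomes eligible once retryInterval has elapsed since its
+			-- last reverification attempt (or since insertion, if never attempted)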
 			ORDER BY inserted_at
 			LIMIT 1
 		)
diff --git a/satellite/satellitedb/reverifyqueue_test.go b/satellite/satellitedb/reverifyqueue_test.go
index 8b25376da..6bd48a857 100644
--- a/satellite/satellitedb/reverifyqueue_test.go
+++ b/satellite/satellitedb/reverifyqueue_test.go
@@ -40,6 +40,9 @@ const (
 func TestReverifyQueue(t *testing.T) {
 	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
 		reverifyQueue := db.ReverifyQueue()
+		reverifyQueueTest := reverifyQueue.(interface {
+			TestingFudgeUpdateTime(ctx context.Context, piece *audit.PieceLocator, updateTime time.Time) error
+		})
 
 		locator1 := randomLocator()
 		locator2 := randomLocator()
@@ -56,6 +59,13 @@ func TestReverifyQueue(t *testing.T) {
 
 		checkGetAllContainedNodes(ctx, t, reverifyQueue, locator1.NodeID, locator2.NodeID)
 
+		// pretend that retryInterval has elapsed for both jobs
+		err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator1, time.Now().Add(-retryInterval))
+		require.NoError(t, err)
+		err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator2, time.Now().Add(-retryInterval))
+		require.NoError(t, err)
+
+		// fetch both jobs from the queue and expect the right contents
 		job1, err := reverifyQueue.GetNextJob(ctx, retryInterval)
 		require.NoError(t, err)
 		require.Equal(t, *locator1, job1.Locator)
@@ -74,9 +84,6 @@ func TestReverifyQueue(t *testing.T) {
 		require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)
 
 		// pretend that ReverifyRetryInterval has elapsed
-		reverifyQueueTest := reverifyQueue.(interface {
-			TestingFudgeUpdateTime(ctx context.Context, piece *audit.PieceLocator, updateTime time.Time) error
-		})
 		err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator1, time.Now().Add(-retryInterval))
 		require.NoError(t, err)