satellite/audit: fix containment bug where nodes not removed

When a node gets enough timeouts, it is supposed to be removed from
pending_audits and get an audit failure. We would give them a failure,
but we missed the removal. This change fixes it.

Change-Id: I2f7014e28d7d9b01a9d051f5bbb4f67c86c7b36b
parent 70296c5050
commit 5a1a29a62e
@@ -202,6 +202,11 @@ func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits
 		if err != nil {
 			errlist.Add(err)
 			failed = append(failed, pendingAudit)
-		}
+		} else {
+			_, err = reporter.containment.Delete(ctx, pendingAudit.NodeID)
+			if err != nil && !ErrContainedNotFound.Has(err) {
+				errlist.Add(err)
+			}
+		}
 	}
 }
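The fix is small: the failure path already existed, and the new else branch now also deletes the node from pending_audits once the failure has been recorded, tolerating a concurrent not-found. As a toy illustration only (containment, errContainedNotFound, and recordFailure below are hypothetical stand-ins, not the satellite's types), the repaired flow looks like this:

// Toy model of the fixed reporter path (hypothetical names, not satellite code).
package main

import (
	"errors"
	"fmt"
)

var errContainedNotFound = errors.New("pending audit not found")

// containment maps nodeID -> contained; a stand-in for the containment DB.
type containment map[string]bool

func (c containment) remove(nodeID string) error {
	if !c[nodeID] {
		return errContainedNotFound
	}
	delete(c, nodeID)
	return nil
}

// recordFailure mirrors the new else branch: after marking the audit failed,
// remove the node from containment, ignoring a not-found error.
func recordFailure(c containment, nodeID string) error {
	fmt.Println("audit failure recorded for", nodeID)
	if err := c.remove(nodeID); err != nil && !errors.Is(err, errContainedNotFound) {
		return err // the real code accumulates this on an errlist instead
	}
	return nil
}

func main() {
	c := containment{"slow-node": true}
	_ = recordFailure(c, "slow-node")
	fmt.Println("still contained?", c["slow-node"]) // false after the fix
}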
@@ -1180,3 +1180,123 @@ func TestReverifyUnknownError(t *testing.T) {
 		require.True(t, audit.ErrContainedNotFound.Has(err))
 	})
 }
+
+func TestMaxReverifyCount(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
+				return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
+			},
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
+					config.Audit.MinBytesPerSecond = 100 * memory.KiB
+					config.Audit.MinDownloadTimeout = 1 * time.Second
+				},
+				testplanet.ReconfigureRS(2, 2, 4, 4),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		satellite := planet.Satellites[0]
+		audits := satellite.Audit
+
+		audits.Worker.Loop.Pause()
+		audits.Chore.Loop.Pause()
+
+		ul := planet.Uplinks[0]
+		testData := testrand.Bytes(8 * memory.KiB)
+		err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		audits.Chore.Loop.TriggerWait()
+		queue := audits.Queues.Fetch()
+		queueSegment, err := queue.Next()
+		require.NoError(t, err)
+
+		segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
+			StreamID: queueSegment.StreamID,
+			Position: queueSegment.Position,
+		})
+		require.NoError(t, err)
+
+		slowPiece := segment.Pieces[0]
+		slowNode := slowPiece.StorageNode
+
+		randomIndex, err := audit.GetRandomStripe(ctx, segment)
+		require.NoError(t, err)
+
+		orders := satellite.Orders.Service
+		containment := satellite.DB.Containment()
+
+		shareSize := segment.Redundancy.ShareSize
+		rootPieceID := segment.RootPieceID
+
+		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, slowNode, slowPiece.Number, rootPieceID, shareSize)
+		require.NoError(t, err)
+
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(slowPiece.Number))
+		require.NoError(t, err)
+
+		pending := &audit.PendingAudit{
+			NodeID:            slowNode,
+			PieceID:           rootPieceID,
+			StripeIndex:       randomIndex,
+			ShareSize:         shareSize,
+			ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
+			ReverifyCount:     0,
+			StreamID:          queueSegment.StreamID,
+			Position:          queueSegment.Position,
+		}
+
+		err = containment.IncrementPending(ctx, pending)
+		require.NoError(t, err)
+
+		node := planet.FindNode(slowNode)
+		slowNodeDB := node.DB.(*testblobs.SlowDB)
+		// make downloads on storage node slower than the timeout on the satellite for downloading shares
+		delay := 1 * time.Second
+		slowNodeDB.SetLatency(delay)
+
+		oldRep, err := satellite.Reputation.Service.Get(ctx, slowNode)
+		require.NoError(t, err)
+
+		// give node enough timeouts to reach max
+		for i := 0; i < planet.Satellites[0].Config.Audit.MaxReverifyCount; i++ {
+			report, err := audits.Verifier.Reverify(ctx, queueSegment)
+			require.NoError(t, err)
+			require.Len(t, report.Successes, 0)
+			require.Len(t, report.Fails, 0)
+			require.Len(t, report.Offlines, 0)
+			require.Len(t, report.PendingAudits, 1)
+			require.Len(t, report.Unknown, 0)
+			require.Equal(t, report.PendingAudits[0].NodeID, slowNode)
+
+			_, err = audits.Reporter.RecordAudits(ctx, report)
+			require.NoError(t, err)
+
+			_, err = containment.Get(ctx, slowNode)
+			require.NoError(t, err)
+		}
+
+		// final timeout should trigger failure and removal from containment
+		report, err := audits.Verifier.Reverify(ctx, queueSegment)
+		require.NoError(t, err)
+		require.Len(t, report.Successes, 0)
+		require.Len(t, report.Fails, 0)
+		require.Len(t, report.Offlines, 0)
+		require.Len(t, report.PendingAudits, 1)
+		require.Len(t, report.Unknown, 0)
+		require.Equal(t, report.PendingAudits[0].NodeID, slowNode)
+
+		_, err = audits.Reporter.RecordAudits(ctx, report)
+		require.NoError(t, err)
+
+		_, err = containment.Get(ctx, slowNode)
+		require.True(t, audit.ErrContainedNotFound.Has(err))
+
+		newRep, err := satellite.Reputation.Service.Get(ctx, slowNode)
+		require.NoError(t, err)
+		require.Less(t, oldRep.AuditReputationBeta, newRep.AuditReputationBeta)
+	})
+}
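The final assertion verifies that the recorded failure also moved the node's reputation. As a hedged aside, AuditReputationBeta typically follows an alpha/beta update of the shape sketched below; the lambda and w constants are illustrative assumptions, not values taken from this change:

package main

import "fmt"

// updateAuditReputation is a hedged sketch of an alpha/beta reputation
// update; lambda (memory) and w (weight) are assumed example constants.
func updateAuditReputation(alpha, beta float64, success bool) (float64, float64) {
	const lambda, w = 0.95, 1.0
	v := -1.0 // audit failure
	if success {
		v = 1.0
	}
	alpha = lambda*alpha + w*(1+v)/2 // rewarded on success
	beta = lambda*beta + w*(1-v)/2   // penalized on failure, as the test expects
	return alpha, beta
}

func main() {
	alpha, beta := 1.0, 0.0
	alpha, beta = updateAuditReputation(alpha, beta, false) // the final timeout
	fmt.Printf("alpha=%.3f beta=%.3f\n", alpha, beta)       // beta increased
}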