satellite/satellitedb: only suspend node if not already suspended

Whenever the node's reputation is updated, if its unknown audit
reputation is below the suspension threshold, its suspension field
is set to the current time. This could overwrite the previous
"suspendedAt" value, resulting in a node that never reaches the end of
its suspension.

Also log whenever a node is disqualified or its suspension status
changes.

Change-Id: I5e8c8f1c46f66d79cb279b5b16a84fe03f533deb
This commit is contained in:
Cameron Ayer 2020-04-08 18:28:25 -04:00 committed by jens
parent 4e2a101fcc
commit 02613407ae
2 changed files with 51 additions and 7 deletions

View File

@ -142,3 +142,38 @@ func TestRecordAuditsCorrectOutcome(t *testing.T) {
require.Nil(t, node.Suspended)
})
}
// TestSuspensionTimeNotResetBySuccessiveAudit checks that a node which is
// already suspended keeps its original suspension timestamp when a further
// unknown audit outcome is recorded for it.
func TestSuspensionTimeNotResetBySuccessiveAudit(t *testing.T) {
	config := testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
	}

	testplanet.Run(t, config, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]

		// Pause the audit worker loop; the reports below are submitted by hand.
		// NOTE(review): presumably this keeps background audits from touching
		// the node's reputation during the test — confirm.
		sat.Audit.Worker.Loop.Pause()

		nodeID := planet.StorageNodes[0].ID()
		cache := sat.Overlay.Service

		// recordUnknownAudit submits a report with a single unknown outcome
		// for the node and requires that it is recorded without failures.
		recordUnknownAudit := func() {
			failed, err := sat.Audit.Reporter.RecordAudits(ctx, audit.Report{Unknown: []storj.NodeID{nodeID}}, "")
			require.NoError(t, err)
			require.Zero(t, failed)
		}

		// After the first unknown audit the node must be suspended but not disqualified.
		recordUnknownAudit()
		first, err := cache.Get(ctx, nodeID)
		require.NoError(t, err)
		require.Nil(t, first.Disqualified)
		require.NotNil(t, first.Suspended)
		initialSuspension := first.Suspended

		// A second unknown audit must leave the suspension timestamp untouched.
		recordUnknownAudit()
		second, err := cache.Get(ctx, nodeID)
		require.NoError(t, err)
		require.Nil(t, second.Disqualified)
		require.NotNil(t, second.Suspended)
		require.Equal(t, initialSuspension, second.Suspended)
	})
}

View File

@ -14,6 +14,7 @@ import (
"github.com/lib/pq"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
@ -340,7 +341,8 @@ func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests
continue
}
updateNodeStats := populateUpdateNodeStats(dbNode, updateReq)
updateNodeStats := cache.populateUpdateNodeStats(dbNode, updateReq)
sql := buildUpdateStatement(updateNodeStats)
allSQL += sql
@ -403,7 +405,7 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
return nil
}
updateFields := populateUpdateFields(dbNode, updateReq)
updateFields := cache.populateUpdateFields(dbNode, updateReq)
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
@ -1086,7 +1088,7 @@ type updateNodeStats struct {
Contained boolField
}
func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) updateNodeStats {
func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) updateNodeStats {
// there are three audit outcomes: success, failure, and unknown
// if a node fails enough audits, it gets disqualified
// if a node gets enough "unknown" audits, it gets put into suspension
@ -1164,15 +1166,22 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
auditRep := auditAlpha / (auditAlpha + auditBeta)
if auditRep <= updateReq.AuditDQ {
cache.db.log.Info("Disqualified", zap.String("Node ID", updateReq.NodeID.String()))
updateFields.Disqualified = timeField{set: true, value: time.Now().UTC()}
}
// if unknown audit rep goes below threshold, suspend node. Otherwise unsuspend node.
unknownAuditRep := unknownAuditAlpha / (unknownAuditAlpha + unknownAuditBeta)
if unknownAuditRep <= updateReq.AuditDQ {
updateFields.Suspended = timeField{set: true, value: time.Now().UTC()}
if dbNode.Suspended == nil {
cache.db.log.Info("Suspended", zap.String("Node ID", updateFields.NodeID.String()), zap.String("Category", "Unknown Audits"))
updateFields.Suspended = timeField{set: true, value: time.Now().UTC()}
}
} else {
updateFields.Suspended = timeField{set: true, isNil: true}
if dbNode.Suspended != nil {
cache.db.log.Info("Suspension lifted", zap.String("Category", "Unknown Audits"), zap.String("Node ID", updateFields.NodeID.String()))
updateFields.Suspended = timeField{set: true, isNil: true}
}
}
// TODO if node has been suspended for longer than threshold, and audit outcome is failure or unknown, disqualify node.
@ -1194,9 +1203,9 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
return updateFields
}
func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) dbx.Node_Update_Fields {
func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) dbx.Node_Update_Fields {
update := populateUpdateNodeStats(dbNode, updateReq)
update := cache.populateUpdateNodeStats(dbNode, updateReq)
updateFields := dbx.Node_Update_Fields{}
if update.TotalAuditCount.set {
updateFields.TotalAuditCount = dbx.Node_TotalAuditCount(update.TotalAuditCount.value)