satellite/{overlay,satellitedb}: always show node's real online score
Previously if a node did not have audit history data for each of the windows over the tracking period, we would give them the benefit of the doubt and set their score to 1. This was to prevent nodes from being suspended right out the gate. We need a minimum amount of data to evaluate them. However, a node who is actually failing at being online will have no idea until they have received enough audits and we suspend them. Instead, we will always use their real score, but use a flag to determine whether they are eligible for suspension/dq. Change-Id: I382218f12e8770f95d4bcddcf101ef348940cadf
This commit is contained in:
parent
0d7ae8b182
commit
b39a99bae6
@ -67,8 +67,8 @@ type DB interface {
|
||||
// UpdateCheckIn updates a single storagenode's check-in stats.
|
||||
UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time, config NodeSelectionConfig) (err error)
|
||||
|
||||
// UpdateAuditHistory updates a node's audit history with an online or offline audit and returns the online score for the tracking period.
|
||||
UpdateAuditHistory(ctx context.Context, nodeID storj.NodeID, auditTime time.Time, online bool, config AuditHistoryConfig) (onlineScore float64, err error)
|
||||
// UpdateAuditHistory updates a node's audit history with an online or offline audit.
|
||||
UpdateAuditHistory(ctx context.Context, nodeID storj.NodeID, auditTime time.Time, online bool, config AuditHistoryConfig) (auditHistory *pb.AuditHistory, err error)
|
||||
|
||||
// AllPieceCounts returns a map of node IDs to piece counts from the db.
|
||||
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int, err error)
|
||||
|
@ -340,8 +340,56 @@ func TestOfflineSuspend(t *testing.T) {
|
||||
require.Nil(t, node.Disqualified)
|
||||
require.EqualValues(t, 1, node.Reputation.OnlineScore)
|
||||
|
||||
updateReq := &overlay.UpdateRequest{
|
||||
NodeID: nodeID,
|
||||
AuditOutcome: overlay.AuditSuccess,
|
||||
IsUp: false,
|
||||
AuditHistory: overlay.AuditHistoryConfig{
|
||||
WindowSize: time.Hour,
|
||||
TrackingPeriod: 2 * time.Hour,
|
||||
GracePeriod: time.Hour,
|
||||
OfflineThreshold: 0.6,
|
||||
},
|
||||
|
||||
AuditLambda: 0.95,
|
||||
AuditWeight: 1,
|
||||
AuditDQ: 0.6,
|
||||
SuspensionGracePeriod: time.Hour,
|
||||
SuspensionDQEnabled: true,
|
||||
AuditsRequiredForVetting: 0,
|
||||
UptimesRequiredForVetting: 0,
|
||||
}
|
||||
|
||||
// give node an offline audit
|
||||
// node's score is still 1 until its first window is complete
|
||||
nextWindowTime := time.Now()
|
||||
nextWindowTime, err = setOnlineScore(ctx, nodeID, 0.5, time.Hour, nextWindowTime, oc)
|
||||
_, err = oc.UpdateStats(ctx, updateReq, nextWindowTime)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err = oc.Get(ctx, nodeID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, node.OfflineSuspended)
|
||||
require.Nil(t, node.OfflineUnderReview)
|
||||
require.Nil(t, node.Disqualified)
|
||||
require.EqualValues(t, 1, node.Reputation.OnlineScore)
|
||||
|
||||
nextWindowTime = nextWindowTime.Add(updateReq.AuditHistory.WindowSize)
|
||||
|
||||
// node now has one full window, so its score should be 0
|
||||
// should not be suspended or DQ since it only has 1 window out of 2 for tracking period
|
||||
_, err = oc.UpdateStats(ctx, updateReq, nextWindowTime)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err = oc.Get(ctx, nodeID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, node.OfflineSuspended)
|
||||
require.Nil(t, node.OfflineUnderReview)
|
||||
require.Nil(t, node.Disqualified)
|
||||
require.EqualValues(t, 0, node.Reputation.OnlineScore)
|
||||
|
||||
nextWindowTime = nextWindowTime.Add(updateReq.AuditHistory.WindowSize)
|
||||
|
||||
nextWindowTime, err = setOnlineScore(ctx, updateReq, 0.5, time.Hour, nextWindowTime, oc)
|
||||
require.NoError(t, err)
|
||||
|
||||
// node should be offline suspended and under review
|
||||
@ -353,7 +401,7 @@ func TestOfflineSuspend(t *testing.T) {
|
||||
require.EqualValues(t, 0.5, node.Reputation.OnlineScore)
|
||||
|
||||
// set online score to be good, but use a long grace period so that node remains under review
|
||||
nextWindowTime, err = setOnlineScore(ctx, nodeID, 1, 100*time.Hour, nextWindowTime, oc)
|
||||
nextWindowTime, err = setOnlineScore(ctx, updateReq, 1, 100*time.Hour, nextWindowTime, oc)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err = oc.Get(ctx, nodeID)
|
||||
@ -365,7 +413,7 @@ func TestOfflineSuspend(t *testing.T) {
|
||||
require.EqualValues(t, 1, node.Reputation.OnlineScore)
|
||||
|
||||
// suspend again, under review should be the same
|
||||
nextWindowTime, err = setOnlineScore(ctx, nodeID, 0.5, 100*time.Hour, nextWindowTime, oc)
|
||||
nextWindowTime, err = setOnlineScore(ctx, updateReq, 0.5, 100*time.Hour, nextWindowTime, oc)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err = oc.Get(ctx, nodeID)
|
||||
@ -379,7 +427,7 @@ func TestOfflineSuspend(t *testing.T) {
|
||||
// node will exit review after grace period + 1 tracking window, so set grace period to be time since put under review
|
||||
// subtract one hour so that review window ends when setOnlineScore adds the last window
|
||||
gracePeriod := nextWindowTime.Sub(*node.OfflineUnderReview) - time.Hour
|
||||
nextWindowTime, err = setOnlineScore(ctx, nodeID, 1, gracePeriod, nextWindowTime, oc)
|
||||
nextWindowTime, err = setOnlineScore(ctx, updateReq, 1, gracePeriod, nextWindowTime, oc)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err = oc.Get(ctx, nodeID)
|
||||
@ -390,7 +438,7 @@ func TestOfflineSuspend(t *testing.T) {
|
||||
require.EqualValues(t, 1, node.Reputation.OnlineScore)
|
||||
|
||||
// put into suspension and under review again
|
||||
nextWindowTime, err = setOnlineScore(ctx, nodeID, 0.5, 100*time.Hour, nextWindowTime, oc)
|
||||
nextWindowTime, err = setOnlineScore(ctx, updateReq, 0.5, 100*time.Hour, nextWindowTime, oc)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err = oc.Get(ctx, nodeID)
|
||||
@ -401,7 +449,7 @@ func TestOfflineSuspend(t *testing.T) {
|
||||
require.EqualValues(t, 0.5, node.Reputation.OnlineScore)
|
||||
|
||||
// if grace period + 1 tracking window passes and online score is still bad, expect node to be DQed
|
||||
nextWindowTime, err = setOnlineScore(ctx, nodeID, 0.5, 0, nextWindowTime, oc)
|
||||
nextWindowTime, err = setOnlineScore(ctx, updateReq, 0.5, 0, nextWindowTime, oc)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err = oc.Get(ctx, nodeID)
|
||||
@ -420,15 +468,13 @@ func TestOfflineSuspend(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func setOnlineScore(ctx context.Context, id storj.NodeID, desiredScore float64, gracePeriod time.Duration, startTime time.Time, oc overlay.DB) (nextWindowTime time.Time, err error) {
|
||||
func setOnlineScore(ctx context.Context, reqPtr *overlay.UpdateRequest, desiredScore float64, gracePeriod time.Duration, startTime time.Time, oc overlay.DB) (nextWindowTime time.Time, err error) {
|
||||
// for our tests, we are only using values of 1 and 0.5, so two audits per window is sufficient
|
||||
totalAudits := 2
|
||||
onlineAudits := int(float64(totalAudits) * desiredScore)
|
||||
nextWindowTime = startTime
|
||||
|
||||
windowSize := time.Hour
|
||||
trackingPeriod := 2 * time.Hour
|
||||
windowsPerTrackingPeriod := 2
|
||||
windowsPerTrackingPeriod := int(reqPtr.AuditHistory.TrackingPeriod.Seconds() / reqPtr.AuditHistory.WindowSize.Seconds())
|
||||
for window := 0; window < windowsPerTrackingPeriod+1; window++ {
|
||||
updateReqs := []*overlay.UpdateRequest{}
|
||||
for i := 0; i < totalAudits; i++ {
|
||||
@ -436,34 +482,18 @@ func setOnlineScore(ctx context.Context, id storj.NodeID, desiredScore float64,
|
||||
if i >= onlineAudits {
|
||||
isUp = false
|
||||
}
|
||||
updateReq := &overlay.UpdateRequest{
|
||||
NodeID: id,
|
||||
AuditOutcome: overlay.AuditSuccess,
|
||||
IsUp: isUp,
|
||||
AuditHistory: overlay.AuditHistoryConfig{
|
||||
WindowSize: windowSize,
|
||||
TrackingPeriod: trackingPeriod,
|
||||
GracePeriod: gracePeriod,
|
||||
OfflineThreshold: 0.6,
|
||||
},
|
||||
updateReq := *reqPtr
|
||||
updateReq.IsUp = isUp
|
||||
updateReq.AuditHistory.GracePeriod = gracePeriod
|
||||
|
||||
// default values
|
||||
AuditLambda: 0.95,
|
||||
AuditWeight: 1,
|
||||
AuditDQ: 0.6,
|
||||
SuspensionGracePeriod: time.Hour,
|
||||
SuspensionDQEnabled: true,
|
||||
AuditsRequiredForVetting: 0,
|
||||
UptimesRequiredForVetting: 0,
|
||||
}
|
||||
updateReqs = append(updateReqs, updateReq)
|
||||
updateReqs = append(updateReqs, &updateReq)
|
||||
}
|
||||
_, err = oc.BatchUpdateStats(ctx, updateReqs, 100, nextWindowTime)
|
||||
if err != nil {
|
||||
return nextWindowTime, err
|
||||
}
|
||||
// increment nextWindowTime so in the next iteration, we are adding to a different window
|
||||
nextWindowTime = nextWindowTime.Add(time.Hour)
|
||||
nextWindowTime = nextWindowTime.Add(reqPtr.AuditHistory.WindowSize)
|
||||
}
|
||||
return nextWindowTime, err
|
||||
}
|
||||
|
@ -16,21 +16,7 @@ import (
|
||||
"storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
type auditHistory pb.AuditHistory
|
||||
|
||||
// auditHistoryFromBytes deserializes a byte array to get an auditHistory struct.
|
||||
func auditHistoryFromBytes(b []byte) (*auditHistory, error) {
|
||||
a := &pb.AuditHistory{}
|
||||
err := pb.Unmarshal(b, a)
|
||||
return (*auditHistory)(a), err
|
||||
}
|
||||
|
||||
// bytes serializes an auditHistory struct into a byte slice.
|
||||
func (a *auditHistory) bytes() ([]byte, error) {
|
||||
return pb.Marshal((*pb.AuditHistory)(a))
|
||||
}
|
||||
|
||||
func (a *auditHistory) addAudit(auditTime time.Time, online bool, config overlay.AuditHistoryConfig) error {
|
||||
func addAudit(a *pb.AuditHistory, auditTime time.Time, online bool, config overlay.AuditHistoryConfig) error {
|
||||
newAuditWindowStartTime := auditTime.Truncate(config.WindowSize)
|
||||
earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod)
|
||||
// windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed.
|
||||
@ -66,15 +52,13 @@ func (a *auditHistory) addAudit(auditTime time.Time, online bool, config overlay
|
||||
}
|
||||
a.Windows[latestIndex].TotalCount++
|
||||
|
||||
// if we do not have enough completed windows to fill a tracking period (exclude latest window), score should be 1
|
||||
windowsPerTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
|
||||
if len(a.Windows)-1 < windowsPerTrackingPeriod {
|
||||
a.Score = 1
|
||||
// if no windows were added or removed, score does not change
|
||||
if !windowsModified {
|
||||
return nil
|
||||
}
|
||||
|
||||
// if no windows were added or removed, score does not change
|
||||
if !windowsModified {
|
||||
if len(a.Windows) <= 1 {
|
||||
a.Score = 1
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -87,32 +71,29 @@ func (a *auditHistory) addAudit(auditTime time.Time, online bool, config overlay
|
||||
totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount)
|
||||
}
|
||||
|
||||
if len(a.Windows) <= 1 {
|
||||
return Error.New("not enough windows to calculate score; this should not happen")
|
||||
}
|
||||
// divide by number of windows-1 because last window is not included
|
||||
a.Score = totalWindowScores / float64(len(a.Windows)-1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAuditHistory updates a node's audit history with an online or offline audit and returns the online score for the tracking period.
|
||||
func (cache *overlaycache) UpdateAuditHistory(ctx context.Context, nodeID storj.NodeID, auditTime time.Time, online bool, config overlay.AuditHistoryConfig) (onlineScore float64, err error) {
|
||||
// UpdateAuditHistory updates a node's audit history with an online or offline audit.
|
||||
func (cache *overlaycache) UpdateAuditHistory(ctx context.Context, nodeID storj.NodeID, auditTime time.Time, online bool, config overlay.AuditHistoryConfig) (history *pb.AuditHistory, err error) {
|
||||
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
|
||||
_, err = tx.Tx.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
onlineScore, err = cache.updateAuditHistoryWithTx(ctx, tx, nodeID, auditTime, online, config)
|
||||
history, err = cache.updateAuditHistoryWithTx(ctx, tx, nodeID, auditTime, online, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return onlineScore, err
|
||||
return history, err
|
||||
}
|
||||
|
||||
func (cache *overlaycache) updateAuditHistoryWithTx(ctx context.Context, tx *dbx.Tx, nodeID storj.NodeID, auditTime time.Time, online bool, config overlay.AuditHistoryConfig) (onlineScore float64, err error) {
|
||||
func (cache *overlaycache) updateAuditHistoryWithTx(ctx context.Context, tx *dbx.Tx, nodeID storj.NodeID, auditTime time.Time, online bool, config overlay.AuditHistoryConfig) (*pb.AuditHistory, error) {
|
||||
// get and deserialize node audit history
|
||||
historyBytes := []byte{}
|
||||
newEntry := false
|
||||
@ -124,24 +105,25 @@ func (cache *overlaycache) updateAuditHistoryWithTx(ctx context.Context, tx *dbx
|
||||
// set flag to true so we know to create rather than update later
|
||||
newEntry = true
|
||||
} else if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
return nil, Error.Wrap(err)
|
||||
} else {
|
||||
historyBytes = dbAuditHistory.History
|
||||
}
|
||||
|
||||
history, err := auditHistoryFromBytes(historyBytes)
|
||||
history := &pb.AuditHistory{}
|
||||
err = pb.Unmarshal(historyBytes, history)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return history, err
|
||||
}
|
||||
|
||||
err = history.addAudit(auditTime, online, config)
|
||||
err = addAudit(history, auditTime, online, config)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return history, err
|
||||
}
|
||||
|
||||
historyBytes, err = history.bytes()
|
||||
historyBytes, err = pb.Marshal(history)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return history, err
|
||||
}
|
||||
|
||||
// if the entry did not exist at the beginning, create a new one. Otherwise update
|
||||
@ -151,7 +133,7 @@ func (cache *overlaycache) updateAuditHistoryWithTx(ctx context.Context, tx *dbx
|
||||
dbx.AuditHistory_NodeId(nodeID.Bytes()),
|
||||
dbx.AuditHistory_History(historyBytes),
|
||||
)
|
||||
return history.Score, Error.Wrap(err)
|
||||
return history, Error.Wrap(err)
|
||||
}
|
||||
|
||||
_, err = tx.Update_AuditHistory_By_NodeId(
|
||||
@ -162,5 +144,5 @@ func (cache *overlaycache) updateAuditHistoryWithTx(ctx context.Context, tx *dbx
|
||||
},
|
||||
)
|
||||
|
||||
return history.Score, Error.Wrap(err)
|
||||
return history, Error.Wrap(err)
|
||||
}
|
||||
|
@ -32,34 +32,43 @@ func TestAuditHistoryBasic(t *testing.T) {
|
||||
startingWindow := time.Now().Truncate(time.Hour)
|
||||
windowsInTrackingPeriod := int(auditHistoryConfig.TrackingPeriod.Seconds() / auditHistoryConfig.WindowSize.Seconds())
|
||||
currentWindow := startingWindow
|
||||
// we need windowsInTrackingPeriod+1 windows before we will see scores besides "1"
|
||||
// add enough windows to fill the tracking period, each with 1 online and 1 offline audit
|
||||
for i := 0; i < windowsInTrackingPeriod; i++ {
|
||||
score, err := cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(2*time.Minute), false, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, score)
|
||||
|
||||
score, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(20*time.Minute), true, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, score)
|
||||
// online score should be 1 until the first window is finished
|
||||
history, err := cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(2*time.Minute), false, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, history.GetScore())
|
||||
|
||||
// move to next window
|
||||
currentWindow = currentWindow.Add(time.Hour)
|
||||
}
|
||||
history, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(20*time.Minute), true, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, history.GetScore())
|
||||
|
||||
// move to next window
|
||||
currentWindow = currentWindow.Add(time.Hour)
|
||||
|
||||
// online score should be now be 0.5 since the first window is complete with one online audit and one offline audit
|
||||
history, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(2*time.Minute), false, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 0.5, history.GetScore())
|
||||
|
||||
history, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(20*time.Minute), true, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 0.5, history.GetScore())
|
||||
|
||||
// move to next window
|
||||
currentWindow = currentWindow.Add(time.Hour)
|
||||
|
||||
// try to add an audit for an old window, expect error
|
||||
_, err := cache.UpdateAuditHistory(ctx, node.ID(), startingWindow, true, auditHistoryConfig)
|
||||
_, err = cache.UpdateAuditHistory(ctx, node.ID(), startingWindow, true, auditHistoryConfig)
|
||||
require.Error(t, err)
|
||||
|
||||
// Add online audit for next window. Score should now be 0.5, since we have a tracking period full
|
||||
// of completed windows, each with 50% online audits.
|
||||
score, err := cache.UpdateAuditHistory(ctx, node.ID(), currentWindow, true, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 0.5, score)
|
||||
// add another online audit for the latest window; score should still be 0.5
|
||||
score, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(45*time.Minute), true, auditHistoryConfig)
|
||||
history, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow, true, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 0.5, score)
|
||||
require.EqualValues(t, 0.5, history.GetScore())
|
||||
// add another online audit for the latest window; score should still be 0.5
|
||||
history, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(45*time.Minute), true, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 0.5, history.GetScore())
|
||||
|
||||
currentWindow = currentWindow.Add(time.Hour)
|
||||
// in the current state, there are windowsInTrackingPeriod windows with a score of 0.5
|
||||
@ -67,8 +76,8 @@ func TestAuditHistoryBasic(t *testing.T) {
|
||||
// window gets included in the tracking period, and the earliest 0.5 window gets dropped.
|
||||
expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod)
|
||||
// add online audit for next window; score should now be expectedScore
|
||||
score, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(time.Minute), true, auditHistoryConfig)
|
||||
history, err = cache.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(time.Minute), true, auditHistoryConfig)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, expectedScore, score)
|
||||
require.EqualValues(t, expectedScore, history.GetScore())
|
||||
})
|
||||
}
|
||||
|
@ -365,13 +365,13 @@ func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests
|
||||
continue
|
||||
}
|
||||
|
||||
onlineScore, err := cache.updateAuditHistoryWithTx(ctx, tx, updateReq.NodeID, now, updateReq.IsUp, updateReq.AuditHistory)
|
||||
auditHistory, err := cache.updateAuditHistoryWithTx(ctx, tx, updateReq.NodeID, now, updateReq.IsUp, updateReq.AuditHistory)
|
||||
if err != nil {
|
||||
doAppendAll = false
|
||||
return err
|
||||
}
|
||||
|
||||
updateNodeStats := cache.populateUpdateNodeStats(dbNode, updateReq, onlineScore, now)
|
||||
updateNodeStats := cache.populateUpdateNodeStats(dbNode, updateReq, auditHistory, now)
|
||||
|
||||
sql := buildUpdateStatement(updateNodeStats)
|
||||
|
||||
@ -443,12 +443,12 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
|
||||
return nil
|
||||
}
|
||||
|
||||
onlineScore, err := cache.updateAuditHistoryWithTx(ctx, tx, updateReq.NodeID, now, updateReq.IsUp, updateReq.AuditHistory)
|
||||
auditHistory, err := cache.updateAuditHistoryWithTx(ctx, tx, updateReq.NodeID, now, updateReq.IsUp, updateReq.AuditHistory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
updateFields := cache.populateUpdateFields(dbNode, updateReq, onlineScore, now)
|
||||
updateFields := cache.populateUpdateFields(dbNode, updateReq, auditHistory, now)
|
||||
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1191,7 +1191,7 @@ type updateNodeStats struct {
|
||||
OnlineScore float64Field
|
||||
}
|
||||
|
||||
func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest, auditOnlineScore float64, now time.Time) updateNodeStats {
|
||||
func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest, auditHistory *pb.AuditHistory, now time.Time) updateNodeStats {
|
||||
// there are three audit outcomes: success, failure, and unknown
|
||||
// if a node fails enough audits, it gets disqualified
|
||||
// if a node gets enough "unknown" audits, it gets put into suspension
|
||||
@ -1202,6 +1202,7 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *
|
||||
unknownAuditBeta := dbNode.UnknownAuditReputationBeta
|
||||
totalAuditCount := dbNode.TotalAuditCount
|
||||
vettedAt := dbNode.VettedAt
|
||||
auditOnlineScore := auditHistory.Score
|
||||
|
||||
var updatedTotalAuditCount int64
|
||||
|
||||
@ -1324,15 +1325,24 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *
|
||||
// Updating node stats always exits it from containment mode
|
||||
updateFields.Contained = boolField{set: true, value: false}
|
||||
|
||||
windowsPerTrackingPeriod := int(updateReq.AuditHistory.TrackingPeriod.Seconds() / updateReq.AuditHistory.WindowSize.Seconds())
|
||||
|
||||
// only penalize node if online score is below threshold and
|
||||
// if it has enough completed windows to fill a tracking period
|
||||
penalizeOfflineNode := false
|
||||
if auditOnlineScore < updateReq.AuditHistory.OfflineThreshold && len(auditHistory.Windows)-1 >= windowsPerTrackingPeriod {
|
||||
penalizeOfflineNode = true
|
||||
}
|
||||
|
||||
// always update online score
|
||||
updateFields.OnlineScore = float64Field{set: true, value: auditOnlineScore}
|
||||
|
||||
// Suspension and disqualification for offline nodes
|
||||
goodOnlineScore := auditOnlineScore >= updateReq.AuditHistory.OfflineThreshold
|
||||
if dbNode.UnderReview != nil {
|
||||
// move node in and out of suspension as needed during review period
|
||||
if goodOnlineScore && dbNode.OfflineSuspended != nil {
|
||||
if !penalizeOfflineNode && dbNode.OfflineSuspended != nil {
|
||||
updateFields.OfflineSuspended = timeField{set: true, isNil: true}
|
||||
} else if !goodOnlineScore && dbNode.OfflineSuspended == nil {
|
||||
} else if penalizeOfflineNode && dbNode.OfflineSuspended == nil {
|
||||
updateFields.OfflineSuspended = timeField{set: true, value: now}
|
||||
}
|
||||
|
||||
@ -1344,7 +1354,7 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *
|
||||
// otherwise, disqualify node
|
||||
// TODO until disqualification is enabled, nodes will remain under review if their score is passed after the grace+tracking period
|
||||
if trackingPeriodPassed {
|
||||
if !goodOnlineScore {
|
||||
if penalizeOfflineNode {
|
||||
// TODO enable disqualification
|
||||
/*
|
||||
cache.db.log.Info("Disqualified", zap.String("DQ type", "node offline"), zap.String("Node ID", updateReq.NodeID.String()))
|
||||
@ -1356,7 +1366,7 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *
|
||||
updateFields.OfflineSuspended = timeField{set: true, isNil: true}
|
||||
}
|
||||
}
|
||||
} else if !goodOnlineScore {
|
||||
} else if penalizeOfflineNode {
|
||||
// suspend node for being offline and begin review period
|
||||
updateFields.OfflineUnderReview = timeField{set: true, value: now}
|
||||
updateFields.OfflineSuspended = timeField{set: true, value: now}
|
||||
@ -1365,9 +1375,9 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *
|
||||
return updateFields
|
||||
}
|
||||
|
||||
func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest, auditOnlineScore float64, now time.Time) dbx.Node_Update_Fields {
|
||||
func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest, auditHistory *pb.AuditHistory, now time.Time) dbx.Node_Update_Fields {
|
||||
|
||||
update := cache.populateUpdateNodeStats(dbNode, updateReq, auditOnlineScore, now)
|
||||
update := cache.populateUpdateNodeStats(dbNode, updateReq, auditHistory, now)
|
||||
updateFields := dbx.Node_Update_Fields{}
|
||||
if update.VettedAt.set {
|
||||
updateFields.VettedAt = dbx.Node_VettedAt(update.VettedAt.value)
|
||||
|
Loading…
Reference in New Issue
Block a user