satellite/reputation: new ApplyUpdates() method

The ApplyUpdates() method on the reputation.DB interface acts like the
similar Update() method, but can allow for applying the changes from
multiple audit events, instead of only one.

This will be necessary for the reputation write cache, which will batch
up changes to each node's reputation in order to flush them
periodically.

Refs: https://github.com/storj/storj/issues/4601

Change-Id: I44cc47767ea2d9423166bb8fed080c8a11182041
This commit is contained in:
paul cannon 2022-05-07 12:43:32 -05:00 committed by Storj Robot
parent 62774768d9
commit 737d7c7dfc
9 changed files with 676 additions and 148 deletions

View File

@ -77,9 +77,9 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
require.NoError(t, err)
reputation := calcReputation(reputationInfo)
require.Truef(t, prevReputation >= reputation,
"(%d) expected reputation to remain or decrease (previous >= current): %f >= %f",
iterations, prevReputation, reputation,
require.LessOrEqual(t, reputation, prevReputation,
"(%d) expected reputation to remain or decrease (current <= previous)",
iterations,
)
if reputation <= auditDQCutOff || reputation == prevReputation {
@ -88,7 +88,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
iterations, auditDQCutOff, prevReputation, reputation,
)
require.True(t, time.Since(*reputationInfo.Disqualified) >= 0,
require.GreaterOrEqual(t, time.Since(*reputationInfo.Disqualified), time.Duration(0),
"Disqualified should be in the past",
)
@ -99,7 +99,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
prevReputation = reputation
}
require.True(t, iterations > 1, "the number of iterations must be at least 2")
require.Greater(t, iterations, 1, "the number of iterations must be at least 2")
})
}

View File

@ -94,3 +94,76 @@ func AddAuditToHistory(a *pb.AuditHistory, online bool, auditTime time.Time, con
a.Score = totalWindowScores / float64(len(a.Windows)-1)
return nil
}
// MergeAuditHistories merges two audit histories into one, including all
// windows that are present in either input and summing counts for
// any windows that appear in _both_ inputs. Any windows that are now outside
// the tracking period will be trimmed.
//
// The history parameter will be mutated to include the windows passed as
// addHistory.
//
// Returns true if the number of windows in the new history is the maximum
// possible for the tracking config.
func MergeAuditHistories(history *pb.AuditHistory, addHistory []*pb.AuditWindow, config AuditHistoryConfig) (trackingPeriodFull bool) {
windows := history.Windows
for addIndex, windowIndex := 0, 0; addIndex < len(addHistory); {
switch {
case windowIndex == len(windows):
windows = append(windows, &pb.AuditWindow{
WindowStart: addHistory[addIndex].WindowStart,
})
fallthrough
case windows[windowIndex].WindowStart.Equal(addHistory[addIndex].WindowStart):
windows[windowIndex].TotalCount += addHistory[addIndex].TotalCount
windows[windowIndex].OnlineCount += addHistory[addIndex].OnlineCount
addIndex++
case windows[windowIndex].WindowStart.Before(addHistory[addIndex].WindowStart):
windowIndex++
case windows[windowIndex].WindowStart.After(addHistory[addIndex].WindowStart):
windows = append(windows[:windowIndex+1], windows[windowIndex:]...)
windows[windowIndex] = &pb.AuditWindow{
WindowStart: addHistory[addIndex].WindowStart,
TotalCount: addHistory[addIndex].TotalCount,
OnlineCount: addHistory[addIndex].OnlineCount,
}
addIndex++
}
}
// trim off windows that are too old
if len(windows) > 0 {
cutoffTime := windows[len(windows)-1].WindowStart.Add(-config.TrackingPeriod)
for len(windows) > 0 && windows[0].WindowStart.Before(cutoffTime) {
windows = windows[1:]
}
}
history.Windows = windows
RecalculateScore(history)
windowsPerTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
trackingPeriodFull = len(history.Windows)-1 >= windowsPerTrackingPeriod
return trackingPeriodFull
}
// RecalculateScore calculates and assigns the Score field in a pb.AuditHistory object.
// The score is calculated by averaging the online percentage in each window
// (not including the last).
func RecalculateScore(history *pb.AuditHistory) {
if len(history.Windows) <= 1 {
history.Score = 1
return
}
totalWindowScores := float64(0)
for i, window := range history.Windows {
// do not include last window in score
if i+1 == len(history.Windows) {
break
}
totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount)
}
history.Score = totalWindowScores / float64(len(history.Windows)-1)
}

View File

@ -76,3 +76,226 @@ func TestAddAuditToHistory(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, expectedScore, history.Score)
}
func TestMergeAuditHistoriesWithSingleAudit(t *testing.T) {
config := reputation.AuditHistoryConfig{
WindowSize: time.Hour,
TrackingPeriod: 2 * time.Hour,
GracePeriod: time.Hour,
OfflineThreshold: 0.6,
OfflineDQEnabled: true,
OfflineSuspensionEnabled: true,
}
startingWindow := time.Now().Truncate(time.Hour)
windowsInTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
currentWindow := startingWindow
history := &pb.AuditHistory{}
// online score should be 1 until the first window is finished
trackingPeriodFull := testMergeAuditHistories(history, false, currentWindow.Add(2*time.Minute), config)
require.EqualValues(t, 1, history.Score)
require.False(t, trackingPeriodFull)
trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(20*time.Minute), config)
require.EqualValues(t, 1, history.Score)
require.False(t, trackingPeriodFull)
// move to next window
currentWindow = currentWindow.Add(time.Hour)
// online score should be now be 0.5 since the first window is complete with one online audit and one offline audit
trackingPeriodFull = testMergeAuditHistories(history, false, currentWindow.Add(2*time.Minute), config)
require.EqualValues(t, 0.5, history.Score)
require.False(t, trackingPeriodFull)
trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(20*time.Minute), config)
require.EqualValues(t, 0.5, history.Score)
require.False(t, trackingPeriodFull)
// move to next window
currentWindow = currentWindow.Add(time.Hour)
// add another online audit for the latest window; score should still be 0.5
trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow, config)
require.EqualValues(t, 0.5, history.Score)
// now that we have two full windows other than the current one, tracking period should be considered full.
require.True(t, trackingPeriodFull)
// add another online audit for the latest window; score should still be 0.5
trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(45*time.Minute), config)
require.EqualValues(t, 0.5, history.Score)
require.True(t, trackingPeriodFull)
currentWindow = currentWindow.Add(time.Hour)
// in the current state, there are windowsInTrackingPeriod windows with a score of 0.5
// and one window with a score of 1.0. The Math below calculates the new score when the latest
// window gets included in the tracking period, and the earliest 0.5 window gets dropped.
expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod)
// add online audit for next window; score should now be expectedScore
trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(time.Minute), config)
require.EqualValues(t, expectedScore, history.Score)
require.True(t, trackingPeriodFull)
}
func testMergeAuditHistories(history *pb.AuditHistory, online bool, auditTime time.Time, config reputation.AuditHistoryConfig) bool {
onlineCount := int32(0)
if online {
onlineCount = 1
}
windows := []*pb.AuditWindow{{
WindowStart: auditTime.Truncate(config.WindowSize),
OnlineCount: onlineCount,
TotalCount: 1,
}}
return reputation.MergeAuditHistories(history, windows, config)
}
type hist struct {
online bool
startAt time.Time
}
func TestMergeAuditHistoriesWithMultipleAudits(t *testing.T) {
config := reputation.AuditHistoryConfig{
WindowSize: 10 * time.Minute,
TrackingPeriod: 1 * time.Hour,
}
startTime := time.Now().Truncate(time.Hour).Add(-time.Hour)
t.Parallel()
t.Run("normal-merge", func(t *testing.T) {
history := makeHistory([]hist{
// first window: half online
{true, startTime},
{false, startTime.Add(1 * time.Minute)},
{true, startTime.Add(5 * time.Minute)},
{false, startTime.Add(8 * time.Minute)},
// second window: all online
{true, startTime.Add(10 * time.Minute)},
{true, startTime.Add(11 * time.Minute)},
{true, startTime.Add(20*time.Minute - time.Second)},
// third window: all online
{true, startTime.Add(20 * time.Minute)},
// fourth window: all online
{true, startTime.Add(30 * time.Minute)},
// fifth window; won't be included in score
{false, startTime.Add(40 * time.Minute)},
}, config)
require.Equal(t, float64(0.875), history.Score) // 3.5/4; chosen to be exact in floating point
// make the second, third, and fourth windows go from all-online to half-online
addHistory := makeHistory([]hist{
// fits in second window
{false, startTime.Add(12 * time.Minute)},
{false, startTime.Add(13 * time.Minute)},
{false, startTime.Add(14 * time.Minute)},
// fits in third window
{false, startTime.Add(20*time.Minute + time.Microsecond)},
// fits in fourth window
{false, startTime.Add(40*time.Minute - time.Microsecond)},
}, config)
require.Equal(t, float64(0), addHistory.Score)
periodFull := reputation.MergeAuditHistories(history, addHistory.Windows, config)
require.False(t, periodFull)
require.Equal(t, 5, len(history.Windows))
require.Equal(t, float64(0.5), history.Score) // all windows at 50% online
})
t.Run("trim-old-windows", func(t *testing.T) {
history := makeHistory([]hist{
// this window is too old
{true, startTime.Add(-2 * time.Minute)},
{true, startTime.Add(-1 * time.Minute)},
// oldest window
{false, startTime.Add(0)},
// newest window (not included in score)
{true, startTime.Add(1 * time.Hour)},
}, config)
require.Equal(t, float64(0.5), history.Score) // the too-old window is still included in the score here
addHistory := makeHistory([]hist{
// this window is too old
{true, startTime.Add(-10 * time.Minute)},
// oldest window
{false, startTime.Add(9 * time.Minute)},
// a window entirely not present in the other history
{true, startTime.Add(10 * time.Minute)},
}, config)
require.Equal(t, float64(0.5), addHistory.Score) // the latest window is not included (yet)
periodFull := reputation.MergeAuditHistories(history, addHistory.Windows, config)
require.False(t, periodFull)
require.Equal(t, 3, len(history.Windows))
// oldest window = 0/2, second window = 1/1, third window not counted
require.Equal(t, float64(0.5), history.Score)
})
t.Run("merge-with-empty", func(t *testing.T) {
history := makeHistory([]hist{}, config)
require.Equal(t, float64(1), history.Score)
addHistory := makeHistory([]hist{
{true, startTime.Add(0)},
{false, startTime.Add(10 * time.Minute)},
{false, startTime.Add(59 * time.Minute)},
}, config)
require.Equal(t, float64(0.5), addHistory.Score)
periodFull := reputation.MergeAuditHistories(history, addHistory.Windows, config)
require.False(t, periodFull)
require.Equal(t, 3, len(history.Windows))
require.Equal(t, float64(0.5), history.Score)
// now merge with an empty addHistory instead
addHistory = makeHistory([]hist{}, config)
require.Equal(t, float64(1), addHistory.Score)
periodFull = reputation.MergeAuditHistories(history, addHistory.Windows, config)
require.False(t, periodFull)
require.Equal(t, 3, len(history.Windows))
require.Equal(t, float64(0.5), history.Score)
// and finally, merge two empty histories with each other
history = makeHistory([]hist{}, config)
addHistory = makeHistory([]hist{}, config)
periodFull = reputation.MergeAuditHistories(history, addHistory.Windows, config)
require.False(t, periodFull)
require.Equal(t, 0, len(history.Windows))
require.Equal(t, float64(1), history.Score)
})
}
func makeHistory(histWindows []hist, config reputation.AuditHistoryConfig) *pb.AuditHistory {
windows := make([]*pb.AuditWindow, 0, len(histWindows))
for _, histWindow := range histWindows {
onlineCount := int32(0)
if histWindow.online {
onlineCount = 1
}
startAt := histWindow.startAt.Truncate(config.WindowSize)
if len(windows) > 0 && startAt == windows[len(windows)-1].WindowStart {
windows[len(windows)-1].OnlineCount += onlineCount
windows[len(windows)-1].TotalCount++
} else {
windows = append(windows, &pb.AuditWindow{
OnlineCount: onlineCount,
TotalCount: 1,
WindowStart: startAt,
})
}
}
baseHistory := &pb.AuditHistory{
Windows: windows,
}
reputation.RecalculateScore(baseHistory)
return baseHistory
}

View File

@ -0,0 +1,69 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package reputation
import "math"
// UpdateReputation uses the Beta distribution model to determine a node's reputation.
// lambda is the "forgetting factor" which determines how much past info is kept when determining current reputation score.
// w is the normalization weight that affects how severely new updates affect the current reputation distribution.
func UpdateReputation(isSuccess bool, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) {
// v is a single feedback value that allows us to update both alpha and beta
var v float64 = -1
if isSuccess {
v = 1
}
newAlpha = lambda*alpha + w*(1+v)/2
newBeta = lambda*beta + w*(1-v)/2
return newAlpha, newBeta
}
// UpdateReputationMultiple works like UpdateReputation, but applies multiple
// successive counts of an event type to the alpha and beta measures.
//
// With the arguments as named, applies 'count' successful audits. To apply negative
// audits, swap the alpha and beta parameters and return values.
//
//
// WARNING: GREEK LETTER MATH AHEAD
//
// Applying n successful audit results to an initial alpha value of α₀ gives a
// new α₁ value of:
//
// α₁ = λⁿα₀ + λⁿ⁻¹w + λⁿ⁻²w + ... + λ²w + λw + w
//
// The terms with w are the first n terms of a geometric series with coefficient
// w and common ratio λ. The closed form formula for the sum of those first n
// terms is (w(1-λⁿ) / (1-λ))
// (https://en.wikipedia.org/wiki/Geometric_series#Closed-form_formula).
// Adding the initial λⁿα₀ term, we get
//
// α₁ = λⁿα₀ + w(1-λⁿ) / (1-λ)
//
// The formula has the same structure for beta for n _failures_.
//
// β₁ = λⁿβ₀ + w(1-λⁿ) / (1-λ)
//
// For n _failures_,
//
// α₁ = λⁿα₀
//
// For n _successes_,
//
// β₁ = λⁿβ₀
//
func UpdateReputationMultiple(count int, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) {
if lambda == 1 {
// special case: when the coefficient is 1, the closed-form formula is invalid
// (gives NaN because of a division by zero). Fortunately, the replacement
// formula in this case is even simpler.
newAlpha = alpha + w*float64(count)
newBeta = beta
} else {
lambdaPowN := math.Pow(lambda, float64(count))
newAlpha = lambdaPowN*alpha + w*(1-lambdaPowN)/(1-lambda)
newBeta = lambdaPowN * beta
}
return newAlpha, newBeta
}

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
@ -65,6 +66,133 @@ func TestUpdate(t *testing.T) {
})
}
// TestApplyUpdatesEquivalentToMultipleUpdates checks that the ApplyUpdates call
// is equivalent to making multiple separate Update() calls (modulo some details
// like exact-time-of-disqualification).
func TestApplyUpdatesEquivalentToMultipleUpdates(t *testing.T) {
t.Parallel()
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
reputationDB := db.Reputation()
config := reputation.Config{
AuditLambda: 0.99,
AuditWeight: 1,
AuditDQ: 0.1,
SuspensionGracePeriod: 20 * time.Minute,
SuspensionDQEnabled: true,
AuditCount: 3,
AuditHistory: reputation.AuditHistoryConfig{
WindowSize: 10 * time.Minute,
TrackingPeriod: 1 * time.Hour,
GracePeriod: 20 * time.Minute,
OfflineThreshold: 0.5,
OfflineDQEnabled: false,
OfflineSuspensionEnabled: true,
},
}
for _, testDef := range []struct {
name string
failures int
successes int
offlines int
unknowns int
}{
{"4f-3s", 4, 3, 0, 0},
{"3s-3o", 0, 3, 3, 0},
{"4s-2u", 0, 4, 0, 2},
{"1f-4s-1o-3u", 1, 4, 1, 3},
{"4o", 4, 0, 0, 0},
{"5s", 0, 5, 0, 0},
{"6u", 0, 0, 0, 6},
} {
t.Run(testDef.name, func(t *testing.T) {
node1 := testrand.NodeID()
node2 := testrand.NodeID()
startTime := time.Now().Add(-time.Hour)
var (
info1, info2 *reputation.Info
err error
)
// Do the Update() calls first, on node1
updateReq := reputation.UpdateRequest{
NodeID: node1,
Config: config,
}
updateReq.AuditOutcome = reputation.AuditFailure
for i := 0; i < testDef.failures; i++ {
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(i)*time.Minute))
require.NoError(t, err)
}
updateReq.AuditOutcome = reputation.AuditOffline
for i := 0; i < testDef.offlines; i++ {
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(10+i)*time.Minute))
require.NoError(t, err)
}
updateReq.AuditOutcome = reputation.AuditUnknown
for i := 0; i < testDef.unknowns; i++ {
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(20+i)*time.Minute))
require.NoError(t, err)
}
updateReq.AuditOutcome = reputation.AuditSuccess
for i := 0; i < testDef.successes; i++ {
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(30+i)*time.Minute))
require.NoError(t, err)
}
// Now do the single ApplyUpdates call, on node2
var hist pb.AuditHistory
for i := 0; i < testDef.failures; i++ {
err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(i)*time.Minute), config.AuditHistory)
require.NoError(t, err)
}
for i := 0; i < testDef.offlines; i++ {
err = reputation.AddAuditToHistory(&hist, false, startTime.Add(time.Duration(10+i)*time.Minute), config.AuditHistory)
require.NoError(t, err)
}
for i := 0; i < testDef.unknowns; i++ {
err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(20+i)*time.Minute), config.AuditHistory)
require.NoError(t, err)
}
for i := 0; i < testDef.successes; i++ {
err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(30+i)*time.Minute), config.AuditHistory)
require.NoError(t, err)
}
mutations := reputation.Mutations{
PositiveResults: testDef.successes,
FailureResults: testDef.failures,
UnknownResults: testDef.unknowns,
OfflineResults: testDef.offlines,
OnlineHistory: &hist,
}
info2, err = reputationDB.ApplyUpdates(ctx, node2, mutations, config, startTime.Add(40*time.Minute))
require.NoError(t, err)
require.NotNil(t, info1)
require.NotNil(t, info2)
require.Equalf(t, info1.VettedAt == nil, info2.VettedAt == nil,
"info1.VettedAt (%v) and info2.VettedAt (%v) should both be nil or both have values", info1.VettedAt, info2.VettedAt)
require.Equalf(t, info1.Disqualified == nil, info2.Disqualified == nil,
"info1.Disqualified (%v) and info2.Disqualified (%v) should both be nil or both have values", info1.Disqualified, info2.Disqualified)
require.InDelta(t, info1.AuditReputationAlpha, info2.AuditReputationAlpha, 1e-8)
require.InDelta(t, info1.AuditReputationBeta, info2.AuditReputationBeta, 1e-8)
require.InDelta(t, info1.UnknownAuditReputationAlpha, info2.UnknownAuditReputationAlpha, 1e-8)
require.InDelta(t, info1.UnknownAuditReputationBeta, info2.UnknownAuditReputationBeta, 1e-8)
require.InDelta(t, info1.OnlineScore, info2.OnlineScore, 1e-8)
require.NotNil(t, info1.AuditHistory)
require.NotNil(t, info2.AuditHistory)
require.Equal(t, info1.AuditHistory.Score, info2.AuditHistory.Score)
require.Equal(t, len(info1.AuditHistory.Windows), len(info2.AuditHistory.Windows),
"info1.AuditHistory.Windows (%v) and info2.AuditHistory.Windows (%v) should have the same length", info1.AuditHistory.Windows, info2.AuditHistory.Windows)
})
}
})
}
func TestDBDisqualifyNode(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
reputationDB := db.Reputation()

View File

@ -18,6 +18,9 @@ import (
type DB interface {
Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *Info, err error)
Get(ctx context.Context, nodeID storj.NodeID) (*Info, error)
// ApplyUpdates applies multiple updates (defined by the updates
// parameter) to a node's reputations record.
ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, reputationConfig Config, now time.Time) (_ *Info, err error)
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
@ -45,6 +48,18 @@ type Info struct {
UnknownAuditReputationBeta float64
}
// Mutations represents changes which should be made to a particular node's
// reputation, in terms of counts and/or timestamps of events which have
// occurred. A Mutations record can be applied to a reputations row without
// prior knowledge of that row's contents.
type Mutations struct {
PositiveResults int
FailureResults int
UnknownResults int
OfflineResults int
OnlineHistory *pb.AuditHistory
}
// Service handles storing node reputation data and updating
// the overlay cache when a node's status changes.
type Service struct {
@ -81,7 +96,7 @@ func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, rep
// only update node if its health status has changed, or it's a newly vetted
// node.
// this prevents the need to require caller of ApplyAudit() to always know
// the VettedAt time for a node.
// the previous VettedAt time for a node.
// Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison
// for the VettedAt status is done using time values that are truncated to second precision.
if hasReputationChanged(*statusUpdate, reputation, now) {
@ -187,3 +202,22 @@ func statusChanged(s1, s2 *time.Time) bool {
}
return true
}
// UpdateRequestToMutations transforms an UpdateRequest into the equivalent
// Mutations structure, which can be used with ApplyUpdates.
func UpdateRequestToMutations(updateReq UpdateRequest, now time.Time) (Mutations, error) {
updates := Mutations{}
switch updateReq.AuditOutcome {
case AuditSuccess:
updates.PositiveResults = 1
case AuditFailure:
updates.FailureResults = 1
case AuditUnknown:
updates.UnknownResults = 1
case AuditOffline:
updates.OfflineResults = 1
}
updates.OnlineHistory = &pb.AuditHistory{}
err := AddAuditToHistory(updates.OnlineHistory, updateReq.AuditOutcome != AuditOffline, now, updateReq.Config.AuditHistory)
return updates, err
}

View File

@ -99,8 +99,8 @@ func TestApplyAudit(t *testing.T) {
expectedAuditAlpha := config.AuditLambda*auditAlpha + config.AuditWeight
expectedAuditBeta := config.AuditLambda * auditBeta
require.EqualValues(t, stats.AuditReputationAlpha, expectedAuditAlpha)
require.EqualValues(t, stats.AuditReputationBeta, expectedAuditBeta)
require.InDelta(t, stats.AuditReputationAlpha, expectedAuditAlpha, 1e-8)
require.InDelta(t, stats.AuditReputationBeta, expectedAuditBeta, 1e-8)
auditAlpha = expectedAuditAlpha
auditBeta = expectedAuditBeta
@ -113,8 +113,8 @@ func TestApplyAudit(t *testing.T) {
expectedAuditAlpha = config.AuditLambda * auditAlpha
expectedAuditBeta = config.AuditLambda*auditBeta + config.AuditWeight
require.EqualValues(t, stats.AuditReputationAlpha, expectedAuditAlpha)
require.EqualValues(t, stats.AuditReputationBeta, expectedAuditBeta)
require.InDelta(t, stats.AuditReputationAlpha, expectedAuditAlpha, 1e-8)
require.InDelta(t, stats.AuditReputationBeta, expectedAuditBeta, 1e-8)
})
}
@ -131,8 +131,8 @@ func TestGet(t *testing.T) {
node, err := service.Get(ctx, nodeID)
require.NoError(t, err)
require.Zero(t, node.TotalAuditCount)
require.EqualValues(t, 1, node.AuditReputationAlpha)
require.EqualValues(t, 1, node.UnknownAuditReputationAlpha)
require.InDelta(t, 1, node.AuditReputationAlpha, 1e-8)
require.InDelta(t, 1, node.UnknownAuditReputationAlpha, 1e-8)
require.EqualValues(t, 1, node.OnlineScore)
// if a node has no entry in reputation store, it should have default
@ -140,8 +140,8 @@ func TestGet(t *testing.T) {
newNode, err := service.Get(ctx, testrand.NodeID())
require.NoError(t, err)
require.Zero(t, newNode.TotalAuditCount)
require.EqualValues(t, 1, newNode.AuditReputationAlpha)
require.EqualValues(t, 1, newNode.UnknownAuditReputationAlpha)
require.InDelta(t, 1, newNode.AuditReputationAlpha, 1e-8)
require.InDelta(t, 1, newNode.UnknownAuditReputationAlpha, 1e-8)
require.EqualValues(t, 1, newNode.OnlineScore)
})
}

View File

@ -5,39 +5,30 @@ package satellitedb
import (
"context"
"time"
"storj.io/common/pb"
"storj.io/storj/satellite/reputation"
)
func updateAuditHistory(ctx context.Context, oldHistory []byte, config reputation.AuditHistoryConfig, online bool, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) {
func mergeAuditHistory(ctx context.Context, oldHistory []byte, addHistory []*pb.AuditWindow, config reputation.AuditHistoryConfig) (res *reputation.UpdateAuditHistoryResponse, err error) {
defer mon.Task()(&ctx)(&err)
res = &reputation.UpdateAuditHistoryResponse{
NewScore: 1,
TrackingPeriodFull: false,
}
// deserialize node audit history
history := &pb.AuditHistory{}
err = pb.Unmarshal(oldHistory, history)
if err != nil {
return res, err
return nil, Error.Wrap(err)
}
err = reputation.AddAuditToHistory(history, online, auditTime, config)
trackingPeriodFull := reputation.MergeAuditHistories(history, addHistory, config)
historyBytes, err := pb.Marshal(history)
if err != nil {
return res, err
return nil, Error.Wrap(err)
}
res.History, err = pb.Marshal(history)
if err != nil {
return res, err
}
windowsPerTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
res.TrackingPeriodFull = len(history.Windows)-1 >= windowsPerTrackingPeriod
res.NewScore = history.Score
return res, nil
return &reputation.UpdateAuditHistoryResponse{
NewScore: history.Score,
TrackingPeriodFull: trackingPeriodFull,
History: historyBytes,
}, nil
}

View File

@ -7,6 +7,7 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/zeebo/errs"
@ -25,20 +26,28 @@ type reputations struct {
db *satelliteDB
}
// Update updates a node's reputation stats.
func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *reputation.Info, err error) {
mutations, err := reputation.UpdateRequestToMutations(updateReq, now)
if err != nil {
return nil, err
}
return reputations.ApplyUpdates(ctx, updateReq.NodeID, mutations, updateReq.Config, now)
}
// ApplyUpdates updates a node's reputation stats.
// The update is done in a loop to handle concurrent update calls and to avoid
// the need for a explicit transaction.
// There are two main steps go into the update process:
// There are three main steps go into the update process:
// 1. Get existing row for the node
// 2. Depends on the result of the first step,
// a. if existing row is returned, do compare-and-swap.
// b. if no row found, insert a new row.
func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *reputation.Info, err error) {
// a. if no row found, insert a new row.
// 2. Evaluate what the new values for the row fields should be.
// 3. Update row using compare-and-swap.
func (reputations *reputations) ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates reputation.Mutations, reputationConfig reputation.Config, now time.Time) (_ *reputation.Info, err error) {
defer mon.Task()(&ctx)(&err)
for {
// get existing reputation stats
dbNode, err := reputations.db.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()))
dbNode, err := reputations.db.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()))
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, Error.Wrap(err)
}
@ -52,22 +61,26 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
// set default reputation stats for new node
newNode := dbx.Reputation{
Id: updateReq.NodeID.Bytes(),
Id: nodeID.Bytes(),
UnknownAuditReputationAlpha: 1,
AuditReputationAlpha: 1,
OnlineScore: 1,
AuditHistory: historyBytes,
}
auditHistoryResponse, err := updateAuditHistory(ctx, historyBytes, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now)
var windows []*pb.AuditWindow
if updates.OnlineHistory != nil {
windows = updates.OnlineHistory.Windows
}
auditHistoryResponse, err := mergeAuditHistory(ctx, historyBytes, windows, reputationConfig.AuditHistory)
if err != nil {
return nil, Error.Wrap(err)
}
update := reputations.populateUpdateNodeStats(&newNode, updateReq, auditHistoryResponse, now)
update := reputations.populateUpdateNodeStats(&newNode, updates, reputationConfig, auditHistoryResponse, now)
createFields := reputations.populateCreateFields(update)
stats, err := reputations.db.Create_Reputation(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()), dbx.Reputation_AuditHistory(auditHistoryResponse.History), createFields)
stats, err := reputations.db.Create_Reputation(ctx, dbx.Reputation_Id(nodeID.Bytes()), dbx.Reputation_AuditHistory(auditHistoryResponse.History), createFields)
if err != nil {
// if node has been added into the table during a concurrent
// Update call happened between Get and Insert, we will try again so the audit is recorded
@ -87,16 +100,16 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
return &status, nil
}
auditHistoryResponse, err := updateAuditHistory(ctx, dbNode.AuditHistory, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now)
auditHistoryResponse, err := mergeAuditHistory(ctx, dbNode.AuditHistory, updates.OnlineHistory.Windows, reputationConfig.AuditHistory)
if err != nil {
return nil, Error.Wrap(err)
}
update := reputations.populateUpdateNodeStats(dbNode, updateReq, auditHistoryResponse, now)
update := reputations.populateUpdateNodeStats(dbNode, updates, reputationConfig, auditHistoryResponse, now)
updateFields := reputations.populateUpdateFields(update, auditHistoryResponse.History)
oldAuditHistory := dbx.Reputation_AuditHistory(dbNode.AuditHistory)
dbNode, err = reputations.db.Update_Reputation_By_Id_And_AuditHistory(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()), oldAuditHistory, updateFields)
dbNode, err = reputations.db.Update_Reputation_By_Id_And_AuditHistory(ctx, dbx.Reputation_Id(nodeID.Bytes()), oldAuditHistory, updateFields)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, Error.Wrap(err)
}
@ -387,8 +400,8 @@ func (reputations *reputations) populateUpdateFields(update updateNodeStats, his
return updateFields
}
func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, updateReq reputation.UpdateRequest, auditHistoryResponse *reputation.UpdateAuditHistoryResponse, now time.Time) updateNodeStats {
// there are three audit outcomes: success, failure, and unknown
func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, updates reputation.Mutations, config reputation.Config, historyResponse *reputation.UpdateAuditHistoryResponse, now time.Time) updateNodeStats {
// there are four audit outcomes: success, failure, offline, and unknown
// if a node fails enough audits, it gets disqualified
// if a node gets enough "unknown" audits, it gets put into suspension
// if a node gets enough successful audits, and is in suspension, it gets removed from suspension
@ -399,63 +412,75 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation,
totalAuditCount := dbNode.TotalAuditCount
vettedAt := dbNode.VettedAt
var updatedTotalAuditCount int64
logger := reputations.db.log.With(zap.Stringer("Node ID", zapNodeIDBytes(dbNode.Id)))
switch updateReq.AuditOutcome {
case reputation.AuditSuccess:
// for a successful audit, increase reputation for normal *and* unknown audits
auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
true,
auditAlpha,
auditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
// we will use updatedTotalAuditCount from the updateReputation call above
unknownAuditAlpha, unknownAuditBeta, _ = updateReputation(
true,
unknownAuditAlpha,
unknownAuditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
case reputation.AuditFailure:
// for audit failure, only update normal alpha/beta
auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
false,
auditAlpha,
auditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
case reputation.AuditUnknown:
// for audit unknown, only update unknown alpha/beta
unknownAuditAlpha, unknownAuditBeta, updatedTotalAuditCount = updateReputation(
false,
unknownAuditAlpha,
unknownAuditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
case reputation.AuditOffline:
// for audit offline, only update total audit count
updatedTotalAuditCount = totalAuditCount + 1
}
// Here we rely on the observation that, conceptually, if we have
// collected some list of successes failures while auditing node N
// during some short time period, it might reasonably have happened that
// the events occurred in a different order.
//
// That is, if a node passed audit 1, then failed audit 2, then passed
// audit 3, it is fair to treat it as if it passed two audits and then
// failed one. This is because we expect that the order in which the
// events occurred is not very relevant. If a node failed an audit for
// piece P at time T, then it likely would also have failed an audit
// for the same piece at time T±ε, so we can grade it as though that
// had happened.
//
// There are conditions under which the order of events makes the
// difference in whether a node is disqualified or not. To be as fair
// as possible, we will not disqualify in those conditions. If a node
// remains un-disqualified under any ordering of events, we should not
// disqualify it. To that end, we will always apply failures _before_
// applying successes. That ordering will always yield the highest
// possible result alpha and the lowest possible result beta, assuming
// weight > 0 and 0 < λ < 1 (the proof is left as an exercise for the
// reader).
// for audit failure, only update normal alpha/beta
auditBeta, auditAlpha = reputation.UpdateReputationMultiple(
updates.FailureResults,
auditBeta,
auditAlpha,
config.AuditLambda,
config.AuditWeight,
)
// for audit unknown, only update unknown alpha/beta
unknownAuditBeta, unknownAuditAlpha = reputation.UpdateReputationMultiple(
updates.UnknownResults,
unknownAuditBeta,
unknownAuditAlpha,
config.AuditLambda,
config.AuditWeight,
)
// for a successful audit, increase reputation for normal *and* unknown audits
auditAlpha, auditBeta = reputation.UpdateReputationMultiple(
updates.PositiveResults,
auditAlpha,
auditBeta,
config.AuditLambda,
config.AuditWeight,
)
unknownAuditAlpha, unknownAuditBeta = reputation.UpdateReputationMultiple(
updates.PositiveResults,
unknownAuditAlpha,
unknownAuditBeta,
config.AuditLambda,
config.AuditWeight,
)
// offline results affect only the total count.
updatedTotalAuditCount := totalAuditCount + int64(updates.OfflineResults+updates.UnknownResults+updates.FailureResults+updates.PositiveResults)
mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha) //mon:locked
mon.FloatVal("audit_reputation_beta").Observe(auditBeta) //mon:locked
mon.FloatVal("unknown_audit_reputation_alpha").Observe(unknownAuditAlpha) //mon:locked
mon.FloatVal("unknown_audit_reputation_beta").Observe(unknownAuditBeta) //mon:locked
mon.FloatVal("audit_online_score").Observe(auditHistoryResponse.NewScore) //mon:locked
isUp := updateReq.AuditOutcome != reputation.AuditOffline
mon.FloatVal("audit_online_score").Observe(historyResponse.NewScore) //mon:locked
updateFields := updateNodeStats{
NodeID: updateReq.NodeID,
NodeID: dbNode.Id,
TotalAuditCount: int64Field{set: true, value: updatedTotalAuditCount},
AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
AuditReputationBeta: float64Field{set: true, value: auditBeta},
@ -464,18 +489,18 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation,
// Updating node stats always exits it from containment mode
Contained: boolField{set: true, value: false},
// always update online score
OnlineScore: float64Field{set: true, value: auditHistoryResponse.NewScore},
OnlineScore: float64Field{set: true, value: historyResponse.NewScore},
}
if vettedAt == nil && updatedTotalAuditCount >= updateReq.AuditCount {
if vettedAt == nil && updatedTotalAuditCount >= config.AuditCount {
updateFields.VettedAt = timeField{set: true, value: now}
}
// disqualification case a
// a) Success/fail audit reputation falls below audit DQ threshold
auditRep := auditAlpha / (auditAlpha + auditBeta)
if auditRep <= updateReq.AuditDQ {
reputations.db.log.Info("Disqualified", zap.String("DQ type", "audit failure"), zap.String("Node ID", updateReq.NodeID.String()))
if auditRep <= config.AuditDQ {
logger.Info("Disqualified", zap.String("DQ type", "audit failure"))
mon.Meter("bad_audit_dqs").Mark(1) //mon:locked
updateFields.Disqualified = timeField{set: true, value: now}
updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonAuditFailure)}
@ -483,9 +508,9 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation,
// if unknown audit rep goes below threshold, suspend node. Otherwise unsuspend node.
unknownAuditRep := unknownAuditAlpha / (unknownAuditAlpha + unknownAuditBeta)
if unknownAuditRep <= updateReq.AuditDQ {
if unknownAuditRep <= config.AuditDQ {
if dbNode.UnknownAuditSuspended == nil {
reputations.db.log.Info("Suspended", zap.String("Node ID", updateFields.NodeID.String()), zap.String("Category", "Unknown Audits"))
logger.Info("Suspended", zap.String("Category", "Unknown Audits"))
updateFields.UnknownAuditSuspended = timeField{set: true, value: now}
}
@ -494,37 +519,28 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation,
// AND the suspended grace period has elapsed
// AND audit outcome is unknown or failed
// if suspended grace period has elapsed and audit outcome was failed or unknown,
// disqualify node. Set suspended to nil if node is disqualified
// NOTE: if updateFields.Suspended is set, we just suspended the node so it will not be disqualified
if updateReq.AuditOutcome != reputation.AuditSuccess {
if dbNode.UnknownAuditSuspended != nil && !updateFields.UnknownAuditSuspended.set &&
time.Since(*dbNode.UnknownAuditSuspended) > updateReq.SuspensionGracePeriod &&
updateReq.SuspensionDQEnabled {
reputations.db.log.Info("Disqualified", zap.String("DQ type", "suspension grace period expired for unknown audits"), zap.String("Node ID", updateReq.NodeID.String()))
mon.Meter("unknown_suspension_dqs").Mark(1) //mon:locked
updateFields.Disqualified = timeField{set: true, value: now}
updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonSuspension)}
updateFields.UnknownAuditSuspended = timeField{set: true, isNil: true}
}
// if suspended grace period has elapsed and unknown audit rep is still
// too low, disqualify node. Set suspended to nil if node is disqualified
// NOTE: if updateFields.UnknownAuditSuspended is set, we just suspended
// the node a few lines above, so it will not be disqualified.
if dbNode.UnknownAuditSuspended != nil && !updateFields.UnknownAuditSuspended.set &&
time.Since(*dbNode.UnknownAuditSuspended) > config.SuspensionGracePeriod &&
config.SuspensionDQEnabled {
logger.Info("Disqualified", zap.String("DQ type", "suspension grace period expired for unknown audits"))
mon.Meter("unknown_suspension_dqs").Mark(1) //mon:locked
updateFields.Disqualified = timeField{set: true, value: now}
updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonSuspension)}
updateFields.UnknownAuditSuspended = timeField{set: true, isNil: true}
}
} else if dbNode.UnknownAuditSuspended != nil {
reputations.db.log.Info("Suspension lifted", zap.String("Category", "Unknown Audits"), zap.String("Node ID", updateFields.NodeID.String()))
logger.Info("Suspension lifted", zap.String("Category", "Unknown Audits"))
updateFields.UnknownAuditSuspended = timeField{set: true, isNil: true}
}
if isUp {
updateFields.LastContactSuccess = timeField{set: true, value: now}
} else {
updateFields.LastContactFailure = timeField{set: true, value: now}
}
if updateReq.AuditOutcome == reputation.AuditSuccess {
updateFields.AuditSuccessCount = int64Field{set: true, value: dbNode.AuditSuccessCount + 1}
}
updateFields.AuditSuccessCount = int64Field{set: true, value: dbNode.AuditSuccessCount + int64(updates.PositiveResults)}
// if suspension not enabled, skip penalization and unsuspend node if applicable
if !updateReq.AuditHistory.OfflineSuspensionEnabled {
if !config.AuditHistory.OfflineSuspensionEnabled {
if dbNode.OfflineSuspended != nil {
updateFields.OfflineSuspended = timeField{set: true, isNil: true}
}
@ -537,7 +553,7 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation,
// only penalize node if online score is below threshold and
// if it has enough completed windows to fill a tracking period
penalizeOfflineNode := false
if auditHistoryResponse.NewScore < updateReq.AuditHistory.OfflineThreshold && auditHistoryResponse.TrackingPeriodFull {
if historyResponse.NewScore < config.AuditHistory.OfflineThreshold && historyResponse.TrackingPeriodFull {
penalizeOfflineNode = true
}
@ -550,16 +566,16 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation,
updateFields.OfflineSuspended = timeField{set: true, value: now}
}
gracePeriodEnd := dbNode.UnderReview.Add(updateReq.AuditHistory.GracePeriod)
trackingPeriodEnd := gracePeriodEnd.Add(updateReq.AuditHistory.TrackingPeriod)
gracePeriodEnd := dbNode.UnderReview.Add(config.AuditHistory.GracePeriod)
trackingPeriodEnd := gracePeriodEnd.Add(config.AuditHistory.TrackingPeriod)
trackingPeriodPassed := now.After(trackingPeriodEnd)
// after tracking period has elapsed, if score is good, clear under review
// otherwise, disqualify node (if OfflineDQEnabled feature flag is true)
if trackingPeriodPassed {
if penalizeOfflineNode {
if updateReq.AuditHistory.OfflineDQEnabled {
reputations.db.log.Info("Disqualified", zap.String("DQ type", "node offline"), zap.String("Node ID", updateReq.NodeID.String()))
if config.AuditHistory.OfflineDQEnabled {
logger.Info("Disqualified", zap.String("DQ type", "node offline"))
mon.Meter("offline_dqs").Mark(1) //mon:locked
updateFields.Disqualified = timeField{set: true, value: now}
updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonNodeOffline)}
@ -605,7 +621,7 @@ type timeField struct {
}
type updateNodeStats struct {
NodeID storj.NodeID
NodeID []byte
VettedAt timeField
TotalAuditCount int64Field
AuditReputationAlpha float64Field
@ -615,8 +631,6 @@ type updateNodeStats struct {
UnknownAuditReputationAlpha float64Field
UnknownAuditReputationBeta float64Field
UnknownAuditSuspended timeField
LastContactSuccess timeField
LastContactFailure timeField
AuditSuccessCount int64Field
Contained boolField
OfflineUnderReview timeField
@ -651,16 +665,12 @@ func dbxToReputationInfo(dbNode *dbx.Reputation) (reputation.Info, error) {
return info, nil
}
// updateReputation uses the Beta distribution model to determine a node's reputation.
// lambda is the "forgetting factor" which determines how much past info is kept when determining current reputation score.
// w is the normalization weight that affects how severely new updates affect the current reputation distribution.
func updateReputation(isSuccess bool, alpha, beta, lambda, w float64, totalCount int64) (newAlpha, newBeta float64, updatedCount int64) {
// v is a single feedback value that allows us to update both alpha and beta
var v float64 = -1
if isSuccess {
v = 1
type zapNodeIDBytes []byte
func (z zapNodeIDBytes) String() string {
nodeID, err := storj.NodeIDFromBytes([]byte(z))
if err != nil {
return fmt.Sprintf("invalid node-id 0x%x", []byte(z))
}
newAlpha = lambda*alpha + w*(1+v)/2
newBeta = lambda*beta + w*(1-v)/2
return newAlpha, newBeta, totalCount + 1
return nodeID.String()
}