From 737d7c7dfca89daf53494b667b6f2b6fd5e8aeb9 Mon Sep 17 00:00:00 2001 From: paul cannon Date: Sat, 7 May 2022 12:43:32 -0500 Subject: [PATCH] satellite/reputation: new ApplyUpdates() method The ApplyUpdates() method on the reputation.DB interface acts like the similar Update() method, but can allow for applying the changes from multiple audit events, instead of only one. This will be necessary for the reputation write cache, which will batch up changes to each node's reputation in order to flush them periodically. Refs: https://github.com/storj/storj/issues/4601 Change-Id: I44cc47767ea2d9423166bb8fed080c8a11182041 --- satellite/audit/disqualification_test.go | 10 +- satellite/reputation/audithistory.go | 73 +++++++ satellite/reputation/audithistory_test.go | 223 ++++++++++++++++++++ satellite/reputation/calculations.go | 69 +++++++ satellite/reputation/db_test.go | 128 ++++++++++++ satellite/reputation/service.go | 36 +++- satellite/reputation/service_test.go | 16 +- satellite/satellitedb/audithistory.go | 31 +-- satellite/satellitedb/reputations.go | 238 +++++++++++----------- 9 files changed, 676 insertions(+), 148 deletions(-) create mode 100644 satellite/reputation/calculations.go diff --git a/satellite/audit/disqualification_test.go b/satellite/audit/disqualification_test.go index 43fdb1856..b98a9827c 100644 --- a/satellite/audit/disqualification_test.go +++ b/satellite/audit/disqualification_test.go @@ -77,9 +77,9 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) { require.NoError(t, err) reputation := calcReputation(reputationInfo) - require.Truef(t, prevReputation >= reputation, - "(%d) expected reputation to remain or decrease (previous >= current): %f >= %f", - iterations, prevReputation, reputation, + require.LessOrEqual(t, reputation, prevReputation, + "(%d) expected reputation to remain or decrease (current <= previous)", + iterations, ) if reputation <= auditDQCutOff || reputation == prevReputation { @@ -88,7 +88,7 @@ func 
TestDisqualificationTooManyFailedAudits(t *testing.T) { iterations, auditDQCutOff, prevReputation, reputation, ) - require.True(t, time.Since(*reputationInfo.Disqualified) >= 0, + require.GreaterOrEqual(t, time.Since(*reputationInfo.Disqualified), time.Duration(0), "Disqualified should be in the past", ) @@ -99,7 +99,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) { prevReputation = reputation } - require.True(t, iterations > 1, "the number of iterations must be at least 2") + require.Greater(t, iterations, 1, "the number of iterations must be at least 2") }) } diff --git a/satellite/reputation/audithistory.go b/satellite/reputation/audithistory.go index 554c08a79..2e44b36f3 100644 --- a/satellite/reputation/audithistory.go +++ b/satellite/reputation/audithistory.go @@ -94,3 +94,76 @@ func AddAuditToHistory(a *pb.AuditHistory, online bool, auditTime time.Time, con a.Score = totalWindowScores / float64(len(a.Windows)-1) return nil } + +// MergeAuditHistories merges two audit histories into one, including all +// windows that are present in either input and summing counts for +// any windows that appear in _both_ inputs. Any windows that are now outside +// the tracking period will be trimmed. +// +// The history parameter will be mutated to include the windows passed as +// addHistory. +// +// Returns true if the number of windows in the new history is the maximum +// possible for the tracking config. 
+func MergeAuditHistories(history *pb.AuditHistory, addHistory []*pb.AuditWindow, config AuditHistoryConfig) (trackingPeriodFull bool) { + windows := history.Windows + + for addIndex, windowIndex := 0, 0; addIndex < len(addHistory); { + switch { + case windowIndex == len(windows): + windows = append(windows, &pb.AuditWindow{ + WindowStart: addHistory[addIndex].WindowStart, + }) + fallthrough + case windows[windowIndex].WindowStart.Equal(addHistory[addIndex].WindowStart): + windows[windowIndex].TotalCount += addHistory[addIndex].TotalCount + windows[windowIndex].OnlineCount += addHistory[addIndex].OnlineCount + addIndex++ + case windows[windowIndex].WindowStart.Before(addHistory[addIndex].WindowStart): + windowIndex++ + case windows[windowIndex].WindowStart.After(addHistory[addIndex].WindowStart): + windows = append(windows[:windowIndex+1], windows[windowIndex:]...) + windows[windowIndex] = &pb.AuditWindow{ + WindowStart: addHistory[addIndex].WindowStart, + TotalCount: addHistory[addIndex].TotalCount, + OnlineCount: addHistory[addIndex].OnlineCount, + } + addIndex++ + } + } + + // trim off windows that are too old + if len(windows) > 0 { + cutoffTime := windows[len(windows)-1].WindowStart.Add(-config.TrackingPeriod) + for len(windows) > 0 && windows[0].WindowStart.Before(cutoffTime) { + windows = windows[1:] + } + } + + history.Windows = windows + RecalculateScore(history) + + windowsPerTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds()) + trackingPeriodFull = len(history.Windows)-1 >= windowsPerTrackingPeriod + + return trackingPeriodFull +} + +// RecalculateScore calculates and assigns the Score field in a pb.AuditHistory object. +// The score is calculated by averaging the online percentage in each window +// (not including the last). 
+func RecalculateScore(history *pb.AuditHistory) { + if len(history.Windows) <= 1 { + history.Score = 1 + return + } + totalWindowScores := float64(0) + for i, window := range history.Windows { + // do not include last window in score + if i+1 == len(history.Windows) { + break + } + totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount) + } + history.Score = totalWindowScores / float64(len(history.Windows)-1) +} diff --git a/satellite/reputation/audithistory_test.go b/satellite/reputation/audithistory_test.go index 4cf76c3c5..556d872f3 100644 --- a/satellite/reputation/audithistory_test.go +++ b/satellite/reputation/audithistory_test.go @@ -76,3 +76,226 @@ func TestAddAuditToHistory(t *testing.T) { require.NoError(t, err) require.EqualValues(t, expectedScore, history.Score) } + +func TestMergeAuditHistoriesWithSingleAudit(t *testing.T) { + config := reputation.AuditHistoryConfig{ + WindowSize: time.Hour, + TrackingPeriod: 2 * time.Hour, + GracePeriod: time.Hour, + OfflineThreshold: 0.6, + OfflineDQEnabled: true, + OfflineSuspensionEnabled: true, + } + + startingWindow := time.Now().Truncate(time.Hour) + windowsInTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds()) + currentWindow := startingWindow + + history := &pb.AuditHistory{} + + // online score should be 1 until the first window is finished + trackingPeriodFull := testMergeAuditHistories(history, false, currentWindow.Add(2*time.Minute), config) + require.EqualValues(t, 1, history.Score) + require.False(t, trackingPeriodFull) + + trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(20*time.Minute), config) + require.EqualValues(t, 1, history.Score) + require.False(t, trackingPeriodFull) + + // move to next window + currentWindow = currentWindow.Add(time.Hour) + + // online score should now be 0.5 since the first window is complete with one online audit and one offline audit + trackingPeriodFull = testMergeAuditHistories(history,
false, currentWindow.Add(2*time.Minute), config) + require.EqualValues(t, 0.5, history.Score) + require.False(t, trackingPeriodFull) + + trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(20*time.Minute), config) + require.EqualValues(t, 0.5, history.Score) + require.False(t, trackingPeriodFull) + + // move to next window + currentWindow = currentWindow.Add(time.Hour) + + // add another online audit for the latest window; score should still be 0.5 + trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow, config) + require.EqualValues(t, 0.5, history.Score) + // now that we have two full windows other than the current one, tracking period should be considered full. + require.True(t, trackingPeriodFull) + // add another online audit for the latest window; score should still be 0.5 + trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(45*time.Minute), config) + require.EqualValues(t, 0.5, history.Score) + require.True(t, trackingPeriodFull) + + currentWindow = currentWindow.Add(time.Hour) + // in the current state, there are windowsInTrackingPeriod windows with a score of 0.5 + // and one window with a score of 1.0. The Math below calculates the new score when the latest + // window gets included in the tracking period, and the earliest 0.5 window gets dropped. 
+ expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod) + // add online audit for next window; score should now be expectedScore + trackingPeriodFull = testMergeAuditHistories(history, true, currentWindow.Add(time.Minute), config) + require.EqualValues(t, expectedScore, history.Score) + require.True(t, trackingPeriodFull) +} + +func testMergeAuditHistories(history *pb.AuditHistory, online bool, auditTime time.Time, config reputation.AuditHistoryConfig) bool { + onlineCount := int32(0) + if online { + onlineCount = 1 + } + windows := []*pb.AuditWindow{{ + WindowStart: auditTime.Truncate(config.WindowSize), + OnlineCount: onlineCount, + TotalCount: 1, + }} + return reputation.MergeAuditHistories(history, windows, config) +} + +type hist struct { + online bool + startAt time.Time +} + +func TestMergeAuditHistoriesWithMultipleAudits(t *testing.T) { + config := reputation.AuditHistoryConfig{ + WindowSize: 10 * time.Minute, + TrackingPeriod: 1 * time.Hour, + } + startTime := time.Now().Truncate(time.Hour).Add(-time.Hour) + + t.Parallel() + + t.Run("normal-merge", func(t *testing.T) { + history := makeHistory([]hist{ + // first window: half online + {true, startTime}, + {false, startTime.Add(1 * time.Minute)}, + {true, startTime.Add(5 * time.Minute)}, + {false, startTime.Add(8 * time.Minute)}, + // second window: all online + {true, startTime.Add(10 * time.Minute)}, + {true, startTime.Add(11 * time.Minute)}, + {true, startTime.Add(20*time.Minute - time.Second)}, + // third window: all online + {true, startTime.Add(20 * time.Minute)}, + // fourth window: all online + {true, startTime.Add(30 * time.Minute)}, + // fifth window; won't be included in score + {false, startTime.Add(40 * time.Minute)}, + }, config) + require.Equal(t, float64(0.875), history.Score) // 3.5/4; chosen to be exact in floating point + + // make the second, third, and fourth windows go from all-online to half-online + addHistory := makeHistory([]hist{ + // fits 
in second window + {false, startTime.Add(12 * time.Minute)}, + {false, startTime.Add(13 * time.Minute)}, + {false, startTime.Add(14 * time.Minute)}, + // fits in third window + {false, startTime.Add(20*time.Minute + time.Microsecond)}, + // fits in fourth window + {false, startTime.Add(40*time.Minute - time.Microsecond)}, + }, config) + require.Equal(t, float64(0), addHistory.Score) + + periodFull := reputation.MergeAuditHistories(history, addHistory.Windows, config) + + require.False(t, periodFull) + require.Equal(t, 5, len(history.Windows)) + require.Equal(t, float64(0.5), history.Score) // all windows at 50% online + }) + + t.Run("trim-old-windows", func(t *testing.T) { + history := makeHistory([]hist{ + // this window is too old + {true, startTime.Add(-2 * time.Minute)}, + {true, startTime.Add(-1 * time.Minute)}, + // oldest window + {false, startTime.Add(0)}, + // newest window (not included in score) + {true, startTime.Add(1 * time.Hour)}, + }, config) + require.Equal(t, float64(0.5), history.Score) // the too-old window is still included in the score here + + addHistory := makeHistory([]hist{ + // this window is too old + {true, startTime.Add(-10 * time.Minute)}, + // oldest window + {false, startTime.Add(9 * time.Minute)}, + // a window entirely not present in the other history + {true, startTime.Add(10 * time.Minute)}, + }, config) + require.Equal(t, float64(0.5), addHistory.Score) // the latest window is not included (yet) + + periodFull := reputation.MergeAuditHistories(history, addHistory.Windows, config) + + require.False(t, periodFull) + require.Equal(t, 3, len(history.Windows)) + // oldest window = 0/2, second window = 1/1, third window not counted + require.Equal(t, float64(0.5), history.Score) + }) + + t.Run("merge-with-empty", func(t *testing.T) { + history := makeHistory([]hist{}, config) + require.Equal(t, float64(1), history.Score) + + addHistory := makeHistory([]hist{ + {true, startTime.Add(0)}, + {false, startTime.Add(10 * time.Minute)}, + 
{false, startTime.Add(59 * time.Minute)}, + }, config) + require.Equal(t, float64(0.5), addHistory.Score) + + periodFull := reputation.MergeAuditHistories(history, addHistory.Windows, config) + + require.False(t, periodFull) + require.Equal(t, 3, len(history.Windows)) + require.Equal(t, float64(0.5), history.Score) + + // now merge with an empty addHistory instead + addHistory = makeHistory([]hist{}, config) + require.Equal(t, float64(1), addHistory.Score) + + periodFull = reputation.MergeAuditHistories(history, addHistory.Windows, config) + + require.False(t, periodFull) + require.Equal(t, 3, len(history.Windows)) + require.Equal(t, float64(0.5), history.Score) + + // and finally, merge two empty histories with each other + history = makeHistory([]hist{}, config) + addHistory = makeHistory([]hist{}, config) + + periodFull = reputation.MergeAuditHistories(history, addHistory.Windows, config) + + require.False(t, periodFull) + require.Equal(t, 0, len(history.Windows)) + require.Equal(t, float64(1), history.Score) + }) +} + +func makeHistory(histWindows []hist, config reputation.AuditHistoryConfig) *pb.AuditHistory { + windows := make([]*pb.AuditWindow, 0, len(histWindows)) + for _, histWindow := range histWindows { + onlineCount := int32(0) + if histWindow.online { + onlineCount = 1 + } + startAt := histWindow.startAt.Truncate(config.WindowSize) + if len(windows) > 0 && startAt == windows[len(windows)-1].WindowStart { + windows[len(windows)-1].OnlineCount += onlineCount + windows[len(windows)-1].TotalCount++ + } else { + windows = append(windows, &pb.AuditWindow{ + OnlineCount: onlineCount, + TotalCount: 1, + WindowStart: startAt, + }) + } + } + baseHistory := &pb.AuditHistory{ + Windows: windows, + } + reputation.RecalculateScore(baseHistory) + return baseHistory +} diff --git a/satellite/reputation/calculations.go b/satellite/reputation/calculations.go new file mode 100644 index 000000000..bb1a3c08f --- /dev/null +++ b/satellite/reputation/calculations.go @@ -0,0 
+1,69 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package reputation + +import "math" + +// UpdateReputation uses the Beta distribution model to determine a node's reputation. +// lambda is the "forgetting factor" which determines how much past info is kept when determining current reputation score. +// w is the normalization weight that affects how severely new updates affect the current reputation distribution. +func UpdateReputation(isSuccess bool, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) { + // v is a single feedback value that allows us to update both alpha and beta + var v float64 = -1 + if isSuccess { + v = 1 + } + newAlpha = lambda*alpha + w*(1+v)/2 + newBeta = lambda*beta + w*(1-v)/2 + return newAlpha, newBeta +} + +// UpdateReputationMultiple works like UpdateReputation, but applies multiple +// successive counts of an event type to the alpha and beta measures. +// +// With the arguments as named, applies 'count' successful audits. To apply negative +// audits, swap the alpha and beta parameters and return values. +// +// +// WARNING: GREEK LETTER MATH AHEAD +// +// Applying n successful audit results to an initial alpha value of α₀ gives a +// new α₁ value of: +// +// α₁ = λⁿα₀ + λⁿ⁻¹w + λⁿ⁻²w + ... + λ²w + λw + w +// +// The terms with w are the first n terms of a geometric series with coefficient +// w and common ratio λ. The closed form formula for the sum of those first n +// terms is (w(1-λⁿ) / (1-λ)) +// (https://en.wikipedia.org/wiki/Geometric_series#Closed-form_formula). +// Adding the initial λⁿα₀ term, we get +// +// α₁ = λⁿα₀ + w(1-λⁿ) / (1-λ) +// +// The formula has the same structure for beta for n _failures_. 
+// +// β₁ = λⁿβ₀ + w(1-λⁿ) / (1-λ) +// +// For n _failures_, +// +// α₁ = λⁿα₀ +// +// For n _successes_, +// +// β₁ = λⁿβ₀ +// +func UpdateReputationMultiple(count int, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) { + if lambda == 1 { + // special case: when the coefficient is 1, the closed-form formula is invalid + // (gives NaN because of a division by zero). Fortunately, the replacement + // formula in this case is even simpler. + newAlpha = alpha + w*float64(count) + newBeta = beta + } else { + lambdaPowN := math.Pow(lambda, float64(count)) + newAlpha = lambdaPowN*alpha + w*(1-lambdaPowN)/(1-lambda) + newBeta = lambdaPowN * beta + } + return newAlpha, newBeta +} diff --git a/satellite/reputation/db_test.go b/satellite/reputation/db_test.go index b30f40ac6..65b5b40a5 100644 --- a/satellite/reputation/db_test.go +++ b/satellite/reputation/db_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "storj.io/common/pb" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" @@ -65,6 +66,133 @@ func TestUpdate(t *testing.T) { }) } +// TestApplyUpdatesEquivalentToMultipleUpdates checks that the ApplyUpdates call +// is equivalent to making multiple separate Update() calls (modulo some details +// like exact-time-of-disqualification). 
+func TestApplyUpdatesEquivalentToMultipleUpdates(t *testing.T) { + t.Parallel() + + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + reputationDB := db.Reputation() + config := reputation.Config{ + AuditLambda: 0.99, + AuditWeight: 1, + AuditDQ: 0.1, + SuspensionGracePeriod: 20 * time.Minute, + SuspensionDQEnabled: true, + AuditCount: 3, + AuditHistory: reputation.AuditHistoryConfig{ + WindowSize: 10 * time.Minute, + TrackingPeriod: 1 * time.Hour, + GracePeriod: 20 * time.Minute, + OfflineThreshold: 0.5, + OfflineDQEnabled: false, + OfflineSuspensionEnabled: true, + }, + } + + for _, testDef := range []struct { + name string + failures int + successes int + offlines int + unknowns int + }{ + {"4f-3s", 4, 3, 0, 0}, + {"3s-3o", 0, 3, 3, 0}, + {"4s-2u", 0, 4, 0, 2}, + {"1f-4s-1o-3u", 1, 4, 1, 3}, + {"4o", 0, 0, 4, 0}, + {"5s", 0, 5, 0, 0}, + {"6u", 0, 0, 0, 6}, + } { + t.Run(testDef.name, func(t *testing.T) { + node1 := testrand.NodeID() + node2 := testrand.NodeID() + startTime := time.Now().Add(-time.Hour) + var ( + info1, info2 *reputation.Info + err error + ) + + // Do the Update() calls first, on node1 + + updateReq := reputation.UpdateRequest{ + NodeID: node1, + Config: config, + } + + updateReq.AuditOutcome = reputation.AuditFailure + for i := 0; i < testDef.failures; i++ { + info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(i)*time.Minute)) + require.NoError(t, err) + } + updateReq.AuditOutcome = reputation.AuditOffline + for i := 0; i < testDef.offlines; i++ { + info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(10+i)*time.Minute)) + require.NoError(t, err) + } + updateReq.AuditOutcome = reputation.AuditUnknown + for i := 0; i < testDef.unknowns; i++ { + info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(20+i)*time.Minute)) + require.NoError(t, err) + } + updateReq.AuditOutcome = reputation.AuditSuccess + for i := 0; i < testDef.successes;
i++ { + info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(30+i)*time.Minute)) + require.NoError(t, err) + } + + // Now do the single ApplyUpdates call, on node2 + + var hist pb.AuditHistory + for i := 0; i < testDef.failures; i++ { + err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(i)*time.Minute), config.AuditHistory) + require.NoError(t, err) + } + for i := 0; i < testDef.offlines; i++ { + err = reputation.AddAuditToHistory(&hist, false, startTime.Add(time.Duration(10+i)*time.Minute), config.AuditHistory) + require.NoError(t, err) + } + for i := 0; i < testDef.unknowns; i++ { + err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(20+i)*time.Minute), config.AuditHistory) + require.NoError(t, err) + } + for i := 0; i < testDef.successes; i++ { + err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(30+i)*time.Minute), config.AuditHistory) + require.NoError(t, err) + } + mutations := reputation.Mutations{ + PositiveResults: testDef.successes, + FailureResults: testDef.failures, + UnknownResults: testDef.unknowns, + OfflineResults: testDef.offlines, + OnlineHistory: &hist, + } + info2, err = reputationDB.ApplyUpdates(ctx, node2, mutations, config, startTime.Add(40*time.Minute)) + require.NoError(t, err) + + require.NotNil(t, info1) + require.NotNil(t, info2) + require.Equalf(t, info1.VettedAt == nil, info2.VettedAt == nil, + "info1.VettedAt (%v) and info2.VettedAt (%v) should both be nil or both have values", info1.VettedAt, info2.VettedAt) + require.Equalf(t, info1.Disqualified == nil, info2.Disqualified == nil, + "info1.Disqualified (%v) and info2.Disqualified (%v) should both be nil or both have values", info1.Disqualified, info2.Disqualified) + require.InDelta(t, info1.AuditReputationAlpha, info2.AuditReputationAlpha, 1e-8) + require.InDelta(t, info1.AuditReputationBeta, info2.AuditReputationBeta, 1e-8) + require.InDelta(t, info1.UnknownAuditReputationAlpha, 
info2.UnknownAuditReputationAlpha, 1e-8) + require.InDelta(t, info1.UnknownAuditReputationBeta, info2.UnknownAuditReputationBeta, 1e-8) + require.InDelta(t, info1.OnlineScore, info2.OnlineScore, 1e-8) + require.NotNil(t, info1.AuditHistory) + require.NotNil(t, info2.AuditHistory) + require.Equal(t, info1.AuditHistory.Score, info2.AuditHistory.Score) + require.Equal(t, len(info1.AuditHistory.Windows), len(info2.AuditHistory.Windows), + "info1.AuditHistory.Windows (%v) and info2.AuditHistory.Windows (%v) should have the same length", info1.AuditHistory.Windows, info2.AuditHistory.Windows) + }) + } + }) +} + func TestDBDisqualifyNode(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { reputationDB := db.Reputation() diff --git a/satellite/reputation/service.go b/satellite/reputation/service.go index c0d952c54..ce25843d3 100644 --- a/satellite/reputation/service.go +++ b/satellite/reputation/service.go @@ -18,6 +18,9 @@ import ( type DB interface { Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *Info, err error) Get(ctx context.Context, nodeID storj.NodeID) (*Info, error) + // ApplyUpdates applies multiple updates (defined by the updates + // parameter) to a node's reputations record. + ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, reputationConfig Config, now time.Time) (_ *Info, err error) // UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits. UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error) @@ -45,6 +48,18 @@ type Info struct { UnknownAuditReputationBeta float64 } +// Mutations represents changes which should be made to a particular node's +// reputation, in terms of counts and/or timestamps of events which have +// occurred. A Mutations record can be applied to a reputations row without +// prior knowledge of that row's contents. 
+type Mutations struct { + PositiveResults int + FailureResults int + UnknownResults int + OfflineResults int + OnlineHistory *pb.AuditHistory +} + // Service handles storing node reputation data and updating // the overlay cache when a node's status changes. type Service struct { @@ -81,7 +96,7 @@ func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, rep // only update node if its health status has changed, or it's a newly vetted // node. // this prevents the need to require caller of ApplyAudit() to always know - // the VettedAt time for a node. + // the previous VettedAt time for a node. // Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison // for the VettedAt status is done using time values that are truncated to second precision. if hasReputationChanged(*statusUpdate, reputation, now) { @@ -187,3 +202,22 @@ func statusChanged(s1, s2 *time.Time) bool { } return true } + +// UpdateRequestToMutations transforms an UpdateRequest into the equivalent +// Mutations structure, which can be used with ApplyUpdates. 
+func UpdateRequestToMutations(updateReq UpdateRequest, now time.Time) (Mutations, error) { + updates := Mutations{} + switch updateReq.AuditOutcome { + case AuditSuccess: + updates.PositiveResults = 1 + case AuditFailure: + updates.FailureResults = 1 + case AuditUnknown: + updates.UnknownResults = 1 + case AuditOffline: + updates.OfflineResults = 1 + } + updates.OnlineHistory = &pb.AuditHistory{} + err := AddAuditToHistory(updates.OnlineHistory, updateReq.AuditOutcome != AuditOffline, now, updateReq.Config.AuditHistory) + return updates, err +} diff --git a/satellite/reputation/service_test.go b/satellite/reputation/service_test.go index 36444b0f2..0c85f802e 100644 --- a/satellite/reputation/service_test.go +++ b/satellite/reputation/service_test.go @@ -99,8 +99,8 @@ func TestApplyAudit(t *testing.T) { expectedAuditAlpha := config.AuditLambda*auditAlpha + config.AuditWeight expectedAuditBeta := config.AuditLambda * auditBeta - require.EqualValues(t, stats.AuditReputationAlpha, expectedAuditAlpha) - require.EqualValues(t, stats.AuditReputationBeta, expectedAuditBeta) + require.InDelta(t, stats.AuditReputationAlpha, expectedAuditAlpha, 1e-8) + require.InDelta(t, stats.AuditReputationBeta, expectedAuditBeta, 1e-8) auditAlpha = expectedAuditAlpha auditBeta = expectedAuditBeta @@ -113,8 +113,8 @@ func TestApplyAudit(t *testing.T) { expectedAuditAlpha = config.AuditLambda * auditAlpha expectedAuditBeta = config.AuditLambda*auditBeta + config.AuditWeight - require.EqualValues(t, stats.AuditReputationAlpha, expectedAuditAlpha) - require.EqualValues(t, stats.AuditReputationBeta, expectedAuditBeta) + require.InDelta(t, stats.AuditReputationAlpha, expectedAuditAlpha, 1e-8) + require.InDelta(t, stats.AuditReputationBeta, expectedAuditBeta, 1e-8) }) } @@ -131,8 +131,8 @@ func TestGet(t *testing.T) { node, err := service.Get(ctx, nodeID) require.NoError(t, err) require.Zero(t, node.TotalAuditCount) - require.EqualValues(t, 1, node.AuditReputationAlpha) - require.EqualValues(t, 
1, node.UnknownAuditReputationAlpha) + require.InDelta(t, 1, node.AuditReputationAlpha, 1e-8) + require.InDelta(t, 1, node.UnknownAuditReputationAlpha, 1e-8) require.EqualValues(t, 1, node.OnlineScore) // if a node has no entry in reputation store, it should have default @@ -140,8 +140,8 @@ func TestGet(t *testing.T) { newNode, err := service.Get(ctx, testrand.NodeID()) require.NoError(t, err) require.Zero(t, newNode.TotalAuditCount) - require.EqualValues(t, 1, newNode.AuditReputationAlpha) - require.EqualValues(t, 1, newNode.UnknownAuditReputationAlpha) + require.InDelta(t, 1, newNode.AuditReputationAlpha, 1e-8) + require.InDelta(t, 1, newNode.UnknownAuditReputationAlpha, 1e-8) require.EqualValues(t, 1, newNode.OnlineScore) }) } diff --git a/satellite/satellitedb/audithistory.go b/satellite/satellitedb/audithistory.go index a05ad7712..0cae62f01 100644 --- a/satellite/satellitedb/audithistory.go +++ b/satellite/satellitedb/audithistory.go @@ -5,39 +5,30 @@ package satellitedb import ( "context" - "time" "storj.io/common/pb" "storj.io/storj/satellite/reputation" ) -func updateAuditHistory(ctx context.Context, oldHistory []byte, config reputation.AuditHistoryConfig, online bool, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) { +func mergeAuditHistory(ctx context.Context, oldHistory []byte, addHistory []*pb.AuditWindow, config reputation.AuditHistoryConfig) (res *reputation.UpdateAuditHistoryResponse, err error) { defer mon.Task()(&ctx)(&err) - res = &reputation.UpdateAuditHistoryResponse{ - NewScore: 1, - TrackingPeriodFull: false, - } - - // deserialize node audit history history := &pb.AuditHistory{} err = pb.Unmarshal(oldHistory, history) if err != nil { - return res, err + return nil, Error.Wrap(err) } - err = reputation.AddAuditToHistory(history, online, auditTime, config) + trackingPeriodFull := reputation.MergeAuditHistories(history, addHistory, config) + + historyBytes, err := pb.Marshal(history) if err != nil { - return res, err 
+ return nil, Error.Wrap(err) } - res.History, err = pb.Marshal(history) - if err != nil { - return res, err - } - - windowsPerTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds()) - res.TrackingPeriodFull = len(history.Windows)-1 >= windowsPerTrackingPeriod - res.NewScore = history.Score - return res, nil + return &reputation.UpdateAuditHistoryResponse{ + NewScore: history.Score, + TrackingPeriodFull: trackingPeriodFull, + History: historyBytes, + }, nil } diff --git a/satellite/satellitedb/reputations.go b/satellite/satellitedb/reputations.go index f7a7429b3..8ec10968f 100644 --- a/satellite/satellitedb/reputations.go +++ b/satellite/satellitedb/reputations.go @@ -7,6 +7,7 @@ import ( "context" "database/sql" "errors" + "fmt" "time" "github.com/zeebo/errs" @@ -25,20 +26,28 @@ type reputations struct { db *satelliteDB } -// Update updates a node's reputation stats. +func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *reputation.Info, err error) { + mutations, err := reputation.UpdateRequestToMutations(updateReq, now) + if err != nil { + return nil, err + } + return reputations.ApplyUpdates(ctx, updateReq.NodeID, mutations, updateReq.Config, now) +} + +// ApplyUpdates updates a node's reputation stats. // The update is done in a loop to handle concurrent update calls and to avoid // the need for a explicit transaction. -// There are two main steps go into the update process: +// There are three main steps that go into the update process: // 1. Get existing row for the node -// 2. Depends on the result of the first step, -// a. if existing row is returned, do compare-and-swap. -// b. if no row found, insert a new row. -func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *reputation.Info, err error) { +// a. if no row found, insert a new row. +// 2. Evaluate what the new values for the row fields should be. +// 3.
Update row using compare-and-swap. +func (reputations *reputations) ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates reputation.Mutations, reputationConfig reputation.Config, now time.Time) (_ *reputation.Info, err error) { defer mon.Task()(&ctx)(&err) for { // get existing reputation stats - dbNode, err := reputations.db.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes())) + dbNode, err := reputations.db.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes())) if err != nil && !errors.Is(err, sql.ErrNoRows) { return nil, Error.Wrap(err) } @@ -52,22 +61,26 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation // set default reputation stats for new node newNode := dbx.Reputation{ - Id: updateReq.NodeID.Bytes(), + Id: nodeID.Bytes(), UnknownAuditReputationAlpha: 1, AuditReputationAlpha: 1, OnlineScore: 1, AuditHistory: historyBytes, } - auditHistoryResponse, err := updateAuditHistory(ctx, historyBytes, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now) + var windows []*pb.AuditWindow + if updates.OnlineHistory != nil { + windows = updates.OnlineHistory.Windows + } + auditHistoryResponse, err := mergeAuditHistory(ctx, historyBytes, windows, reputationConfig.AuditHistory) if err != nil { return nil, Error.Wrap(err) } - update := reputations.populateUpdateNodeStats(&newNode, updateReq, auditHistoryResponse, now) + update := reputations.populateUpdateNodeStats(&newNode, updates, reputationConfig, auditHistoryResponse, now) createFields := reputations.populateCreateFields(update) - stats, err := reputations.db.Create_Reputation(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()), dbx.Reputation_AuditHistory(auditHistoryResponse.History), createFields) + stats, err := reputations.db.Create_Reputation(ctx, dbx.Reputation_Id(nodeID.Bytes()), dbx.Reputation_AuditHistory(auditHistoryResponse.History), createFields) if err != nil { // if node has been added into the table during a 
concurrent // Update call happened between Get and Insert, we will try again so the audit is recorded @@ -87,16 +100,16 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation return &status, nil } - auditHistoryResponse, err := updateAuditHistory(ctx, dbNode.AuditHistory, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now) + auditHistoryResponse, err := mergeAuditHistory(ctx, dbNode.AuditHistory, updates.OnlineHistory.GetWindows(), reputationConfig.AuditHistory) if err != nil { return nil, Error.Wrap(err) } - update := reputations.populateUpdateNodeStats(dbNode, updateReq, auditHistoryResponse, now) + update := reputations.populateUpdateNodeStats(dbNode, updates, reputationConfig, auditHistoryResponse, now) updateFields := reputations.populateUpdateFields(update, auditHistoryResponse.History) oldAuditHistory := dbx.Reputation_AuditHistory(dbNode.AuditHistory) - dbNode, err = reputations.db.Update_Reputation_By_Id_And_AuditHistory(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()), oldAuditHistory, updateFields) + dbNode, err = reputations.db.Update_Reputation_By_Id_And_AuditHistory(ctx, dbx.Reputation_Id(nodeID.Bytes()), oldAuditHistory, updateFields) if err != nil && !errors.Is(err, sql.ErrNoRows) { return nil, Error.Wrap(err) } @@ -387,8 +400,8 @@ func (reputations *reputations) populateUpdateFields(update updateNodeStats, his return updateFields } -func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, updateReq reputation.UpdateRequest, auditHistoryResponse *reputation.UpdateAuditHistoryResponse, now time.Time) updateNodeStats { - // there are three audit outcomes: success, failure, and unknown +func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, updates reputation.Mutations, config reputation.Config, historyResponse *reputation.UpdateAuditHistoryResponse, now time.Time) updateNodeStats { + // there are four audit outcomes: success, failure, offline, and 
unknown // if a node fails enough audits, it gets disqualified // if a node gets enough "unknown" audits, it gets put into suspension // if a node gets enough successful audits, and is in suspension, it gets removed from suspension @@ -399,63 +412,75 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, totalAuditCount := dbNode.TotalAuditCount vettedAt := dbNode.VettedAt - var updatedTotalAuditCount int64 + logger := reputations.db.log.With(zap.Stringer("Node ID", zapNodeIDBytes(dbNode.Id))) - switch updateReq.AuditOutcome { - case reputation.AuditSuccess: - // for a successful audit, increase reputation for normal *and* unknown audits - auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation( - true, - auditAlpha, - auditBeta, - updateReq.AuditLambda, - updateReq.AuditWeight, - totalAuditCount, - ) - // we will use updatedTotalAuditCount from the updateReputation call above - unknownAuditAlpha, unknownAuditBeta, _ = updateReputation( - true, - unknownAuditAlpha, - unknownAuditBeta, - updateReq.AuditLambda, - updateReq.AuditWeight, - totalAuditCount, - ) - case reputation.AuditFailure: - // for audit failure, only update normal alpha/beta - auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation( - false, - auditAlpha, - auditBeta, - updateReq.AuditLambda, - updateReq.AuditWeight, - totalAuditCount, - ) - case reputation.AuditUnknown: - // for audit unknown, only update unknown alpha/beta - unknownAuditAlpha, unknownAuditBeta, updatedTotalAuditCount = updateReputation( - false, - unknownAuditAlpha, - unknownAuditBeta, - updateReq.AuditLambda, - updateReq.AuditWeight, - totalAuditCount, - ) - case reputation.AuditOffline: - // for audit offline, only update total audit count - updatedTotalAuditCount = totalAuditCount + 1 - } + // Here we rely on the observation that, conceptually, if we have + // collected some list of successes and failures while auditing node N + // during some short time period, it might reasonably have 
happened that + // the events occurred in a different order. + // + // That is, if a node passed audit 1, then failed audit 2, then passed + // audit 3, it is fair to treat it as if it passed two audits and then + // failed one. This is because we expect that the order in which the + // events occurred is not very relevant. If a node failed an audit for + // piece P at time T, then it likely would also have failed an audit + // for the same piece at time T±ε, so we can grade it as though that + // had happened. + // + // There are conditions under which the order of events makes the + // difference in whether a node is disqualified or not. To be as fair + // as possible, we will not disqualify in those conditions. If a node + // remains un-disqualified under any ordering of events, we should not + // disqualify it. To that end, we will always apply failures _before_ + // applying successes. That ordering will always yield the highest + // possible result alpha and the lowest possible result beta, assuming + // weight > 0 and 0 < λ < 1 (the proof is left as an exercise for the + // reader). 
+ + // for audit failure, only update normal alpha/beta + auditBeta, auditAlpha = reputation.UpdateReputationMultiple( + updates.FailureResults, + auditBeta, + auditAlpha, + config.AuditLambda, + config.AuditWeight, + ) + // for audit unknown, only update unknown alpha/beta + unknownAuditBeta, unknownAuditAlpha = reputation.UpdateReputationMultiple( + updates.UnknownResults, + unknownAuditBeta, + unknownAuditAlpha, + config.AuditLambda, + config.AuditWeight, + ) + + // for a successful audit, increase reputation for normal *and* unknown audits + auditAlpha, auditBeta = reputation.UpdateReputationMultiple( + updates.PositiveResults, + auditAlpha, + auditBeta, + config.AuditLambda, + config.AuditWeight, + ) + unknownAuditAlpha, unknownAuditBeta = reputation.UpdateReputationMultiple( + updates.PositiveResults, + unknownAuditAlpha, + unknownAuditBeta, + config.AuditLambda, + config.AuditWeight, + ) + + // offline results affect only the total count. + updatedTotalAuditCount := totalAuditCount + int64(updates.OfflineResults+updates.UnknownResults+updates.FailureResults+updates.PositiveResults) mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha) //mon:locked mon.FloatVal("audit_reputation_beta").Observe(auditBeta) //mon:locked mon.FloatVal("unknown_audit_reputation_alpha").Observe(unknownAuditAlpha) //mon:locked mon.FloatVal("unknown_audit_reputation_beta").Observe(unknownAuditBeta) //mon:locked - mon.FloatVal("audit_online_score").Observe(auditHistoryResponse.NewScore) //mon:locked - - isUp := updateReq.AuditOutcome != reputation.AuditOffline + mon.FloatVal("audit_online_score").Observe(historyResponse.NewScore) //mon:locked updateFields := updateNodeStats{ - NodeID: updateReq.NodeID, + NodeID: dbNode.Id, TotalAuditCount: int64Field{set: true, value: updatedTotalAuditCount}, AuditReputationAlpha: float64Field{set: true, value: auditAlpha}, AuditReputationBeta: float64Field{set: true, value: auditBeta}, @@ -464,18 +489,18 @@ func (reputations *reputations) 
populateUpdateNodeStats(dbNode *dbx.Reputation, // Updating node stats always exits it from containment mode Contained: boolField{set: true, value: false}, // always update online score - OnlineScore: float64Field{set: true, value: auditHistoryResponse.NewScore}, + OnlineScore: float64Field{set: true, value: historyResponse.NewScore}, } - if vettedAt == nil && updatedTotalAuditCount >= updateReq.AuditCount { + if vettedAt == nil && updatedTotalAuditCount >= config.AuditCount { updateFields.VettedAt = timeField{set: true, value: now} } // disqualification case a // a) Success/fail audit reputation falls below audit DQ threshold auditRep := auditAlpha / (auditAlpha + auditBeta) - if auditRep <= updateReq.AuditDQ { - reputations.db.log.Info("Disqualified", zap.String("DQ type", "audit failure"), zap.String("Node ID", updateReq.NodeID.String())) + if auditRep <= config.AuditDQ { + logger.Info("Disqualified", zap.String("DQ type", "audit failure")) mon.Meter("bad_audit_dqs").Mark(1) //mon:locked updateFields.Disqualified = timeField{set: true, value: now} updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonAuditFailure)} @@ -483,9 +508,9 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, // if unknown audit rep goes below threshold, suspend node. Otherwise unsuspend node. 
unknownAuditRep := unknownAuditAlpha / (unknownAuditAlpha + unknownAuditBeta) - if unknownAuditRep <= updateReq.AuditDQ { + if unknownAuditRep <= config.AuditDQ { if dbNode.UnknownAuditSuspended == nil { - reputations.db.log.Info("Suspended", zap.String("Node ID", updateFields.NodeID.String()), zap.String("Category", "Unknown Audits")) + logger.Info("Suspended", zap.String("Category", "Unknown Audits")) updateFields.UnknownAuditSuspended = timeField{set: true, value: now} } @@ -494,37 +519,28 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, // AND the suspended grace period has elapsed // AND audit outcome is unknown or failed - // if suspended grace period has elapsed and audit outcome was failed or unknown, - // disqualify node. Set suspended to nil if node is disqualified - // NOTE: if updateFields.Suspended is set, we just suspended the node so it will not be disqualified - if updateReq.AuditOutcome != reputation.AuditSuccess { - if dbNode.UnknownAuditSuspended != nil && !updateFields.UnknownAuditSuspended.set && - time.Since(*dbNode.UnknownAuditSuspended) > updateReq.SuspensionGracePeriod && - updateReq.SuspensionDQEnabled { - reputations.db.log.Info("Disqualified", zap.String("DQ type", "suspension grace period expired for unknown audits"), zap.String("Node ID", updateReq.NodeID.String())) - mon.Meter("unknown_suspension_dqs").Mark(1) //mon:locked - updateFields.Disqualified = timeField{set: true, value: now} - updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonSuspension)} - updateFields.UnknownAuditSuspended = timeField{set: true, isNil: true} - } + // if suspended grace period has elapsed and unknown audit rep is still + // too low, disqualify node. Set suspended to nil if node is disqualified + // NOTE: if updateFields.UnknownAuditSuspended is set, we just suspended + // the node a few lines above, so it will not be disqualified. 
+ if dbNode.UnknownAuditSuspended != nil && !updateFields.UnknownAuditSuspended.set && + time.Since(*dbNode.UnknownAuditSuspended) > config.SuspensionGracePeriod && + config.SuspensionDQEnabled { + logger.Info("Disqualified", zap.String("DQ type", "suspension grace period expired for unknown audits")) + mon.Meter("unknown_suspension_dqs").Mark(1) //mon:locked + updateFields.Disqualified = timeField{set: true, value: now} + updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonSuspension)} + updateFields.UnknownAuditSuspended = timeField{set: true, isNil: true} } } else if dbNode.UnknownAuditSuspended != nil { - reputations.db.log.Info("Suspension lifted", zap.String("Category", "Unknown Audits"), zap.String("Node ID", updateFields.NodeID.String())) + logger.Info("Suspension lifted", zap.String("Category", "Unknown Audits")) updateFields.UnknownAuditSuspended = timeField{set: true, isNil: true} } - if isUp { - updateFields.LastContactSuccess = timeField{set: true, value: now} - } else { - updateFields.LastContactFailure = timeField{set: true, value: now} - } - - if updateReq.AuditOutcome == reputation.AuditSuccess { - updateFields.AuditSuccessCount = int64Field{set: true, value: dbNode.AuditSuccessCount + 1} - } + updateFields.AuditSuccessCount = int64Field{set: true, value: dbNode.AuditSuccessCount + int64(updates.PositiveResults)} // if suspension not enabled, skip penalization and unsuspend node if applicable - if !updateReq.AuditHistory.OfflineSuspensionEnabled { + if !config.AuditHistory.OfflineSuspensionEnabled { if dbNode.OfflineSuspended != nil { updateFields.OfflineSuspended = timeField{set: true, isNil: true} } @@ -537,7 +553,7 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, // only penalize node if online score is below threshold and // if it has enough completed windows to fill a tracking period penalizeOfflineNode := false - if auditHistoryResponse.NewScore < 
updateReq.AuditHistory.OfflineThreshold && auditHistoryResponse.TrackingPeriodFull { + if historyResponse.NewScore < config.AuditHistory.OfflineThreshold && historyResponse.TrackingPeriodFull { penalizeOfflineNode = true } @@ -550,16 +566,16 @@ func (reputations *reputations) populateUpdateNodeStats(dbNode *dbx.Reputation, updateFields.OfflineSuspended = timeField{set: true, value: now} } - gracePeriodEnd := dbNode.UnderReview.Add(updateReq.AuditHistory.GracePeriod) - trackingPeriodEnd := gracePeriodEnd.Add(updateReq.AuditHistory.TrackingPeriod) + gracePeriodEnd := dbNode.UnderReview.Add(config.AuditHistory.GracePeriod) + trackingPeriodEnd := gracePeriodEnd.Add(config.AuditHistory.TrackingPeriod) trackingPeriodPassed := now.After(trackingPeriodEnd) // after tracking period has elapsed, if score is good, clear under review // otherwise, disqualify node (if OfflineDQEnabled feature flag is true) if trackingPeriodPassed { if penalizeOfflineNode { - if updateReq.AuditHistory.OfflineDQEnabled { - reputations.db.log.Info("Disqualified", zap.String("DQ type", "node offline"), zap.String("Node ID", updateReq.NodeID.String())) + if config.AuditHistory.OfflineDQEnabled { + logger.Info("Disqualified", zap.String("DQ type", "node offline")) mon.Meter("offline_dqs").Mark(1) //mon:locked updateFields.Disqualified = timeField{set: true, value: now} updateFields.DisqualificationReason = intField{set: true, value: int(overlay.DisqualificationReasonNodeOffline)} @@ -605,7 +621,7 @@ type timeField struct { } type updateNodeStats struct { - NodeID storj.NodeID + NodeID []byte VettedAt timeField TotalAuditCount int64Field AuditReputationAlpha float64Field @@ -615,8 +631,6 @@ type updateNodeStats struct { UnknownAuditReputationAlpha float64Field UnknownAuditReputationBeta float64Field UnknownAuditSuspended timeField - LastContactSuccess timeField - LastContactFailure timeField AuditSuccessCount int64Field Contained boolField OfflineUnderReview timeField @@ -651,16 +665,12 @@ func 
dbxToReputationInfo(dbNode *dbx.Reputation) (reputation.Info, error) { return info, nil } -// updateReputation uses the Beta distribution model to determine a node's reputation. -// lambda is the "forgetting factor" which determines how much past info is kept when determining current reputation score. -// w is the normalization weight that affects how severely new updates affect the current reputation distribution. -func updateReputation(isSuccess bool, alpha, beta, lambda, w float64, totalCount int64) (newAlpha, newBeta float64, updatedCount int64) { - // v is a single feedback value that allows us to update both alpha and beta - var v float64 = -1 - if isSuccess { - v = 1 +type zapNodeIDBytes []byte + +func (z zapNodeIDBytes) String() string { + nodeID, err := storj.NodeIDFromBytes([]byte(z)) + if err != nil { + return fmt.Sprintf("invalid node-id 0x%x", []byte(z)) } - newAlpha = lambda*alpha + w*(1+v)/2 - newBeta = lambda*beta + w*(1-v)/2 - return newAlpha, newBeta, totalCount + 1 + return nodeID.String() }