0dcc0a9ee0
This is in response to community feedback that our existing reputation calculation is too prone to extreme swings up and down, which makes it likely to disqualify storage nodes unfairly. For details and analysis, see the data_loss_vs_dq_chance_sim.py tool and the "tuning reputation further.ipynb" Jupyter notebook in the storj/datascience repository, along with the discussion at https://forum.storj.io/t/tuning-audit-scoring/14084

In brief: changing the lambda and initial-alpha parameters in this way makes the swings in reputation smaller and less likely to push a node past the disqualification threshold unfairly.

Note: this change will cause a one-time reset of all (non-disqualified) node reputations, because the new initial alpha value of 1000 is dramatically different, and the disqualification threshold is going to be much higher.

Change-Id: Id6dc4ba8fde1be3db4255b72282207bab5491ca3
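For context, here is a minimal sketch of the kind of alpha/beta score update these parameters feed into, and of why a large initial alpha damps early swings. It is an illustration only, not the satellite implementation: the type name, method names, and the exact update rule (alpha and beta decayed by lambda, with the audit weight added to one side or the other) are assumptions based on the linked discussion, and the real system tracks separate scores per audit class with config-driven thresholds.

package main

import "fmt"

// auditScore is a toy model of a node's audit reputation. It is NOT the
// satellite's implementation; the update rule below is an assumption based
// on the forum discussion linked in the commit message.
type auditScore struct {
	alpha, beta float64
}

// apply folds one audit outcome into the score: lambda is the forgetting
// factor, w is the audit weight; a success feeds alpha, a failure feeds beta.
func (s *auditScore) apply(lambda, w float64, success bool) {
	v := 0.0
	if success {
		v = 1.0
	}
	s.alpha = lambda*s.alpha + w*v
	s.beta = lambda*s.beta + w*(1-v)
}

// value is the score that would be compared against the DQ threshold.
func (s *auditScore) value() float64 {
	return s.alpha / (s.alpha + s.beta)
}

func main() {
	// Compare a small initial alpha with the new initial alpha of 1000.
	for _, initialAlpha := range []float64{1, 1000} {
		s := auditScore{alpha: initialAlpha, beta: 0}
		for i := 0; i < 5; i++ {
			s.apply(0.999, 1, false) // five consecutive audit failures
		}
		fmt.Printf("initial alpha %4.0f -> score after 5 failures: %.3f\n",
			initialAlpha, s.value())
	}
}

With these toy numbers, the score drops to roughly 0.17 after five straight failures when starting from alpha = 1, but only to about 0.995 when starting from alpha = 1000, which is why the disqualification threshold has to move up alongside the new initial value.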
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package reputation_test

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
	"golang.org/x/sync/errgroup"

	"storj.io/common/errs2"
	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/reputation"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
)

func TestHeavyLockContention(t *testing.T) {
	const (
		nodeCount  = 10
		rounds     = 200
		windowSize = 10 * time.Minute
	)

	// construct random test data
	timeStart := time.Now().Add(-rounds * windowSize)
	nodes := make([]storj.NodeID, nodeCount)
	testData := make([][]reputation.Mutations, nodeCount)
	for nodeNum := 0; nodeNum < nodeCount; nodeNum++ {
		nodes[nodeNum] = testrand.NodeID()
		testData[nodeNum] = make([]reputation.Mutations, rounds)
		for roundNum := 0; roundNum < rounds; roundNum++ {
			mutations := &testData[nodeNum][roundNum]
			mutations.FailureResults = testrand.Intn(10)
			mutations.OfflineResults = testrand.Intn(10)
			mutations.UnknownResults = testrand.Intn(10)
			mutations.PositiveResults = testrand.Intn(10)
			mutations.OnlineHistory = &pb.AuditHistory{
				Windows: []*pb.AuditWindow{
					{
						WindowStart: timeStart.Add(windowSize * time.Duration(roundNum)),
						OnlineCount: int32(mutations.FailureResults + mutations.UnknownResults + mutations.PositiveResults),
						TotalCount:  int32(mutations.OfflineResults),
					},
				},
			}
		}
	}

	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		config := reputation.Config{
			AuditLambda:        1,
			AuditWeight:        1,
			InitialAlpha:       1000,
			InitialBeta:        0,
			UnknownAuditDQ:     0.6,
			UnknownAuditLambda: 0.95,
			AuditHistory: reputation.AuditHistoryConfig{
				WindowSize:     windowSize,
				TrackingPeriod: 1 * time.Hour,
			},
			FlushInterval:      2 * time.Hour,
			ErrorRetryInterval: 0,
		}
		reputationDB := db.Reputation()
		writecacheDB := reputation.NewCachingDB(zaptest.NewLogger(t), reputationDB, config)
		var group errgroup.Group

		// Make room for results ahead of time, so we don't need to use any
		// extra locks or atomics while the goroutines are goroutine-ing. Extra
		// synchronization primitives might synchronize things that otherwise
		// wouldn't have been synchronized.
		resultInfos := make([][]*reputation.Info, nodeCount)

		for nodeNum := 0; nodeNum < nodeCount; nodeNum++ {
			nodeNum := nodeNum
			group.Go(func() error {
				resultInfos[nodeNum] = make([]*reputation.Info, rounds)
				for roundNum := 0; roundNum < rounds; roundNum++ {
					now := testData[nodeNum][roundNum].OnlineHistory.Windows[0].WindowStart
					now = now.Add(time.Duration(testrand.Int63n(int64(windowSize))))
					info, err := writecacheDB.ApplyUpdates(ctx, nodes[nodeNum], testData[nodeNum][roundNum], config, now)
					if err != nil {
						return fmt.Errorf("node[%d] in round[%d]: %w", nodeNum, roundNum, err)
					}
					resultInfos[nodeNum][roundNum] = info.Copy()
				}
				return nil
			})
		}

		err := group.Wait()
		require.NoError(t, err)

		// verify each step along the way had the expected running totals
		for nodeNum := 0; nodeNum < nodeCount; nodeNum++ {
			totalAudits := 0
			totalSuccess := 0
			for roundNum := 0; roundNum < rounds; roundNum++ {
				input := testData[nodeNum][roundNum]
				output := resultInfos[nodeNum][roundNum]

				totalAudits += input.UnknownResults
				totalAudits += input.OfflineResults
				totalAudits += input.FailureResults
				totalAudits += input.PositiveResults
				totalSuccess += input.PositiveResults

				require.NotNil(t, output)
				require.Equalf(t, int64(totalAudits), output.TotalAuditCount,
					"node[%d] in round[%d]: expected %d, but TotalAuditCount=%d", nodeNum, roundNum, totalAudits, output.TotalAuditCount)
				require.Equal(t, int64(totalSuccess), output.AuditSuccessCount)
				expectLen := roundNum + 1
				if windowSize*time.Duration(roundNum) > config.AuditHistory.TrackingPeriod {
					expectLen = int(config.AuditHistory.TrackingPeriod/windowSize) + 1
				}
				require.Lenf(t, output.AuditHistory.Windows, expectLen,
					"node[%d] in round[%d]", nodeNum, roundNum)
				require.Equal(t, input.OnlineHistory.Windows[0].OnlineCount, output.AuditHistory.Windows[expectLen-1].OnlineCount)
				require.Equal(t, input.OnlineHistory.Windows[0].TotalCount, output.AuditHistory.Windows[expectLen-1].TotalCount)
				require.Equal(t, input.OnlineHistory.Windows[0].WindowStart.UTC().Round(0), output.AuditHistory.Windows[expectLen-1].WindowStart.UTC().Round(0))
			}
		}
	})
}

func TestFetchingInfoWhileEntryIsSyncing(t *testing.T) {
	const (
		windowSize = 10 * time.Minute
		numRounds  = 20
	)

	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		config := reputation.Config{
			AuditLambda:        1,
			AuditWeight:        1,
			InitialAlpha:       1000,
			InitialBeta:        0,
			UnknownAuditDQ:     0.6,
			UnknownAuditLambda: 0.95,
			AuditHistory: reputation.AuditHistoryConfig{
				WindowSize:     windowSize,
				TrackingPeriod: 1 * time.Hour,
			},
			FlushInterval:      2 * time.Hour,
			ErrorRetryInterval: 0,
		}
		logger := zaptest.NewLogger(t)
		reputationDB := db.Reputation()
		writecache := reputation.NewCachingDB(logger.Named("writecache"), reputationDB, config)
		const positiveAudits = 123

		for i := 0; i < numRounds; i++ {
			group, ctx := errgroup.WithContext(ctx)
			startTime := time.Now()
			managerCtx, managerCancel := context.WithCancel(ctx)

			nodeID := testrand.NodeID()
			// get node into the cache
			_, err := writecache.ApplyUpdates(ctx, nodeID, reputation.Mutations{}, config, startTime)
			require.NoError(t, err)

			// The best we can do without an attached debugger framework is
			// to try and time these calls at about the same time, and hope
			// that everything doesn't all happen in the same order in every
			// round. We'd like to test things happening in a few different
			// orders.

			// In this goroutine "A", we apply two different sets of updates,
			// one after the other. Then we request a sync to the database.
			group.Go(func() error {
				// when A is done, let manager 1 know it can quit too
				defer managerCancel()

				info, err := writecache.ApplyUpdates(ctx, nodeID, reputation.Mutations{
					OnlineHistory: &pb.AuditHistory{
						Windows: []*pb.AuditWindow{
							{
								WindowStart: startTime.Round(windowSize),
								OnlineCount: 0,
								TotalCount:  10,
							},
						},
						Score: 0,
					},
				}, config, startTime.Round(windowSize).Add(time.Second))
				if err != nil {
					return err
				}
				// the mutation should be visible now
				if len(info.AuditHistory.Windows) != 1 {
					return fmt.Errorf("assertion error: windows slice is %v (expected len 1)", info.AuditHistory.Windows)
				}

				// wait (very) briefly and make a second change. other satellite worker
				// processes (such as goroutine "C") should not ever see the above change
				// without this one (otherwise, that means we're writing everything to
				// the database instead of caching writes).
				time.Sleep(10 * time.Millisecond)
				info, err = writecache.ApplyUpdates(ctx, nodeID, reputation.Mutations{
					PositiveResults: positiveAudits,
				}, config, startTime.Round(windowSize).Add(2*time.Second))
				if err != nil {
					return err
				}
				if info.TotalAuditCount != positiveAudits {
					return fmt.Errorf("assertion error: TotalAuditCount is %d (expected %d)", info.TotalAuditCount, positiveAudits)
				}

				// now trigger a flush
				err = writecache.RequestSync(ctx, nodeID)
				if err != nil {
					return err
				}
				return nil
			})

			// In this goroutine "B", we repeatedly request the info for the
			// node from the cache, until we see that the changes from "A"
			// have been made. We count the number of results we get in the
			// interim state (the audit history has been extended, but the
			// positive audits have not been recorded) just to get an idea
			// of how effective this test is.
			var (
				queriesFromB            int
				queriesFromBSeeingDirty int
			)
			group.Go(func() error {
				for {
					queriesFromB++
					info, err := writecache.Get(ctx, nodeID)
					if err != nil {
						return err
					}
					firstChangeSeen := (len(info.AuditHistory.Windows) > 0)
					secondChangeSeen := (info.TotalAuditCount == positiveAudits)
					if firstChangeSeen != secondChangeSeen {
						queriesFromBSeeingDirty++
					}
					if secondChangeSeen {
						// test is done.
						return nil
					}
				}
			})

			// And goroutine "C" acts like a separate server with its own
			// connection to the reputation db. We expect never to see the
			// interim dirty state; just before-the-changes and
			// after-the-changes.
			var (
				queriesFromC            int
				queriesFromCSeeingDirty int
			)
			group.Go(func() error {
				for {
					queriesFromC++
					info, err := reputationDB.Get(ctx, nodeID)
					if err != nil {
						return err
					}
					firstChangeSeen := (len(info.AuditHistory.Windows) > 0)
					secondChangeSeen := (info.TotalAuditCount == positiveAudits)
					if firstChangeSeen != secondChangeSeen {
						queriesFromCSeeingDirty++
					}
					if secondChangeSeen {
						// test is done.
						return nil
					}
				}
			})

			// Goroutine "D" manages syncing for writecache.
			group.Go(func() error {
				err := writecache.Manage(managerCtx)
				return errs2.IgnoreCanceled(err)
			})

			require.NoError(t, group.Wait())
			require.Zero(t, queriesFromCSeeingDirty)

			logger.Debug("round complete",
				zap.Int("B queries", queriesFromB),
				zap.Int("B queries observing dirty state", queriesFromBSeeingDirty),
				zap.Int("C queries", queriesFromC))
		}
	})
}