storj/satellite/reputation/writecache_service_test.go
paul cannon 799b159bba satellite/reputation: offset write times by random, not by satelliteID
In an effort to distribute load on the reputation database, the
reputation write cache scheduled nodes to be written at a time offset by
the local nodeID. The idea was that no two repair workers would have the
same nodeID, so they would not tend to write to the same row at the same
time.

Instead, since all satellite processes share the same satellite ID
(duh), this caused _all_ workers to try to write to the same row at the
same time _always_. This was not ideal.

This change uses a random number instead of the satellite ID. The random
number is sourced from the number of nanoseconds since the Unix epoch.
As long as workers are not started at the exact same nanosecond, they
ought to get well-distributed offsets.

Change-Id: I149bdaa6ca1ee6043cfedcf1489dd9d3e3c7a163
2022-08-03 21:14:06 +00:00

291 lines
9.5 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package reputation_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/reputation"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
// TestHeavyLockContention hammers the reputation write cache with many
// concurrent writers (one goroutine per node, many rounds each) against a
// real database, then verifies that every intermediate result reflected the
// correct running totals for its node.
func TestHeavyLockContention(t *testing.T) {
	const (
		nodeCount  = 10
		rounds     = 200
		windowSize = 10 * time.Minute
	)

	// Build randomized mutation sets for every (node, round) pair up front,
	// so that the concurrent section below does nothing but apply them.
	baseTime := time.Now().Add(-rounds * windowSize)
	nodes := make([]storj.NodeID, nodeCount)
	updates := make([][]reputation.Mutations, nodeCount)
	for n := range nodes {
		nodes[n] = testrand.NodeID()
		updates[n] = make([]reputation.Mutations, rounds)
		for r := range updates[n] {
			m := &updates[n][r]
			m.FailureResults = testrand.Intn(10)
			m.OfflineResults = testrand.Intn(10)
			m.UnknownResults = testrand.Intn(10)
			m.PositiveResults = testrand.Intn(10)
			m.OnlineHistory = &pb.AuditHistory{
				Windows: []*pb.AuditWindow{{
					WindowStart: baseTime.Add(windowSize * time.Duration(r)),
					// NOTE(review): OnlineCount is built from the non-offline
					// results while TotalCount gets only the offline count;
					// that looks inverted, but the assertions below compare
					// input to output directly, so it is self-consistent.
					// TODO confirm against the AuditWindow semantics.
					OnlineCount: int32(m.FailureResults + m.UnknownResults + m.PositiveResults),
					TotalCount:  int32(m.OfflineResults),
				}},
			}
		}
	}
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		config := reputation.Config{
			AuditLambda: 1,
			AuditWeight: 1,
			AuditHistory: reputation.AuditHistoryConfig{
				WindowSize:     windowSize,
				TrackingPeriod: 1 * time.Hour,
			},
			FlushInterval:      2 * time.Hour,
			ErrorRetryInterval: 0,
		}
		writecacheDB := reputation.NewCachingDB(zaptest.NewLogger(t), db.Reputation(), config)

		// Preallocate the result matrix so the workers can record outcomes
		// without any shared locks or atomics; extra synchronization
		// primitives might synchronize things that otherwise would not have
		// been synchronized, hiding the contention we want to provoke.
		observed := make([][]*reputation.Info, nodeCount)
		var group errgroup.Group
		for n := 0; n < nodeCount; n++ {
			n := n
			group.Go(func() error {
				observed[n] = make([]*reputation.Info, rounds)
				for r := 0; r < rounds; r++ {
					// Apply the round's mutations at a random instant within
					// the round's window.
					at := updates[n][r].OnlineHistory.Windows[0].WindowStart
					at = at.Add(time.Duration(testrand.Int63n(int64(windowSize))))
					info, err := writecacheDB.ApplyUpdates(ctx, nodes[n], updates[n][r], config, at)
					if err != nil {
						return fmt.Errorf("node[%d] in round[%d]: %w", n, r, err)
					}
					observed[n][r] = info.Copy()
				}
				return nil
			})
		}
		require.NoError(t, group.Wait())

		// Replay each node's rounds in order and confirm the cache reported
		// the expected running totals at every single step.
		for n := 0; n < nodeCount; n++ {
			var totalAudits, totalSuccess int
			for r := 0; r < rounds; r++ {
				in := updates[n][r]
				out := observed[n][r]
				totalAudits += in.UnknownResults + in.OfflineResults + in.FailureResults + in.PositiveResults
				totalSuccess += in.PositiveResults
				require.NotNil(t, out)
				require.Equalf(t, int64(totalAudits), out.TotalAuditCount,
					"node[%d] in round[%d]: expected %d, but TotalAuditCount=%d", n, r, totalAudits, out.TotalAuditCount)
				require.Equal(t, int64(totalSuccess), out.AuditSuccessCount)
				// The history grows one window per round until the tracking
				// period is full, after which its length stays capped.
				expectLen := r + 1
				if windowSize*time.Duration(r) > config.AuditHistory.TrackingPeriod {
					expectLen = int(config.AuditHistory.TrackingPeriod/windowSize) + 1
				}
				require.Lenf(t, out.AuditHistory.Windows, expectLen,
					"node[%d] in round[%d]", n, r)
				last := out.AuditHistory.Windows[expectLen-1]
				require.Equal(t, in.OnlineHistory.Windows[0].OnlineCount, last.OnlineCount)
				require.Equal(t, in.OnlineHistory.Windows[0].TotalCount, last.TotalCount)
				require.Equal(t, in.OnlineHistory.Windows[0].WindowStart.UTC().Round(0), last.WindowStart.UTC().Round(0))
			}
		}
	})
}
// TestFetchingInfoWhileEntryIsSyncing checks the visibility guarantees of the
// reputation write cache while a dirty entry is being flushed: readers going
// through the cache (goroutine "B") may observe cached-but-unflushed state,
// while readers going straight to the database (goroutine "C") must never
// observe a partially-applied ("dirty") set of cached mutations — only the
// state before all of round's writes or after all of them.
func TestFetchingInfoWhileEntryIsSyncing(t *testing.T) {
	const (
		windowSize = 10 * time.Minute
		numRounds  = 20
	)
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		config := reputation.Config{
			AuditLambda: 1,
			AuditWeight: 1,
			AuditHistory: reputation.AuditHistoryConfig{
				WindowSize:     windowSize,
				TrackingPeriod: 1 * time.Hour,
			},
			// A long flush interval keeps writes in the cache until the test
			// explicitly triggers a sync via RequestSync.
			FlushInterval:      2 * time.Hour,
			ErrorRetryInterval: 0,
		}
		logger := zaptest.NewLogger(t)
		reputationDB := db.Reputation()
		writecache := reputation.NewCachingDB(logger.Named("writecache"), reputationDB, config)
		const positiveAudits = 123
		for i := 0; i < numRounds; i++ {
			// Fresh errgroup (and derived ctx) per round so a failure in any
			// goroutine cancels the others for that round only.
			group, ctx := errgroup.WithContext(ctx)
			startTime := time.Now()
			// managerCtx controls only the Manage goroutine ("D"); goroutine
			// "A" cancels it once its writes and sync request are complete.
			managerCtx, managerCancel := context.WithCancel(ctx)
			// Use a brand-new node each round so rounds don't interfere.
			nodeID := testrand.NodeID()
			// get node into the cache
			_, err := writecache.ApplyUpdates(ctx, nodeID, reputation.Mutations{}, config, startTime)
			require.NoError(t, err)
			// The best we can do without an attached debugger framework is
			// to try and time these calls at about the same time, and hope
			// that everything doesn't all happen in the same order in every
			// round. We'd like to test things happening in a few different
			// orders.
			// In this goroutine "A", we apply two different sets of updates,
			// one after the other. Then we request a sync to the database.
			group.Go(func() error {
				// when A is done, let manager 1 know it can quit too
				defer managerCancel()
				// First change: extend the audit history by one window
				// (10 total audits, none of them online).
				info, err := writecache.ApplyUpdates(ctx, nodeID, reputation.Mutations{
					OnlineHistory: &pb.AuditHistory{
						Windows: []*pb.AuditWindow{
							{
								WindowStart: startTime.Round(windowSize),
								OnlineCount: 0,
								TotalCount:  10,
							},
						},
						Score: 0,
					},
				}, config, startTime.Round(windowSize).Add(time.Second))
				if err != nil {
					return err
				}
				// the mutation should be visible now
				if len(info.AuditHistory.Windows) != 1 {
					return fmt.Errorf("assertion error: windows slice is %v (expected len 1)", info.AuditHistory.Windows)
				}
				// wait (very) briefly and make a second change. other satellite worker
				// processes (such as goroutine "C") should not ever see the above change
				// without this one (otherwise, that means we're writing everything to
				// the database instead of caching writes).
				time.Sleep(10 * time.Millisecond)
				info, err = writecache.ApplyUpdates(ctx, nodeID, reputation.Mutations{
					PositiveResults: positiveAudits,
				}, config, startTime.Round(windowSize).Add(2*time.Second))
				if err != nil {
					return err
				}
				if info.TotalAuditCount != positiveAudits {
					return fmt.Errorf("assertion error: TotalAuditCount is %d (expected %d)", info.TotalAuditCount, positiveAudits)
				}
				// now trigger a flush
				err = writecache.RequestSync(ctx, nodeID)
				if err != nil {
					return err
				}
				return nil
			})
			// In this goroutine "B", we repeatedly request the info for the
			// node from the cache, until we see that the changes from "A"
			// have been made. We count the number of results we get in the
			// interim state (the audit history has been extended, but the
			// positive audits have not been recorded) just to get an idea
			// of how effective this test is.
			var (
				queriesFromB            int
				queriesFromBSeeingDirty int
			)
			group.Go(func() error {
				for {
					queriesFromB++
					info, err := writecache.Get(ctx, nodeID)
					if err != nil {
						return err
					}
					firstChangeSeen := (len(info.AuditHistory.Windows) > 0)
					secondChangeSeen := (info.TotalAuditCount == positiveAudits)
					// "dirty" means exactly one of A's two changes is visible.
					if firstChangeSeen != secondChangeSeen {
						queriesFromBSeeingDirty++
					}
					if secondChangeSeen == true {
						// test is done.
						return nil
					}
				}
			})
			// And goroutine "C" acts like a separate server with its own
			// connection to the reputation db. We expect never to see the
			// interim dirty state; just before-the-changes and
			// after-the-changes.
			var (
				queriesFromC            int
				queriesFromCSeeingDirty int
			)
			group.Go(func() error {
				for {
					queriesFromC++
					// Note: reads the backing DB directly, bypassing the cache.
					info, err := reputationDB.Get(ctx, nodeID)
					if err != nil {
						return err
					}
					firstChangeSeen := (len(info.AuditHistory.Windows) > 0)
					secondChangeSeen := (info.TotalAuditCount == positiveAudits)
					if firstChangeSeen != secondChangeSeen {
						queriesFromCSeeingDirty++
					}
					if secondChangeSeen == true {
						// test is done.
						return nil
					}
				}
			})
			// Goroutine "D" manages syncing for writecache.
			group.Go(func() error {
				err := writecache.Manage(managerCtx)
				// Cancellation is the expected shutdown path, not a failure.
				return errs2.IgnoreCanceled(err)
			})
			require.NoError(t, group.Wait())
			// The core assertion: direct DB readers never saw a dirty state.
			require.Zero(t, queriesFromCSeeingDirty)
			// Counters are safe to read here: group.Wait() establishes a
			// happens-before with the goroutines that wrote them.
			logger.Debug("round complete",
				zap.Int("B queries", queriesFromB),
				zap.Int("B queries observing dirty state", queriesFromBSeeingDirty),
				zap.Int("C queries", queriesFromC))
		}
	})
}