storj/satellite/reputation/db_test.go
paul cannon 799b159bba satellite/reputation: offset write times by random, not by satelliteID
In an effort to distribute load on the reputation database, the
reputation write cache scheduled nodes to be written at a time offset by
the local nodeID. The idea was that no two repair workers would have the
same nodeID, so they would not tend to write to the same row at the same
time.

Instead, since all satellite processes share the same satellite ID
(duh), this caused _all_ workers to try and write to the same row at the
same time _always_. This was not ideal.

This change uses a random number instead of the satellite ID. The random
number is sourced from the number of nanoseconds since the Unix epoch.
As long as workers are not started at the exact same nanosecond, they
ought to get well-distributed offsets.

Change-Id: I149bdaa6ca1ee6043cfedcf1489dd9d3e3c7a163
2022-08-03 21:14:06 +00:00

375 lines
13 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package reputation_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/reputation"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
// TestUpdate checks that a node stays unvetted after its first audit, becomes
// vetted once the configured AuditCount (2) is reached, and keeps the same
// vetted_at timestamp on a subsequent update.
func TestUpdate(t *testing.T) {
	planetConfig := testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.Reputation.AuditCount = 2
			},
		},
	}

	testplanet.Run(t, planetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		storageNode := planet.StorageNodes[0]
		storageNode.Contact.Chore.Pause(ctx)

		sat := planet.Satellites[0]
		repDB := sat.DB.Reputation()

		req := reputation.UpdateRequest{
			NodeID:       storageNode.ID(),
			AuditOutcome: reputation.AuditOffline,
			Config: reputation.Config{
				AuditCount:   sat.Config.Reputation.AuditCount,
				AuditHistory: testAuditHistoryConfig(),
			},
		}

		// First audit: still below the audit count, so the node is not vetted.
		afterFirst, err := repDB.Update(ctx, req, time.Now())
		require.NoError(t, err)
		assert.Nil(t, afterFirst.VettedAt)

		// Second audit, same request: the audit count is reached, so the node
		// is now vetted.
		afterSecond, err := repDB.Update(ctx, req, time.Now())
		require.NoError(t, err)
		assert.NotNil(t, afterSecond.VettedAt)

		// A further audit must not overwrite the node's vetted_at timestamp.
		req.AuditOutcome = reputation.AuditSuccess
		afterThird, err := repDB.Update(ctx, req, time.Now())
		require.NoError(t, err)
		assert.NotNil(t, afterThird.VettedAt)
		assert.Equal(t, afterSecond.VettedAt, afterThird.VettedAt)
	})
}
// testApplyUpdatesEquivalentToMultipleUpdates checks that the ApplyUpdates call
// is equivalent to making multiple separate Update() calls (modulo some details
// like exact-time-of-disqualification).
//
// For every scenario two fresh random node IDs are used: node1 receives one
// Update() call per audit result, node2 receives a single ApplyUpdates() call
// carrying the same totals and an audit history built from the same
// timestamps. The resulting reputation infos are then compared field by field.
//
// reputationDB is the implementation under test and config is passed through
// to every Update/ApplyUpdates call.
func testApplyUpdatesEquivalentToMultipleUpdates(ctx context.Context, t *testing.T, reputationDB reputation.DB, config reputation.Config) {
	for _, testDef := range []struct {
		name      string
		failures  int
		successes int
		offlines  int
		unknowns  int
	}{
		{"4f-3s", 4, 3, 0, 0},
		{"3s-3o", 0, 3, 3, 0},
		{"4s-2u", 0, 4, 0, 2},
		{"1f-4s-1o-3u", 1, 4, 1, 3},
		{"4o", 4, 0, 0, 0},
		{"5s", 0, 5, 0, 0},
		{"6u", 0, 0, 0, 6},
	} {
		t.Run(testDef.name, func(t *testing.T) {
			node1 := testrand.NodeID()
			node2 := testrand.NodeID()
			startTime := time.Now().Add(-time.Hour)

			var (
				info1, info2 *reputation.Info
				err          error
			)

			// Do the Update() calls first, on node1. Each kind of result gets
			// its own ten-minute slot after startTime (failures at +0m,
			// offlines at +10m, unknowns at +20m, successes at +30m).
			updateReq := reputation.UpdateRequest{
				NodeID: node1,
				Config: config,
			}
			updateReq.AuditOutcome = reputation.AuditFailure
			for i := 0; i < testDef.failures; i++ {
				info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(i)*time.Minute))
				require.NoError(t, err)
			}
			updateReq.AuditOutcome = reputation.AuditOffline
			for i := 0; i < testDef.offlines; i++ {
				info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(10+i)*time.Minute))
				require.NoError(t, err)
			}
			updateReq.AuditOutcome = reputation.AuditUnknown
			for i := 0; i < testDef.unknowns; i++ {
				info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(20+i)*time.Minute))
				require.NoError(t, err)
			}
			updateReq.AuditOutcome = reputation.AuditSuccess
			for i := 0; i < testDef.successes; i++ {
				info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(30+i)*time.Minute))
				require.NoError(t, err)
			}

			// Now do the single ApplyUpdates call, on node2. The audit history
			// is built with the same timestamps as above; only offline results
			// are recorded with the boolean argument set to false.
			var hist pb.AuditHistory
			for i := 0; i < testDef.failures; i++ {
				err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(i)*time.Minute), config.AuditHistory)
				require.NoError(t, err)
			}
			for i := 0; i < testDef.offlines; i++ {
				err = reputation.AddAuditToHistory(&hist, false, startTime.Add(time.Duration(10+i)*time.Minute), config.AuditHistory)
				require.NoError(t, err)
			}
			for i := 0; i < testDef.unknowns; i++ {
				err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(20+i)*time.Minute), config.AuditHistory)
				require.NoError(t, err)
			}
			for i := 0; i < testDef.successes; i++ {
				err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(30+i)*time.Minute), config.AuditHistory)
				require.NoError(t, err)
			}
			mutations := reputation.Mutations{
				PositiveResults: testDef.successes,
				FailureResults:  testDef.failures,
				UnknownResults:  testDef.unknowns,
				OfflineResults:  testDef.offlines,
				OnlineHistory:   &hist,
			}
			info2, err = reputationDB.ApplyUpdates(ctx, node2, mutations, config, startTime.Add(40*time.Minute))
			require.NoError(t, err)

			// Guard every pointer before it is dereferenced below, so that a
			// missing value fails the test instead of panicking.
			require.NotNil(t, info1)
			require.NotNil(t, info2)
			require.NotNil(t, info1.AuditHistory)
			require.NotNil(t, info2.AuditHistory)

			require.Equalf(t, info1.VettedAt == nil, info2.VettedAt == nil,
				"info1.VettedAt (%v) and info2.VettedAt (%v) should both be nil or both have values", info1.VettedAt, info2.VettedAt)
			require.Equalf(t, info1.Disqualified == nil, info2.Disqualified == nil,
				"info1.Disqualified (%v) and info2.Disqualified (%v) should both be nil or both have values", info1.Disqualified, info2.Disqualified)

			require.InDelta(t, info1.AuditReputationAlpha, info2.AuditReputationAlpha, 1e-8)
			require.InDelta(t, info1.AuditReputationBeta, info2.AuditReputationBeta, 1e-8)
			require.InDelta(t, info1.UnknownAuditReputationAlpha, info2.UnknownAuditReputationAlpha, 1e-8)
			require.InDelta(t, info1.UnknownAuditReputationBeta, info2.UnknownAuditReputationBeta, 1e-8)
			require.InDelta(t, info1.OnlineScore, info2.OnlineScore, 1e-8)
			require.InDelta(t, info1.AuditHistory.Score, info2.AuditHistory.Score, 1e-8)

			require.Equal(t, info1.AuditHistory.Score, info2.AuditHistory.Score)
			require.Equalf(t, len(info1.AuditHistory.Windows), len(info2.AuditHistory.Windows),
				"info1.AuditHistory.Windows (%v) and info2.AuditHistory.Windows (%v) should have the same length", info1.AuditHistory.Windows, info2.AuditHistory.Windows)
		})
	}
}
// TestApplyUpdatesEquivalentToMultipleUpdates runs the shared
// ApplyUpdates-vs-Update equivalence check directly against db.Reputation()
// (equivalence is modulo details like exact-time-of-disqualification).
func TestApplyUpdatesEquivalentToMultipleUpdates(t *testing.T) {
	auditHistoryConfig := reputation.AuditHistoryConfig{
		WindowSize:               10 * time.Minute,
		TrackingPeriod:           1 * time.Hour,
		GracePeriod:              20 * time.Minute,
		OfflineThreshold:         0.5,
		OfflineDQEnabled:         false,
		OfflineSuspensionEnabled: true,
	}
	reputationConfig := reputation.Config{
		AuditLambda:           0.99,
		AuditWeight:           1,
		AuditDQ:               0.1,
		SuspensionGracePeriod: 20 * time.Minute,
		SuspensionDQEnabled:   true,
		AuditCount:            3,
		AuditHistory:          auditHistoryConfig,
	}

	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		testApplyUpdatesEquivalentToMultipleUpdates(ctx, t, db.Reputation(), reputationConfig)
	})
}
// TestApplyUpdatesEquivalentToMultipleUpdatesCached runs the shared
// ApplyUpdates-vs-Update equivalence check against a CachingDB wrapped around
// db.Reputation() (equivalence is modulo details like
// exact-time-of-disqualification).
func TestApplyUpdatesEquivalentToMultipleUpdatesCached(t *testing.T) {
	auditHistoryConfig := reputation.AuditHistoryConfig{
		WindowSize:               10 * time.Minute,
		TrackingPeriod:           1 * time.Hour,
		GracePeriod:              20 * time.Minute,
		OfflineThreshold:         0.5,
		OfflineDQEnabled:         false,
		OfflineSuspensionEnabled: true,
	}
	reputationConfig := reputation.Config{
		AuditLambda:           0.99,
		AuditWeight:           1,
		AuditDQ:               0.1,
		SuspensionGracePeriod: 20 * time.Minute,
		SuspensionDQEnabled:   true,
		AuditCount:            3,
		AuditHistory:          auditHistoryConfig,
	}

	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		// The cache's Manage loop gets its own cancellable context, which is
		// cancelled when this callback returns.
		manageCtx, stopManage := context.WithCancel(ctx)
		defer stopManage()

		cache := reputation.NewCachingDB(zaptest.NewLogger(t), db.Reputation(), reputationConfig)
		ctx.Go(func() error {
			// Cancellation is the expected way for Manage to end here.
			return errs2.IgnoreCanceled(cache.Manage(manageCtx))
		})

		testApplyUpdatesEquivalentToMultipleUpdates(manageCtx, t, cache, reputationConfig)
	})
}
// TestDBDisqualifyNode checks that DisqualifyNode stores the given
// disqualification time and reason, and that Get returns them for the node.
func TestDBDisqualifyNode(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		var (
			repDB = db.Reputation()
			id    = testrand.NodeID()
			// Truncated to whole seconds in UTC so the value read back can be
			// compared for exact equality.
			disqualifiedAt = time.Now().Truncate(time.Second).UTC()
		)

		require.NoError(t, repDB.DisqualifyNode(ctx, id, disqualifiedAt, overlay.DisqualificationReasonAuditFailure))

		stored, err := repDB.Get(ctx, id)
		require.NoError(t, err)
		require.NotNil(t, stored.Disqualified)
		require.Equal(t, disqualifiedAt, stored.Disqualified.UTC())
		require.Equal(t, overlay.DisqualificationReasonAuditFailure, stored.DisqualificationReason)
	})
}
// TestDBDisqualificationAuditFailure checks that a single failed audit, applied
// with AuditLambda 1, AuditWeight 1 and AuditDQ 0.99, leaves the node
// disqualified with the audit-failure reason and a disqualification time within
// a microsecond of the update time.
func TestDBDisqualificationAuditFailure(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		repDB := db.Reputation()
		id := testrand.NodeID()
		updateTime := time.Now()

		cfg := reputation.Config{
			AuditLambda:           1,
			AuditWeight:           1,
			AuditDQ:               0.99,
			SuspensionGracePeriod: 0,
			SuspensionDQEnabled:   false,
			AuditCount:            0,
			AuditHistory:          reputation.AuditHistoryConfig{},
		}
		req := reputation.UpdateRequest{
			NodeID:       id,
			AuditOutcome: reputation.AuditFailure,
			Config:       cfg,
		}

		result, err := repDB.Update(ctx, req, updateTime)
		require.NoError(t, err)
		require.NotNil(t, result.Disqualified)
		assert.WithinDuration(t, updateTime, *result.Disqualified, time.Microsecond)
		assert.Equal(t, overlay.DisqualificationReasonAuditFailure, result.DisqualificationReason)
	})
}
// TestDBDisqualificationSuspension checks that a node already suspended for
// unknown audits is disqualified by a further unknown-audit update when
// SuspensionDQEnabled is set and the suspension grace period is zero. The
// result is expected to carry the suspension reason and no remaining
// unknown-audit suspension.
func TestDBDisqualificationSuspension(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		repDB := db.Reputation()
		id := testrand.NodeID()
		updateTime := time.Now().Truncate(time.Second).UTC()

		cfg := reputation.Config{
			AuditLambda:           1,
			AuditWeight:           1,
			AuditDQ:               0.99,
			SuspensionGracePeriod: 0,
			SuspensionDQEnabled:   true,
			AuditCount:            0,
			AuditHistory:          reputation.AuditHistoryConfig{},
		}
		req := reputation.UpdateRequest{
			NodeID:       id,
			AuditOutcome: reputation.AuditUnknown,
			Config:       cfg,
		}

		// Suspend the node for unknown audits one second before the update.
		suspendedAt := updateTime.Add(-time.Second)
		require.NoError(t, repDB.SuspendNodeUnknownAudit(ctx, id, suspendedAt))

		// An unknown audit on the suspended node should disqualify it.
		result, err := repDB.Update(ctx, req, updateTime)
		require.NoError(t, err)
		require.NotNil(t, result.Disqualified)
		assert.Nil(t, result.UnknownAuditSuspended)
		assert.Equal(t, updateTime, result.Disqualified.UTC())
		assert.Equal(t, overlay.DisqualificationReasonSuspension, result.DisqualificationReason)
	})
}
// TestDBDisqualificationNodeOffline applies four offline audit updates, one
// second apart, with offline suspension and offline disqualification both
// enabled. It expects the second update to put the node into offline
// suspension at that update's time and the fourth update to disqualify it
// with the node-offline reason.
func TestDBDisqualificationNodeOffline(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		reputationDB := db.Reputation()
		nodeID := testrand.NodeID()
		// Truncated to whole seconds in UTC so stored timestamps can be
		// compared for exact equality.
		now := time.Now().Truncate(time.Second).UTC()

		updateReq := reputation.UpdateRequest{
			NodeID:       nodeID,
			AuditOutcome: reputation.AuditOffline,
			Config: reputation.Config{
				AuditLambda:           0,
				AuditWeight:           0,
				AuditDQ:               0,
				SuspensionGracePeriod: 0,
				SuspensionDQEnabled:   false,
				AuditCount:            0,
				AuditHistory: reputation.AuditHistoryConfig{
					WindowSize:               0,
					TrackingPeriod:           1 * time.Second,
					GracePeriod:              0,
					OfflineThreshold:         1,
					OfflineDQEnabled:         true,
					OfflineSuspensionEnabled: true,
				},
			},
		}

		// first window always returns perfect score
		_, err := reputationDB.Update(ctx, updateReq, now)
		require.NoError(t, err)

		// put node to offline suspension
		suspendedAt := now.Add(time.Second)
		status, err := reputationDB.Update(ctx, updateReq, suspendedAt)
		require.NoError(t, err)
		// Guard the dereference below so a missing suspension fails the test
		// instead of panicking.
		require.NotNil(t, status.OfflineSuspended)
		require.Equal(t, suspendedAt, status.OfflineSuspended.UTC())

		// should have at least 2 windows in audit history after earliest window is removed
		_, err = reputationDB.Update(ctx, updateReq, now.Add(2*time.Second))
		require.NoError(t, err)

		// disqualify node
		disqualifiedAt := now.Add(3 * time.Second)
		status, err = reputationDB.Update(ctx, updateReq, disqualifiedAt)
		require.NoError(t, err)
		require.NotNil(t, status.Disqualified)
		assert.Equal(t, disqualifiedAt, status.Disqualified.UTC())
		assert.Equal(t, overlay.DisqualificationReasonNodeOffline, status.DisqualificationReason)
	})
}
// testAuditHistoryConfig returns the audit history configuration used by
// TestUpdate: one-hour window size, tracking period and grace period, with an
// offline threshold of zero.
func testAuditHistoryConfig() reputation.AuditHistoryConfig {
	var cfg reputation.AuditHistoryConfig
	cfg.WindowSize = time.Hour
	cfg.TrackingPeriod = time.Hour
	cfg.GracePeriod = time.Hour
	cfg.OfflineThreshold = 0
	return cfg
}