2f20bbf4d8
This should lower the amount of database load coming from reputation updates. Change-Id: Iaacfb81480075261da77c5cc93e08b24f69f8949
375 lines
13 KiB
Go
375 lines
13 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package reputation_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zaptest"
|
|
|
|
"storj.io/common/errs2"
|
|
"storj.io/common/pb"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/overlay"
|
|
"storj.io/storj/satellite/reputation"
|
|
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
|
)
|
|
|
|
func TestUpdate(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
config.Reputation.AuditCount = 2
|
|
},
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
node := planet.StorageNodes[0]
|
|
node.Contact.Chore.Pause(ctx)
|
|
|
|
db := planet.Satellites[0].DB.Reputation()
|
|
|
|
// 1 audit -> unvetted
|
|
updateReq := reputation.UpdateRequest{
|
|
NodeID: node.ID(),
|
|
AuditOutcome: reputation.AuditOffline,
|
|
Config: reputation.Config{
|
|
AuditCount: planet.Satellites[0].Config.Reputation.AuditCount,
|
|
AuditHistory: testAuditHistoryConfig(),
|
|
},
|
|
}
|
|
nodeStats, err := db.Update(ctx, updateReq, time.Now())
|
|
require.NoError(t, err)
|
|
assert.Nil(t, nodeStats.VettedAt)
|
|
|
|
// 2 audits -> vetted
|
|
updateReq.NodeID = node.ID()
|
|
updateReq.AuditOutcome = reputation.AuditOffline
|
|
nodeStats, err = db.Update(ctx, updateReq, time.Now())
|
|
require.NoError(t, err)
|
|
assert.NotNil(t, nodeStats.VettedAt)
|
|
|
|
// Don't overwrite node's vetted_at timestamp
|
|
updateReq.NodeID = node.ID()
|
|
updateReq.AuditOutcome = reputation.AuditSuccess
|
|
nodeStats2, err := db.Update(ctx, updateReq, time.Now())
|
|
require.NoError(t, err)
|
|
assert.NotNil(t, nodeStats2.VettedAt)
|
|
assert.Equal(t, nodeStats.VettedAt, nodeStats2.VettedAt)
|
|
|
|
})
|
|
}
|
|
|
|
// testApplyUpdatesEquivalentToMultipleUpdates checks that the ApplyUpdates call
|
|
// is equivalent to making multiple separate Update() calls (modulo some details
|
|
// like exact-time-of-disqualification).
|
|
func testApplyUpdatesEquivalentToMultipleUpdates(ctx context.Context, t *testing.T, reputationDB reputation.DB, config reputation.Config) {
|
|
for _, testDef := range []struct {
|
|
name string
|
|
failures int
|
|
successes int
|
|
offlines int
|
|
unknowns int
|
|
}{
|
|
{"4f-3s", 4, 3, 0, 0},
|
|
{"3s-3o", 0, 3, 3, 0},
|
|
{"4s-2u", 0, 4, 0, 2},
|
|
{"1f-4s-1o-3u", 1, 4, 1, 3},
|
|
{"4o", 4, 0, 0, 0},
|
|
{"5s", 0, 5, 0, 0},
|
|
{"6u", 0, 0, 0, 6},
|
|
} {
|
|
t.Run(testDef.name, func(t *testing.T) {
|
|
node1 := testrand.NodeID()
|
|
node2 := testrand.NodeID()
|
|
startTime := time.Now().Add(-time.Hour)
|
|
var (
|
|
info1, info2 *reputation.Info
|
|
err error
|
|
)
|
|
|
|
// Do the Update() calls first, on node1
|
|
|
|
updateReq := reputation.UpdateRequest{
|
|
NodeID: node1,
|
|
Config: config,
|
|
}
|
|
|
|
updateReq.AuditOutcome = reputation.AuditFailure
|
|
for i := 0; i < testDef.failures; i++ {
|
|
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(i)*time.Minute))
|
|
require.NoError(t, err)
|
|
}
|
|
updateReq.AuditOutcome = reputation.AuditOffline
|
|
for i := 0; i < testDef.offlines; i++ {
|
|
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(10+i)*time.Minute))
|
|
require.NoError(t, err)
|
|
}
|
|
updateReq.AuditOutcome = reputation.AuditUnknown
|
|
for i := 0; i < testDef.unknowns; i++ {
|
|
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(20+i)*time.Minute))
|
|
require.NoError(t, err)
|
|
}
|
|
updateReq.AuditOutcome = reputation.AuditSuccess
|
|
for i := 0; i < testDef.successes; i++ {
|
|
info1, err = reputationDB.Update(ctx, updateReq, startTime.Add(time.Duration(30+i)*time.Minute))
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Now do the single ApplyUpdates call, on node2
|
|
|
|
var hist pb.AuditHistory
|
|
for i := 0; i < testDef.failures; i++ {
|
|
err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(i)*time.Minute), config.AuditHistory)
|
|
require.NoError(t, err)
|
|
}
|
|
for i := 0; i < testDef.offlines; i++ {
|
|
err = reputation.AddAuditToHistory(&hist, false, startTime.Add(time.Duration(10+i)*time.Minute), config.AuditHistory)
|
|
require.NoError(t, err)
|
|
}
|
|
for i := 0; i < testDef.unknowns; i++ {
|
|
err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(20+i)*time.Minute), config.AuditHistory)
|
|
require.NoError(t, err)
|
|
}
|
|
for i := 0; i < testDef.successes; i++ {
|
|
err = reputation.AddAuditToHistory(&hist, true, startTime.Add(time.Duration(30+i)*time.Minute), config.AuditHistory)
|
|
require.NoError(t, err)
|
|
}
|
|
mutations := reputation.Mutations{
|
|
PositiveResults: testDef.successes,
|
|
FailureResults: testDef.failures,
|
|
UnknownResults: testDef.unknowns,
|
|
OfflineResults: testDef.offlines,
|
|
OnlineHistory: &hist,
|
|
}
|
|
info2, err = reputationDB.ApplyUpdates(ctx, node2, mutations, config, startTime.Add(40*time.Minute))
|
|
require.NoError(t, err)
|
|
|
|
require.NotNil(t, info1)
|
|
require.NotNil(t, info2)
|
|
require.Equalf(t, info1.VettedAt == nil, info2.VettedAt == nil,
|
|
"info1.VettedAt (%v) and info2.VettedAt (%v) should both be nil or both have values", info1.VettedAt, info2.VettedAt)
|
|
require.Equalf(t, info1.Disqualified == nil, info2.Disqualified == nil,
|
|
"info1.Disqualified (%v) and info2.Disqualified (%v) should both be nil or both have values", info1.Disqualified, info2.Disqualified)
|
|
require.InDelta(t, info1.AuditReputationAlpha, info2.AuditReputationAlpha, 1e-8)
|
|
require.InDelta(t, info1.AuditReputationBeta, info2.AuditReputationBeta, 1e-8)
|
|
require.InDelta(t, info1.UnknownAuditReputationAlpha, info2.UnknownAuditReputationAlpha, 1e-8)
|
|
require.InDelta(t, info1.UnknownAuditReputationBeta, info2.UnknownAuditReputationBeta, 1e-8)
|
|
require.InDelta(t, info1.OnlineScore, info2.OnlineScore, 1e-8)
|
|
require.InDelta(t, info1.AuditHistory.Score, info2.AuditHistory.Score, 1e-8)
|
|
require.NotNil(t, info1.AuditHistory)
|
|
require.NotNil(t, info2.AuditHistory)
|
|
require.Equal(t, info1.AuditHistory.Score, info2.AuditHistory.Score)
|
|
require.Equal(t, len(info1.AuditHistory.Windows), len(info2.AuditHistory.Windows),
|
|
"info1.AuditHistory.Windows (%v) and info2.AuditHistory.Windows (%v) should have the same length", info1.AuditHistory.Windows, info2.AuditHistory.Windows)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestApplyUpdatesEquivalentToMultipleUpdates checks that the ApplyUpdates call
|
|
// on db.Reputation() is equivalent to making multiple separate Update() calls
|
|
// (modulo some details like exact-time-of-disqualification).
|
|
func TestApplyUpdatesEquivalentToMultipleUpdates(t *testing.T) {
|
|
config := reputation.Config{
|
|
AuditLambda: 0.99,
|
|
AuditWeight: 1,
|
|
AuditDQ: 0.1,
|
|
SuspensionGracePeriod: 20 * time.Minute,
|
|
SuspensionDQEnabled: true,
|
|
AuditCount: 3,
|
|
AuditHistory: reputation.AuditHistoryConfig{
|
|
WindowSize: 10 * time.Minute,
|
|
TrackingPeriod: 1 * time.Hour,
|
|
GracePeriod: 20 * time.Minute,
|
|
OfflineThreshold: 0.5,
|
|
OfflineDQEnabled: false,
|
|
OfflineSuspensionEnabled: true,
|
|
},
|
|
}
|
|
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
testApplyUpdatesEquivalentToMultipleUpdates(ctx, t, db.Reputation(), config)
|
|
})
|
|
}
|
|
|
|
// TestApplyUpdatesEquivalentToMultipleUpdatesCached checks that the ApplyUpdates
|
|
// call on a CachingDB is equivalent to making multiple separate Update() calls
|
|
// (modulo some details like exact-time-of-disqualification).
|
|
func TestApplyUpdatesEquivalentToMultipleUpdatesCached(t *testing.T) {
|
|
config := reputation.Config{
|
|
AuditLambda: 0.99,
|
|
AuditWeight: 1,
|
|
AuditDQ: 0.1,
|
|
SuspensionGracePeriod: 20 * time.Minute,
|
|
SuspensionDQEnabled: true,
|
|
AuditCount: 3,
|
|
AuditHistory: reputation.AuditHistoryConfig{
|
|
WindowSize: 10 * time.Minute,
|
|
TrackingPeriod: 1 * time.Hour,
|
|
GracePeriod: 20 * time.Minute,
|
|
OfflineThreshold: 0.5,
|
|
OfflineDQEnabled: false,
|
|
OfflineSuspensionEnabled: true,
|
|
},
|
|
}
|
|
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
cachingDB := reputation.NewCachingDB(zaptest.NewLogger(t), testrand.NodeID(), db.Reputation(), config)
|
|
cancelCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
ctx.Go(func() error {
|
|
err := cachingDB.Manage(cancelCtx)
|
|
return errs2.IgnoreCanceled(err)
|
|
})
|
|
testApplyUpdatesEquivalentToMultipleUpdates(cancelCtx, t, cachingDB, config)
|
|
})
|
|
}
|
|
|
|
func TestDBDisqualifyNode(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
reputationDB := db.Reputation()
|
|
nodeID := testrand.NodeID()
|
|
now := time.Now().Truncate(time.Second).UTC()
|
|
|
|
err := reputationDB.DisqualifyNode(ctx, nodeID, now, overlay.DisqualificationReasonAuditFailure)
|
|
require.NoError(t, err)
|
|
|
|
info, err := reputationDB.Get(ctx, nodeID)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, info.Disqualified)
|
|
require.Equal(t, now, info.Disqualified.UTC())
|
|
require.Equal(t, overlay.DisqualificationReasonAuditFailure, info.DisqualificationReason)
|
|
})
|
|
}
|
|
|
|
func TestDBDisqualificationAuditFailure(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
reputationDB := db.Reputation()
|
|
nodeID := testrand.NodeID()
|
|
now := time.Now()
|
|
|
|
updateReq := reputation.UpdateRequest{
|
|
NodeID: nodeID,
|
|
AuditOutcome: reputation.AuditFailure,
|
|
Config: reputation.Config{
|
|
AuditLambda: 1,
|
|
AuditWeight: 1,
|
|
AuditDQ: 0.99,
|
|
SuspensionGracePeriod: 0,
|
|
SuspensionDQEnabled: false,
|
|
AuditCount: 0,
|
|
AuditHistory: reputation.AuditHistoryConfig{},
|
|
},
|
|
}
|
|
|
|
status, err := reputationDB.Update(ctx, updateReq, now)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, status.Disqualified)
|
|
assert.WithinDuration(t, now, *status.Disqualified, time.Microsecond)
|
|
assert.Equal(t, overlay.DisqualificationReasonAuditFailure, status.DisqualificationReason)
|
|
})
|
|
}
|
|
|
|
func TestDBDisqualificationSuspension(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
reputationDB := db.Reputation()
|
|
nodeID := testrand.NodeID()
|
|
now := time.Now().Truncate(time.Second).UTC()
|
|
|
|
updateReq := reputation.UpdateRequest{
|
|
NodeID: nodeID,
|
|
AuditOutcome: reputation.AuditUnknown,
|
|
Config: reputation.Config{
|
|
AuditLambda: 1,
|
|
AuditWeight: 1,
|
|
AuditDQ: 0.99,
|
|
SuspensionGracePeriod: 0,
|
|
SuspensionDQEnabled: true,
|
|
AuditCount: 0,
|
|
AuditHistory: reputation.AuditHistoryConfig{},
|
|
},
|
|
}
|
|
|
|
// suspend node due to failed unknown audit
|
|
err := reputationDB.SuspendNodeUnknownAudit(ctx, nodeID, now.Add(-time.Second))
|
|
require.NoError(t, err)
|
|
|
|
// disqualify node after failed unknown audit when node is suspended
|
|
status, err := reputationDB.Update(ctx, updateReq, now)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, status.Disqualified)
|
|
assert.Nil(t, status.UnknownAuditSuspended)
|
|
assert.Equal(t, now, status.Disqualified.UTC())
|
|
assert.Equal(t, overlay.DisqualificationReasonSuspension, status.DisqualificationReason)
|
|
})
|
|
}
|
|
|
|
func TestDBDisqualificationNodeOffline(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
reputationDB := db.Reputation()
|
|
nodeID := testrand.NodeID()
|
|
now := time.Now().Truncate(time.Second).UTC()
|
|
|
|
updateReq := reputation.UpdateRequest{
|
|
NodeID: nodeID,
|
|
AuditOutcome: reputation.AuditOffline,
|
|
Config: reputation.Config{
|
|
AuditLambda: 0,
|
|
AuditWeight: 0,
|
|
AuditDQ: 0,
|
|
SuspensionGracePeriod: 0,
|
|
SuspensionDQEnabled: false,
|
|
AuditCount: 0,
|
|
AuditHistory: reputation.AuditHistoryConfig{
|
|
WindowSize: 0,
|
|
TrackingPeriod: 1 * time.Second,
|
|
GracePeriod: 0,
|
|
OfflineThreshold: 1,
|
|
OfflineDQEnabled: true,
|
|
OfflineSuspensionEnabled: true,
|
|
},
|
|
},
|
|
}
|
|
|
|
// first window always returns perfect score
|
|
_, err := reputationDB.Update(ctx, updateReq, now)
|
|
require.NoError(t, err)
|
|
|
|
// put node to offline suspension
|
|
suspendedAt := now.Add(time.Second)
|
|
status, err := reputationDB.Update(ctx, updateReq, suspendedAt)
|
|
require.NoError(t, err)
|
|
require.Equal(t, suspendedAt, status.OfflineSuspended.UTC())
|
|
|
|
// should have at least 2 windows in audit history after earliest window is removed
|
|
_, err = reputationDB.Update(ctx, updateReq, now.Add(2*time.Second))
|
|
require.NoError(t, err)
|
|
|
|
// disqualify node
|
|
disqualifiedAt := now.Add(3 * time.Second)
|
|
status, err = reputationDB.Update(ctx, updateReq, disqualifiedAt)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, status.Disqualified)
|
|
assert.Equal(t, disqualifiedAt, status.Disqualified.UTC())
|
|
assert.Equal(t, overlay.DisqualificationReasonNodeOffline, status.DisqualificationReason)
|
|
})
|
|
}
|
|
|
|
func testAuditHistoryConfig() reputation.AuditHistoryConfig {
|
|
return reputation.AuditHistoryConfig{
|
|
WindowSize: time.Hour,
|
|
TrackingPeriod: time.Hour,
|
|
GracePeriod: time.Hour,
|
|
OfflineThreshold: 0,
|
|
}
|
|
}
|