satellite/satellitedb: uses vetted_at field to select for reputable nodes

Additionally, this PR changes the NewNodeFraction devDefault from 0.05 to 1 and
sets the testplanet config to 1.
Many tests relied on selecting nodes as reputable based on audit and uptime
counts of 0, which in effect treated new nodes as reputable ones.
Since reputation is now indicated by a vetted_at db field that is set explicitly
rather than implied by audit and uptime counts, updating every node's reputation
before selecting nodes in tests would be more complicated.
Instead, we simply allow all test nodes to be new when needed.

Change-Id: Ib9531be77408662315b948fd029cee925ed2ca1d
Jennifer Johnson 2020-07-08 10:28:49 -04:00 committed by Jennifer Li Johnson
parent cccd62fff9
commit 4e2413a99d
11 changed files with 261 additions and 258 deletions
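The practical effect of the change described above: whether a node counts as new or reputable now hinges on a single nullable timestamp rather than on audit/uptime thresholds. A minimal sketch of that rule (illustrative names and types; the real classification happens while scanning rows in SelectAllStorageNodesUpload further down):

package main

import (
	"fmt"
	"time"
)

// nodeRow stands in for a scanned nodes row; vettedAt mirrors the nullable
// vetted_at column, so nil means the node has not been vetted yet.
type nodeRow struct {
	id       string
	vettedAt *time.Time
}

// splitByVetting mirrors the new rule: vetted_at IS NULL => new node,
// vetted_at IS NOT NULL => reputable node. Audit and uptime counts no longer decide.
func splitByVetting(rows []nodeRow) (reputable, newNodes []nodeRow) {
	for _, r := range rows {
		if r.vettedAt == nil {
			newNodes = append(newNodes, r)
			continue
		}
		reputable = append(reputable, r)
	}
	return reputable, newNodes
}

func main() {
	now := time.Now().UTC()
	rep, fresh := splitByVetting([]nodeRow{{id: "vetted", vettedAt: &now}, {id: "new"}})
	fmt.Println(len(rep), len(fresh)) // 1 1
}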

View File

@ -397,7 +397,7 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
Node: overlay.NodeSelectionConfig{
UptimeCount: 0,
AuditCount: 0,
NewNodeFraction: 0,
NewNodeFraction: 1,
OnlineWindow: time.Minute,
DistinctIP: false,
MinimumDiskSpace: 100 * memory.MB,

View File

@ -63,9 +63,7 @@ func BenchmarkOverlay(b *testing.B) {
b.Run("KnownUnreliableOrOffline", func(b *testing.B) {
criteria := &overlay.NodeCriteria{
AuditCount: 0,
OnlineWindow: 1000 * time.Hour,
UptimeCount: 0,
}
for i := 0; i < b.N; i++ {
badNodes, err := overlaydb.KnownUnreliableOrOffline(ctx, criteria, check)
@ -310,8 +308,6 @@ func BenchmarkNodeSelection(b *testing.B) {
criteria := &overlay.NodeCriteria{
FreeDisk: 0,
AuditCount: 1,
UptimeCount: 0,
ExcludedIDs: nil,
ExcludedNetworks: nil,
MinimumVersion: "v1.0.0",
@ -320,8 +316,6 @@ func BenchmarkNodeSelection(b *testing.B) {
}
excludedCriteria := &overlay.NodeCriteria{
FreeDisk: 0,
AuditCount: 1,
UptimeCount: 0,
ExcludedIDs: excludedIDs,
ExcludedNetworks: excludedNets,
MinimumVersion: "v1.0.0",

View File

@ -31,7 +31,7 @@ type Config struct {
type NodeSelectionConfig struct {
UptimeCount int64 `help:"the number of times a node's uptime has been checked to not be considered a New Node" releaseDefault:"100" devDefault:"0"`
AuditCount int64 `help:"the number of times a node has been audited to not be considered a New Node" releaseDefault:"100" devDefault:"0"`
NewNodeFraction float64 `help:"the fraction of new nodes allowed per request" default:"0.05"`
NewNodeFraction float64 `help:"the fraction of new nodes allowed per request" releaseDefault:"0.05" devDefault:"1"`
MinimumVersion string `help:"the minimum node software version for node selection queries" default:""`
OnlineWindow time.Duration `help:"the amount of time without seeing a node before it's considered offline" default:"4h"`
DistinctIP bool `help:"require distinct IPs when choosing nodes for upload" releaseDefault:"true" devDefault:"false"`
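Read off the struct tags above, a dev build now resolves to a selection config roughly like the following fragment (a sketch, not a complete program; release builds keep 100/100/0.05, and fields without dev-specific tags use their plain defaults):

// Hedged sketch of the dev-mode defaults implied by the tags above.
devNode := overlay.NodeSelectionConfig{
	UptimeCount:     0,             // devDefault:"0"
	AuditCount:      0,             // devDefault:"0"
	NewNodeFraction: 1,             // devDefault:"1" after this change
	OnlineWindow:    4 * time.Hour, // default:"4h"
	DistinctIP:      false,         // devDefault:"false"
}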

View File

@ -63,7 +63,7 @@ func TestRefresh(t *testing.T) {
// add some nodes to the database
const nodeCount = 2
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, false)
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 0)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
@ -74,8 +74,7 @@ func TestRefresh(t *testing.T) {
})
}
func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, count int, makeReputable bool) []storj.NodeID {
var reputableIds = []storj.NodeID{}
func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, count, makeReputable int) (reputableIds []storj.NodeID) {
for i := 0; i < count; i++ {
subnet := strconv.Itoa(i) + ".1.2"
addr := subnet + ".3:8080"
@ -89,8 +88,7 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun
LastIPPort: addr,
IsUp: true,
Capacity: &pb.NodeCapacity{
FreeDisk: 200 * memory.MiB.Int64(),
FreeBandwidth: 1 * memory.TB.Int64(),
FreeDisk: 200 * memory.MiB.Int64(),
},
Version: &pb.NodeVersion{
Version: "v1.1.0",
@ -102,9 +100,9 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun
err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeSelectionConfig)
require.NoError(t, err)
// make half of the nodes reputable
if makeReputable && i > count/2 {
_, err = db.UpdateStats(ctx, &overlay.UpdateRequest{
// make designated nodes reputable
if i < makeReputable {
stats, err := db.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: storj.NodeID{byte(i)},
IsUp: true,
AuditOutcome: overlay.AuditSuccess,
@ -112,6 +110,7 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun
AuditHistory: testAuditHistoryConfig(),
}, time.Now())
require.NoError(t, err)
require.NotNil(t, stats.VettedAt)
reputableIds = append(reputableIds, storj.NodeID{byte(i)})
}
}
@ -191,8 +190,6 @@ func TestRefreshConcurrent(t *testing.T) {
func TestGetNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 0,
UptimeCount: 0,
NewNodeFraction: 0.2,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
@ -209,16 +206,17 @@ func TestGetNodes(t *testing.T) {
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add some nodes to the database
// add 4 nodes to the database and vet 2
const nodeCount = 4
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, false)
nodeIds := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 2)
require.Len(t, nodeIds, 2)
// confirm cache.GetNodes returns the correct nodes
selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2})
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 0, new)
require.Equal(t, 4, reputable)
require.Equal(t, 2, new)
require.Equal(t, 2, reputable)
require.Equal(t, 2, len(selectedNodes))
for _, node := range selectedNodes {
require.NotEqual(t, node.ID, "")
@ -469,8 +467,8 @@ func TestNewNodeFraction(t *testing.T) {
// add some nodes to the database, some are reputable and some are new nodes
const nodeCount = 10
repIDs := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, true)
repIDs := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 4)
require.Len(t, repIDs, 4)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
require.NoError(t, err)
@ -490,6 +488,6 @@ func TestNewNodeFraction(t *testing.T) {
}
}
}
require.Equal(t, len(n)-reputableCount, int(5*newNodeFraction))
require.Equal(t, len(n)-reputableCount, int(5*newNodeFraction)) // 1, 1
})
}

View File

@ -6,7 +6,6 @@ package overlay_test
import (
"crypto/tls"
"crypto/x509"
"errors"
"net"
"runtime"
"strings"
@ -200,7 +199,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
t.Run("request 5, where 1 new", func(t *testing.T) {
requestedCount, newCount := 5, 1
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(1, newNodeFraction, false)
preferences := testNodeSelectionConfig(newNodeFraction, false)
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}
@ -213,7 +212,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
t.Run("request 5, all new", func(t *testing.T) {
requestedCount, newCount := 5, 5
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(1, newNodeFraction, false)
preferences := testNodeSelectionConfig(newNodeFraction, false)
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}
@ -244,8 +243,8 @@ func TestEnsureMinimumRequested(t *testing.T) {
t.Run("no new nodes", func(t *testing.T) {
requestedCount, newCount := 5, 1.0
newNodeFraction := newCount / float64(requestedCount)
preferences := testNodeSelectionConfig(1, newNodeFraction, false)
satellite.Config.Overlay.Node = testNodeSelectionConfig(1, newNodeFraction, false)
preferences := testNodeSelectionConfig(newNodeFraction, false)
satellite.Config.Overlay.Node = testNodeSelectionConfig(newNodeFraction, false)
nodes, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
@ -260,192 +259,128 @@ func TestEnsureMinimumRequested(t *testing.T) {
func TestNodeSelection(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// This sets audit counts of 0, 1, 2, 3, ... 9
// so that we can fine-tune how many nodes are considered new or reputable
// by modifying the audit count cutoff passed into FindStorageNodesWithPreferences
for i, node := range planet.StorageNodes {
for k := 0; k < i; k++ {
_, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: node.ID(),
IsUp: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
AuditHistory: testAuditHistoryConfig(),
}, time.Now())
require.NoError(t, err)
}
}
testNodeSelection(t, ctx, planet)
})
}
func TestNodeSelectionWithBatch(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.UpdateStatsBatchSize = 1
config.Overlay.AuditHistory = testAuditHistoryConfig()
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service
errNotEnoughNodes := &overlay.ErrNotEnoughNodes
tests := []struct {
description string
requestCount int
newNodeFraction float64
reputableNodes int
expectedCount int
shouldFailWith *errs.Class
exclude func() (excludedNodes []storj.NodeID)
}{
{
description: "all reputable nodes, only reputable nodes requested",
requestCount: 6,
newNodeFraction: 0,
reputableNodes: 6,
expectedCount: 6,
},
{
description: "all reputable nodes, up to 100% new nodes requested",
requestCount: 5,
newNodeFraction: 1,
reputableNodes: 6,
expectedCount: 5,
},
{
description: "3 reputable and 3 new nodes, 6 reputable nodes requested, not enough reputable nodes",
requestCount: 6,
newNodeFraction: 0,
reputableNodes: 3,
expectedCount: 3,
shouldFailWith: errNotEnoughNodes,
},
{
description: "50-50 reputable and new nodes, reputable and new nodes requested, not enough reputable nodes",
requestCount: 5,
newNodeFraction: 0.2,
reputableNodes: 3,
expectedCount: 4,
shouldFailWith: errNotEnoughNodes,
},
{
description: "all new nodes except one, reputable and new nodes requested (happy path)",
requestCount: 2,
newNodeFraction: 0.5,
reputableNodes: 1,
expectedCount: 2,
},
{
description: "all new nodes except one, reputable and new nodes requested (not happy path)",
requestCount: 4,
newNodeFraction: 0.5,
reputableNodes: 1,
expectedCount: 3,
shouldFailWith: errNotEnoughNodes,
},
{
description: "all new nodes, reputable and new nodes requested",
requestCount: 6,
newNodeFraction: 1,
reputableNodes: 0,
expectedCount: 6,
},
{
description: "excluded node ids",
requestCount: 6,
newNodeFraction: 0,
reputableNodes: 6,
expectedCount: 1,
shouldFailWith: errNotEnoughNodes,
exclude: func() (excludedNodes []storj.NodeID) {
for _, storageNode := range planet.StorageNodes[:5] {
excludedNodes = append(excludedNodes, storageNode.ID())
}
return excludedNodes
},
},
}
// This sets audit counts of 0, 1, 2, 3, ... 9
// so that we can fine-tune how many nodes are considered new or reputable
// by modifying the audit count cutoff passed into FindStorageNodesWithPreferences
for i, node := range planet.StorageNodes {
for k := 0; k < i; k++ {
// These are done individually b/c the previous stat data is important
_, err := satellite.Overlay.Service.BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
NodeID: node.ID(),
IsUp: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
}})
for _, tt := range tests {
t.Log(tt.description)
var excludedNodes []storj.NodeID
if tt.exclude != nil {
excludedNodes = tt.exclude()
}
for i, node := range planet.StorageNodes {
if i < tt.reputableNodes {
_, err := satellite.Overlay.Service.TestVetNode(ctx, node.ID())
require.NoError(t, err)
} else {
err := satellite.Overlay.Service.TestUnvetNode(ctx, node.ID())
require.NoError(t, err)
}
}
config := testNodeSelectionConfig(tt.newNodeFraction, false)
response, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{RequestedCount: tt.requestCount, ExcludedIDs: excludedNodes}, &config)
if tt.shouldFailWith != nil {
require.Error(t, err)
assert.True(t, tt.shouldFailWith.Has(err))
} else {
require.NoError(t, err)
}
if len(excludedNodes) > 0 {
for _, n := range response {
for _, m := range excludedNodes {
require.NotEqual(t, n.ID, m)
}
}
}
require.Equal(t, tt.expectedCount, len(response))
}
testNodeSelection(t, ctx, planet)
})
}
func testNodeSelection(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// ensure all storagenodes are in overlay
for _, storageNode := range planet.StorageNodes {
n := storageNode.Contact.Service.Local()
lastNet, err := ipToLastNet(n.Address)
require.NoError(t, err)
d := overlay.NodeCheckInInfo{
NodeID: storageNode.ID(),
Address: &pb.NodeAddress{
Address: n.Address,
},
LastIPPort: storageNode.Addr(),
LastNet: lastNet,
Version: &n.Version,
}
err = satellite.Overlay.DB.UpdateCheckIn(ctx, d, time.Now().UTC(), satellite.Config.Overlay.Node)
require.NoError(t, err)
}
type test struct {
Preferences overlay.NodeSelectionConfig
ExcludeCount int
RequestCount int
ExpectedCount int
ShouldFailWith *errs.Class
}
for i, tt := range []test{
{ // all reputable nodes, only reputable nodes requested
Preferences: testNodeSelectionConfig(0, 0, false),
RequestCount: 5,
ExpectedCount: 5,
},
{ // all reputable nodes, reputable and new nodes requested
Preferences: testNodeSelectionConfig(0, 1, false),
RequestCount: 5,
ExpectedCount: 5,
},
{ // 50-50 reputable and new nodes, not enough reputable nodes
Preferences: testNodeSelectionConfig(5, 0, false),
RequestCount: 10,
ExpectedCount: 5,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // 50-50 reputable and new nodes, reputable and new nodes requested, not enough reputable nodes
Preferences: testNodeSelectionConfig(5, 0.2, false),
RequestCount: 10,
ExpectedCount: 7,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all new nodes except one, reputable and new nodes requested (happy path)
Preferences: testNodeSelectionConfig(9, 0.5, false),
RequestCount: 2,
ExpectedCount: 2,
},
{ // all new nodes except one, reputable and new nodes requested (not happy path)
Preferences: testNodeSelectionConfig(9, 0.5, false),
RequestCount: 4,
ExpectedCount: 3,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all new nodes, reputable and new nodes requested
Preferences: testNodeSelectionConfig(50, 1, false),
RequestCount: 2,
ExpectedCount: 2,
},
{ // audit threshold edge case (1)
Preferences: testNodeSelectionConfig(9, 0, false),
RequestCount: 1,
ExpectedCount: 1,
},
{ // excluded node ids being excluded
Preferences: testNodeSelectionConfig(5, 0, false),
ExcludeCount: 7,
RequestCount: 5,
ExpectedCount: 3,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
} {
t.Logf("#%2d. %+v", i, tt)
service := planet.Satellites[0].Overlay.Service
var excludedNodes []storj.NodeID
for _, storageNode := range planet.StorageNodes[:tt.ExcludeCount] {
excludedNodes = append(excludedNodes, storageNode.ID())
}
response, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
RequestedCount: tt.RequestCount,
ExcludedIDs: excludedNodes,
}, &tt.Preferences)
t.Log(len(response), err)
if tt.ShouldFailWith != nil {
assert.Error(t, err)
assert.True(t, tt.ShouldFailWith.Has(err))
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.ExpectedCount, len(response))
}
}
// ipToLastNet converts target address to its IP and /24 subnet IPv4 or /64 subnet IPv6.
func ipToLastNet(target string) (network string, err error) {
host, _, err := net.SplitHostPort(target)
if err != nil {
return "", err
}
ip := net.ParseIP(host)
if ip == nil {
return "", errors.New("invalid ip " + host)
}
// If addr can be converted to 4-byte notation, it is an IPv4 address; otherwise it's an IPv6 address
if ipv4 := ip.To4(); ipv4 != nil {
// Filter all IPv4 addresses into /24 subnets
mask := net.CIDRMask(24, 32)
return ipv4.Mask(mask).String(), nil
}
if ipv6 := ip.To16(); ipv6 != nil {
// Filter all IPv6 addresses into /64 subnets
mask := net.CIDRMask(64, 128)
return ipv6.Mask(mask).String(), nil
}
return "", errors.New("unable to get network for address " + ip.String())
}
func TestNodeSelectionGracefulExit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
@ -491,34 +426,34 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
for i, tt := range []test{
{ // reputable and new nodes, happy path
Preferences: testNodeSelectionConfig(5, 0.5, false),
Preferences: testNodeSelectionConfig(0.5, false),
RequestCount: 5,
ExpectedCount: 5,
},
{ // all reputable nodes, happy path
Preferences: testNodeSelectionConfig(0, 1, false),
Preferences: testNodeSelectionConfig(1, false),
RequestCount: 5,
ExpectedCount: 5,
},
{ // all new nodes, happy path
Preferences: testNodeSelectionConfig(50, 1, false),
Preferences: testNodeSelectionConfig(1, false),
RequestCount: 5,
ExpectedCount: 5,
},
{ // reputable and new nodes, requested too many
Preferences: testNodeSelectionConfig(5, 0.5, false),
Preferences: testNodeSelectionConfig(0.5, false),
RequestCount: 10,
ExpectedCount: 5,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all reputable nodes, requested too many
Preferences: testNodeSelectionConfig(0, 1, false),
Preferences: testNodeSelectionConfig(1, false),
RequestCount: 10,
ExpectedCount: 5,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all new nodes, requested too many
Preferences: testNodeSelectionConfig(50, 1, false),
Preferences: testNodeSelectionConfig(1, false),
RequestCount: 10,
ExpectedCount: 5,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
@ -689,7 +624,7 @@ func TestDistinctIPs(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// This sets a reputable audit count for nodes[8] and nodes[9].
// Vets nodes[8] and nodes[9].
for i := 9; i > 7; i-- {
_, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: planet.StorageNodes[i].ID(),
@ -713,14 +648,14 @@ func TestDistinctIPsWithBatch(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 3,
UniqueIPCount: 3, // creates 3 additional unique IP addresses, for a total of 4 IPs
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.UpdateStatsBatchSize = 1
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// This sets a reputable audit count for nodes[8] and nodes[9].
// Vets nodes[8] and nodes[9].
for i := 9; i > 7; i-- {
// These are done individually b/c the previous stat data is important
_, err := satellite.Overlay.Service.BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
@ -742,24 +677,23 @@ func testDistinctIPs(t *testing.T, ctx *testcontext.Context, planet *testplanet.
service := satellite.Overlay.Service
tests := []struct {
duplicateCount int
requestCount int
preferences overlay.NodeSelectionConfig
shouldFailWith *errs.Class
}{
{ // test only distinct IPs with half new nodes
// expect 2 new and 2 vetted
requestCount: 4,
preferences: testNodeSelectionConfig(1, 0.5, true),
preferences: testNodeSelectionConfig(0.5, true),
},
{ // test not enough distinct IPs
requestCount: 7,
preferences: testNodeSelectionConfig(0, 0, true),
requestCount: 5, // expect 3 new, 2 old but fails because only 4 distinct IPs, not 5
preferences: testNodeSelectionConfig(0.6, true),
shouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // test distinct flag false allows duplicates
duplicateCount: 10,
requestCount: 5,
preferences: testNodeSelectionConfig(0, 0.5, false),
{ // test distinct flag false allows duplicate IP addresses
requestCount: 5, // expect 3 new, 2 old
preferences: testNodeSelectionConfig(0.6, false),
},
}

View File

@ -101,6 +101,11 @@ type DB interface {
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier
TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error)
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier
TestUnvetNode(ctx context.Context, nodeID storj.NodeID) (err error)
}
// NodeCheckInInfo contains all the info that will be updated when a node checks in.
@ -133,8 +138,6 @@ type FindStorageNodesRequest struct {
// NodeCriteria are the requirements for selecting nodes.
type NodeCriteria struct {
FreeDisk int64
AuditCount int64
UptimeCount int64
ExcludedIDs []storj.NodeID
ExcludedNetworks []string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes
MinimumVersion string // semver or empty
@ -370,8 +373,6 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
criteria := NodeCriteria{
FreeDisk: preferences.MinimumDiskSpace.Int64(),
AuditCount: preferences.AuditCount,
UptimeCount: preferences.UptimeCount,
ExcludedIDs: excludedIDs,
ExcludedNetworks: excludedNetworks,
MinimumVersion: preferences.MinimumVersion,
@ -542,3 +543,28 @@ func ResolveIPAndNetwork(ctx context.Context, target string) (ipPort, network st
return "", "", errors.New("unable to get network for address " + ipAddr.String())
}
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier.
func (service *Service) TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error) {
vettedTime, err = service.db.TestVetNode(ctx, nodeID)
if err != nil {
service.log.Warn("error vetting node", zap.Stringer("node ID", nodeID))
return nil, err
}
service.log.Warn("node vetted", zap.Stringer("node ID", nodeID), zap.Stringer("vetted time", vettedTime))
err = service.SelectionCache.Refresh(ctx)
service.log.Warn("nodecache refresh err", zap.Error(err))
return vettedTime, err
}
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier.
func (service *Service) TestUnvetNode(ctx context.Context, nodeID storj.NodeID) (err error) {
err = service.db.TestUnvetNode(ctx, nodeID)
if err != nil {
service.log.Warn("error unvetting node", zap.Stringer("node ID", nodeID), zap.Error(err))
return err
}
err = service.SelectionCache.Refresh(ctx)
service.log.Warn("nodecache refresh err", zap.Error(err))
return err
}
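With AuditCount and UptimeCount removed from NodeCriteria, the criteria literal built in FindStorageNodesWithPreferences (hunk above) reduces to roughly the following sketch; only the fields visible in that hunk are shown, and any others (e.g. OnlineWindow) would carry over unchanged:

criteria := overlay.NodeCriteria{
	FreeDisk:         preferences.MinimumDiskSpace.Int64(),
	ExcludedIDs:      excludedIDs,
	ExcludedNetworks: excludedNetworks,
	MinimumVersion:   preferences.MinimumVersion,
	// New-vs-reputable is no longer expressed here; the DB layer checks
	// vetted_at IS NULL / IS NOT NULL instead.
}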

View File

@ -37,10 +37,10 @@ func TestCache_Database(t *testing.T) {
}
// returns a NodeSelectionConfig with sensible test values.
func testNodeSelectionConfig(auditCount int64, newNodeFraction float64, distinctIP bool) overlay.NodeSelectionConfig {
func testNodeSelectionConfig(newNodeFraction float64, distinctIP bool) overlay.NodeSelectionConfig {
return overlay.NodeSelectionConfig{
UptimeCount: 0,
AuditCount: auditCount,
AuditCount: 0,
NewNodeFraction: newNodeFraction,
OnlineWindow: time.Hour,
DistinctIP: distinctIP,
@ -71,7 +71,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
address := &pb.NodeAddress{Address: "127.0.0.1:0"}
lastNet := "127.0.0"
nodeSelectionConfig := testNodeSelectionConfig(0, 0, false)
nodeSelectionConfig := testNodeSelectionConfig(0, false)
serviceConfig := overlay.Config{Node: nodeSelectionConfig, UpdateStatsBatchSize: 100, AuditHistory: testAuditHistoryConfig()}
service := overlay.NewService(zaptest.NewLogger(t), store, serviceConfig)
@ -173,7 +173,9 @@ func TestRandomizedSelection(t *testing.T) {
cache := db.OverlayCache()
allIDs := make(storj.NodeIDList, totalNodes)
nodeCounts := make(map[storj.NodeID]int)
defaults := overlay.NodeSelectionConfig{}
config := overlay.NodeSelectionConfig{
AuditCount: 1,
}
// put nodes in cache
for i := 0; i < totalNodes; i++ {
@ -189,7 +191,7 @@ func TestRandomizedSelection(t *testing.T) {
Capacity: &pb.NodeCapacity{},
IsUp: true,
}
err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), defaults)
err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), config)
require.NoError(t, err)
if i%2 == 0 { // make half of nodes "new" and half "vetted"
@ -217,13 +219,11 @@ func TestRandomizedSelection(t *testing.T) {
if i%2 == 0 {
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, 0, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
require.NoError(t, err)
} else {
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, numNodesToSelect, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
require.NoError(t, err)
}
@ -300,8 +300,7 @@ func TestRandomizedSelectionCache(t *testing.T) {
LastIPPort: address,
IsUp: true,
Capacity: &pb.NodeCapacity{
FreeDisk: 200 * memory.MiB.Int64(),
FreeBandwidth: 1 * memory.TB.Int64(),
FreeDisk: 200 * memory.MiB.Int64(),
},
Version: &pb.NodeVersion{
Version: "v1.1.0",
@ -754,7 +753,7 @@ func TestSuspendedSelection(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
cache := db.OverlayCache()
suspendedIDs := make(map[storj.NodeID]bool)
defaults := overlay.NodeSelectionConfig{}
config := overlay.NodeSelectionConfig{AuditCount: 1}
// put nodes in cache
for i := 0; i < totalNodes; i++ {
@ -770,7 +769,7 @@ func TestSuspendedSelection(t *testing.T) {
Capacity: &pb.NodeCapacity{},
IsUp: true,
}
err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), defaults)
err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), config)
require.NoError(t, err)
if i%2 == 0 { // make half of nodes "new" and half "vetted"
@ -802,7 +801,6 @@ func TestSuspendedSelection(t *testing.T) {
// select 10 vetted nodes - 5 vetted, 2 suspended, so expect 3
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, 0, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
require.NoError(t, err)
require.Len(t, nodes, 3)
@ -813,7 +811,6 @@ func TestSuspendedSelection(t *testing.T) {
// select 10 new nodes - 5 new, 2 suspended, so expect 3
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, numNodesToSelect, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
require.NoError(t, err)
require.Len(t, nodes, 3)
@ -890,3 +887,34 @@ func getNodeInfo(nodeID storj.NodeID) overlay.NodeCheckInInfo {
},
}
}
func TestVetAndUnvetNode(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
service := planet.Satellites[0].Overlay.Service
node := planet.StorageNodes[0]
// clear existing data
err := service.TestUnvetNode(ctx, node.ID())
require.NoError(t, err)
dossier, err := service.Get(ctx, node.ID())
require.NoError(t, err)
require.Nil(t, dossier.Reputation.VettedAt)
// vet again
vettedTime, err := service.TestVetNode(ctx, node.ID())
require.NoError(t, err)
require.NotNil(t, vettedTime)
dossier, err = service.Get(ctx, node.ID())
require.NoError(t, err)
require.NotNil(t, dossier.Reputation.VettedAt)
// unvet again
err = service.TestUnvetNode(ctx, node.ID())
require.NoError(t, err)
dossier, err = service.Get(ctx, node.ID())
require.NoError(t, err)
require.Nil(t, dossier.Reputation.VettedAt)
})
}

View File

@ -179,17 +179,15 @@ func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria,
conds.add(`type = ?`, int(pb.NodeType_STORAGE))
conds.add(`free_disk >= ?`, criteria.FreeDisk)
conds.add(`last_contact_success > ?`, time.Now().Add(-criteria.OnlineWindow))
conds.add(`last_contact_success > ?`, time.Now().UTC().Add(-criteria.OnlineWindow))
if isNewNodeQuery {
conds.add(
`(total_audit_count < ? OR total_uptime_count < ?)`,
criteria.AuditCount, criteria.UptimeCount,
`vetted_at IS NULL`,
)
} else {
conds.add(
`total_audit_count >= ? AND total_uptime_count >= ?`,
criteria.AuditCount, criteria.UptimeCount,
`vetted_at IS NOT NULL`,
)
}
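Put together, the two selection predicates built by nodeSelectionCondition now differ only in the vetted_at check; a hedged sketch of the effective WHERE fragments (placeholder style simplified, not the generated SQL verbatim):

// Illustrative only; the builder above assembles these conditions with AND.
const (
	commonConds   = `type = ? AND free_disk >= ? AND last_contact_success > ?`
	newNodeCond   = commonConds + ` AND vetted_at IS NULL`
	reputableCond = commonConds + ` AND vetted_at IS NOT NULL`
)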

View File

@ -40,23 +40,21 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
defer mon.Task()(&ctx)(&err)
query := `
SELECT id, address, last_net, last_ip_port, (total_audit_count < $1 OR total_uptime_count < $2) as isnew
SELECT id, address, last_net, last_ip_port, vetted_at
FROM nodes
WHERE disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND exit_initiated_at IS NULL
AND type = $3
AND free_disk >= $4
AND last_contact_success > $5
AND type = $1
AND free_disk >= $2
AND last_contact_success > $3
`
args := []interface{}{
// $1, $2
selectionCfg.AuditCount, selectionCfg.UptimeCount,
// $3
// $1
int(pb.NodeType_STORAGE),
// $4
// $2
selectionCfg.MinimumDiskSpace.Int64(),
// $5
// $3
time.Now().Add(-selectionCfg.OnlineWindow),
}
if selectionCfg.MinimumVersion != "" {
@ -64,9 +62,9 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
if err != nil {
return nil, nil, err
}
query += `AND (major > $6 OR (major = $7 AND (minor > $8 OR (minor = $9 AND patch >= $10)))) AND release`
query += `AND (major > $4 OR (major = $5 AND (minor > $6 OR (minor = $7 AND patch >= $8)))) AND release`
args = append(args,
// $6 - $10
// $4 - $8
version.Major, version.Major, version.Minor, version.Minor, version.Patch,
)
}
@ -83,8 +81,8 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{}
var lastIPPort sql.NullString
var isnew bool
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &isnew)
var vettedAt *time.Time
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &vettedAt)
if err != nil {
return nil, nil, err
}
@ -92,7 +90,7 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
node.LastIPPort = lastIPPort.String
}
if isnew {
if vettedAt == nil {
newNodes = append(newNodes, &node)
continue
}
@ -1198,7 +1196,6 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *
// if a node fails enough audits, it gets disqualified
// if a node gets enough "unknown" audits, it gets put into suspension
// if a node gets enough successful audits, and is in suspension, it gets removed from suspension
auditAlpha := dbNode.AuditReputationAlpha
auditBeta := dbNode.AuditReputationBeta
unknownAuditAlpha := dbNode.UnknownAuditReputationAlpha
@ -1528,3 +1525,30 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
return nil
}
var (
// ErrVetting is the error class for the following test methods.
ErrVetting = errs.Class("vetting error")
)
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier.
func (cache *overlaycache) TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error) {
updateFields := dbx.Node_Update_Fields{
VettedAt: dbx.Node_VettedAt(time.Now().UTC()),
}
node, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return nil, err
}
return node.VettedAt, nil
}
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier.
func (cache *overlaycache) TestUnvetNode(ctx context.Context, nodeID storj.NodeID) (err error) {
_, err = cache.db.Exec(ctx, `UPDATE nodes SET vetted_at = NULL WHERE nodes.id = $1;`, nodeID)
if err != nil {
return err
}
_, err = cache.db.OverlayCache().Get(ctx, nodeID)
return err
}

View File

@ -23,6 +23,7 @@ func TestUpdateStats(t *testing.T) {
nodeB := planet.StorageNodes[1]
nodeA.Contact.Chore.Pause(ctx)
nodeB.Contact.Chore.Pause(ctx)
cache := planet.Satellites[0].DB.OverlayCache()
numAudits := int64(2)
numUptimes := int64(3)
@ -103,6 +104,7 @@ func TestBatchUpdateStats(t *testing.T) {
nodeB := planet.StorageNodes[1]
nodeA.Contact.Chore.Pause(ctx)
nodeB.Contact.Chore.Pause(ctx)
cache := planet.Satellites[0].DB.OverlayCache()
numAudits := int64(2)
numUptimes := int64(3)

View File

@ -719,4 +719,3 @@ func Schema() map[string]*dbschema.Schema {
"used_serial": &dbschema.Schema{},
}
}