Jennifer Johnson 4e2413a99d satellite/satellitedb: uses vetted_at field to select for reputable nodes
Additionally, this PR changes NewNodeFraction devDefault and testplanet config from 0.05 to 1.
This is because many tests relied on selecting nodes that were reputable based on audit and uptime
counts of 0, in effect, selecting new nodes as reputable ones.
However, since reputation is now indicated by a vetted_at db field that is explicitly set
rather than implied by audit and uptime counts, it would be more complicated to try to
update all of the nodes' reputations before selecting nodes for tests.
Now we just allow all test nodes to be new if needed.

Change-Id: Ib9531be77408662315b948fd029cee925ed2ca1d
2020-09-04 16:45:32 +00:00

494 lines
13 KiB

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay_test
import (
// nodeSelectionConfig is the package-level node selection configuration shared
// by the tests in this file: it is passed to db.UpdateCheckIn by
// addNodesToNodesTable and used as the base config in TestGetNodesDistinct.
// TestGetNodes and TestNewNodeFraction declare a local variable with the same
// name that shadows this one inside those tests.
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 1,
UptimeCount: 1,
NewNodeFraction: 0.2,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
// NOTE(review): the distinct-subnet behaviour this flag enables is exercised in TestGetNodesDistinct.
DistinctIP: true,
MinimumDiskSpace: 100 * memory.MiB,
// Staleness values used when constructing node selection caches in the tests.
const (
// staleness is how stale the cache can be before we sync with
// the database to refresh the cache.
//
// lowStaleness is negative: using a negative time will force the cache to
// refresh every time.
lowStaleness = -time.Hour
// highStaleness is positive: using a positive time will make it so that the
// cache is only refreshed when it hasn't been refreshed in the past hour.
highStaleness = time.Hour
// TestRefresh checks that cache.Refresh picks up nodes written to the database:
// the cache reports zero reputable and zero new nodes at first, and after two
// nodes are checked in (none of them made reputable) a second Refresh reports
// two new nodes and no reputable ones.
func TestRefresh(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
// NOTE(review): the remaining constructor arguments are missing from this copy of the file.
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
// the cache should have no nodes to start
err := cache.Refresh(ctx)
require.NoError(t, err)
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add some nodes to the database
const nodeCount = 2
// makeReputable is 0, so every added node stays unvetted
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 0)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 2, new)
require.Equal(t, 0, reputable)
// addNodesToNodesTable checks in count nodes through db.UpdateCheckIn and makes
// the first makeReputable of them reputable by reporting a successful audit via
// db.UpdateStats, requiring that the returned stats carry a non-nil VettedAt.
//
// Node i gets the ID storj.NodeID{byte(i)}, the last-net "<i>.1.2" and the
// address "<i>.1.2.3:8080", so every node is on its own subnet.
// It returns the IDs of the nodes that were made reputable. Any database error
// fails the test immediately through require.
//
// NOTE(review): byte(i) wraps around after 255, so IDs would repeat for larger
// counts; callers in this file use at most 10 nodes.
func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, count, makeReputable int) (reputableIds []storj.NodeID) {
for i := 0; i < count; i++ {
// derive a unique subnet and address from the loop index
subnet := strconv.Itoa(i) + ".1.2"
addr := subnet + ".3:8080"
n := overlay.NodeCheckInInfo{
NodeID: storj.NodeID{byte(i)},
Address: &pb.NodeAddress{
Address: addr,
Transport: pb.NodeTransport_TCP_TLS_GRPC,
LastNet: subnet,
LastIPPort: addr,
IsUp: true,
Capacity: &pb.NodeCapacity{
// 200 MiB of free disk, above the 100 MiB MinimumDiskSpace in nodeSelectionConfig
FreeDisk: 200 * memory.MiB.Int64(),
Version: &pb.NodeVersion{
// newer than the "v1.0.0" MinimumVersion in nodeSelectionConfig
Version: "v1.1.0",
CommitHash: "",
Timestamp: time.Time{},
Release: true,
err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeSelectionConfig)
require.NoError(t, err)
// make designated nodes reputable
if i < makeReputable {
stats, err := db.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: storj.NodeID{byte(i)},
IsUp: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
AuditHistory: testAuditHistoryConfig(),
}, time.Now())
require.NoError(t, err)
// the node must now be marked as vetted
require.NotNil(t, stats.VettedAt)
reputableIds = append(reputableIds, storj.NodeID{byte(i)})
return reputableIds
// mockdb is an in-memory stand-in for the database behind the node selection
// cache. It serves a fixed set of reputable and new nodes through
// SelectAllStorageNodesUpload.
type mockdb struct {
// NOTE(review): presumably guards callCount; the locking code is not visible in this copy of the file.
mu sync.Mutex
// callCount is read by the tests to check how many times the database was queried.
callCount int
// reputable holds the nodes returned as reputable.
reputable []*overlay.SelectedNode
// new holds the nodes meant to be returned as new.
new []*overlay.SelectedNode
// SelectAllStorageNodesUpload waits 500ms (or until ctx is done) and then
// returns clones of the mock's node lists, so callers never receive the
// pointers stored in the mock itself. The returned error is always nil.
//
// NOTE(review): the delay presumably exists so that concurrent callers overlap
// in the concurrency tests — confirm.
func (m *mockdb) SelectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
sync2.Sleep(ctx, 500*time.Millisecond)
reputable = make([]*overlay.SelectedNode, len(m.reputable))
for i, n := range m.reputable {
reputable[i] = n.Clone()
// NOTE(review): the next two lines are truncated in this copy (the slice
// operand is missing); presumably they read m.new — confirm against the
// upstream file.
new = make([]*overlay.SelectedNode, len(
for i, n := range {
new[i] = n.Clone()
return reputable, new, nil
// TestRefreshConcurrent runs two cache.Refresh calls concurrently through an
// errgroup against a mockdb and checks mockDB.callCount afterwards: one
// database call is expected in the high-staleness scenario and two in the
// low-staleness scenario.
func TestRefreshConcurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// concurrent cache.Refresh with high staleness, where high staleness means the
// cache should only be refreshed the first time we call cache.Refresh
mockDB := mockdb{}
// NOTE(review): the remaining constructor arguments are missing from this copy of the file.
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
var group errgroup.Group
group.Go(func() error {
return cache.Refresh(ctx)
group.Go(func() error {
return cache.Refresh(ctx)
// wait for both refreshes before inspecting the call counter
err := group.Wait()
require.NoError(t, err)
require.Equal(t, 1, mockDB.callCount)
// concurrent cache.Refresh with low staleness, where low staleness
// means that the cache will refresh *every time* cache.Refresh is called
mockDB = mockdb{}
cache = overlay.NewNodeSelectionCache(zap.NewNop(),
group.Go(func() error {
return cache.Refresh(ctx)
group.Go(func() error {
return cache.Refresh(ctx)
err = group.Wait()
require.NoError(t, err)
require.Equal(t, 2, mockDB.callCount)
// TestGetNodes checks cache.GetNodes against a real database: four nodes are
// added, two of them vetted, and a request for two nodes must succeed, leave
// the cache reporting two new and two reputable nodes, and return two nodes
// whose fields are populated.
func TestGetNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
// local config shadowing the package-level nodeSelectionConfig; unlike that
// one it leaves AuditCount and UptimeCount unset
var nodeSelectionConfig = overlay.NodeSelectionConfig{
NewNodeFraction: 0.2,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
DistinctIP: true,
MinimumDiskSpace: 100 * memory.MiB,
// NOTE(review): the remaining constructor arguments are missing from this copy of the file.
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
// the cache should have no nodes to start
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add 4 nodes to the database and vet 2
const nodeCount = 4
nodeIds := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 2)
require.Len(t, nodeIds, 2)
// confirm cache.GetNodes returns the correct nodes
selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2})
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 2, new)
require.Equal(t, 2, reputable)
require.Equal(t, 2, len(selectedNodes))
// every selected node should have its identifying fields filled in
for _, node := range selectedNodes {
// NOTE(review): node.ID is a storj.NodeID (TestNewNodeFraction compares it
// with []storj.NodeID entries), so comparing it with the string "" can
// presumably never be equal and this check is vacuous — consider comparing
// against storj.NodeID{}; confirm against testify's equality rules.
require.NotEqual(t, node.ID, "")
require.NotEqual(t, node.Address.Address, "")
require.NotEqual(t, node.LastIPPort, "")
require.NotEqual(t, node.LastNet, "")
// NOTE(review): duplicate of the previous assertion.
require.NotEqual(t, node.LastNet, "")
// TestGetNodesConcurrent runs two cache.GetNodes calls concurrently through an
// errgroup against a mockdb holding one reputable and one new node, and checks
// mockDB.callCount afterwards: one database call is expected in the
// high-staleness scenario and two in the low-staleness scenario.
//
// Each goroutine also overwrites fields of the nodes it received and nils the
// first slot of the returned slice.
// NOTE(review): presumably this is to show that callers may freely mutate the
// result without affecting the cache or racing with other callers — confirm.
func TestGetNodesConcurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := []*overlay.SelectedNode{{
ID: storj.NodeID{1},
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.0",
LastIPPort: "",
// NOTE(review): the new node uses the same ID and LastNet as the reputable node above.
newNodes := []*overlay.SelectedNode{{
ID: storj.NodeID{1},
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.0",
LastIPPort: "",
// concurrent GetNodes with high staleness, where high staleness means the
// cache should only be refreshed the first time we call cache.GetNodes
mockDB := mockdb{
reputable: reputableNodes,
new: newNodes,
// NOTE(review): the remaining constructor arguments are missing from this copy of the file.
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
var group errgroup.Group
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
// mutate the returned nodes and slice
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = ""
nodes[0] = nil
return err
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
// mutate the returned nodes and slice
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = ""
nodes[0] = nil
return err
err := group.Wait()
require.NoError(t, err)
// expect only one call to the db via cache.GetNodes
require.Equal(t, 1, mockDB.callCount)
// concurrent get nodes with low staleness, where low staleness means that
// the cache will refresh each time cache.GetNodes is called
mockDB = mockdb{
reputable: reputableNodes,
new: newNodes,
cache = overlay.NewNodeSelectionCache(zap.NewNop(),
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
// mutate the returned nodes and slice
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = ""
nodes[0] = nil
return err
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
// mutate the returned nodes and slice
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = ""
nodes[0] = nil
return err
err = group.Wait()
require.NoError(t, err)
// expect two calls to the db via cache.GetNodes
require.Equal(t, 2, mockDB.callCount)
// TestGetNodesDistinct checks the DistinctIP setting against a mockdb holding
// four reputable and three new nodes spread over only three LastNet values
// ("127.0.0", "127.0.1", "127.0.2").
//
// With DistinctIP enabled, selecting three nodes must succeed with no LastNet
// repeated, and selecting six must fail. With DistinctIP disabled, selecting
// six nodes must succeed.
func TestGetNodesDistinct(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// four reputable nodes; the first two share the "127.0.0" subnet
reputableNodes := []*overlay.SelectedNode{{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.0",
LastIPPort: "",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.0",
LastIPPort: "",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.1",
LastIPPort: "",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.2",
LastIPPort: "",
// three new nodes, one on each of the subnets used above
newNodes := []*overlay.SelectedNode{{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.0",
LastIPPort: "",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.1",
LastIPPort: "",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: ""},
LastNet: "127.0.2",
LastIPPort: "",
mockDB := mockdb{
reputable: reputableNodes,
new: newNodes,
// test that distinct ip doesn't return same last net
config := nodeSelectionConfig
config.NewNodeFraction = 0.5
config.DistinctIP = true
// NOTE(review): the remaining constructor arguments are missing from this copy of the file.
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
// selecting 3 should be possible
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 3,
require.NoError(t, err)
// no two selected nodes may share a LastNet
seen := make(map[string]bool)
for _, n := range nodes {
require.False(t, seen[n.LastNet])
seen[n.LastNet] = true
// selecting 6 is impossible: the mock only has 3 distinct LastNet values
_, err = cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 6,
require.Error(t, err)
{ // test that distinctIP=false allows selecting 6 nodes
config := nodeSelectionConfig
config.NewNodeFraction = 0.5
config.DistinctIP = false
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 6,
require.NoError(t, err)
// TestGetNodesError checks that cache.GetNodes returns an error when the cache
// is backed by an empty mockdb and therefore has no nodes to select from.
func TestGetNodesError(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// empty mock: no reputable and no new nodes
mockDB := mockdb{}
// NOTE(review): the remaining constructor arguments are missing from this copy of the file.
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
// there should be 0 nodes in the cache
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// since the cache has no nodes, we should not be able
// to get 2 storage nodes from it and we expect an error
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2})
require.Error(t, err)
// TestNewNodeFraction checks that cache.GetNodes respects NewNodeFraction:
// with ten nodes in the database, four of them reputable, and a fraction of
// 0.2, a request for five nodes is expected to contain int(5*0.2) = 1 node
// that is not among the reputable IDs.
//
// NOTE(review): this block runs past the end of the visible copy of the file;
// the tail of the function is not shown.
func TestNewNodeFraction(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
newNodeFraction := 0.2
// local config shadowing the package-level nodeSelectionConfig
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 1,
UptimeCount: 1,
NewNodeFraction: newNodeFraction,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
DistinctIP: true,
MinimumDiskSpace: 10 * memory.MiB,
// NOTE(review): the remaining constructor arguments are missing from this copy of the file.
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
// the cache should have no nodes to start
err := cache.Refresh(ctx)
require.NoError(t, err)
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add some nodes to the database, some are reputable and some are new nodes
const nodeCount = 10
repIDs := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 4)
require.Len(t, repIDs, 4)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 6, new)
require.Equal(t, 4, reputable)
// select nodes and confirm correct new node fraction
n, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 5})
require.NoError(t, err)
require.Equal(t, len(n), 5)
// count how many of the selected nodes are reputable
// NOTE(review): reputableCount is never incremented in the visible lines;
// presumably the increment inside the match below was lost from this copy.
var reputableCount int
for _, id := range repIDs {
for _, node := range n {
if id == node.ID {
// the selected nodes that are not reputable must match the configured fraction
require.Equal(t, len(n)-reputableCount, int(5*newNodeFraction)) // 1, 1