storj/satellite/overlay/nodeselectioncache_test.go
Cameron Ayer dc67ce74c9 satellite: remove IsUp field from overlay.UpdateRequest
With the new overlay.AuditOutcome type for offline audits, the
IsUp field is redundant. If AuditOutcome != AuditOffline, then
the node is online.

In addition to removing the field itself, other changes needed
to be made regarding the relationship between 'uptime' and 'audits'.
Previously, uptime and audit outcome were completely separated. For
example, it was possible to update a node's stats to give it a
successful/failed/unknown audit while simultaneously indicating that
the node was offline by setting IsUp to false. This is no longer possible
under this changeset. Some test which did this have been changed slightly
in order to pass.

Also add new benchmarks for UpdateStats and BatchUpdateStats with different
audit outcomes.

Change-Id: I998892d615850b1f138dc62f9b050f720ea0926b
2020-11-02 15:34:17 -05:00

493 lines
13 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay_test
import (
"context"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 1,
UptimeCount: 1,
NewNodeFraction: 0.2,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
DistinctIP: true,
MinimumDiskSpace: 100 * memory.MiB,
}
const (
// staleness is how stale the cache can be before we sync with
// the database to refresh the cache.
// using a negative time will force the cache to refresh every time.
lowStaleness = -time.Hour
// using a positive time will make it so that the cache is only refreshed when
// it hasn't been in the past hour.
highStaleness = time.Hour
)
func TestRefresh(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
db.OverlayCache(),
lowStaleness,
nodeSelectionConfig,
)
// the cache should have no nodes to start
err := cache.Refresh(ctx)
require.NoError(t, err)
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add some nodes to the database
const nodeCount = 2
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 0)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 2, new)
require.Equal(t, 0, reputable)
})
}
func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, count, makeReputable int) (reputableIds []storj.NodeID) {
for i := 0; i < count; i++ {
subnet := strconv.Itoa(i) + ".1.2"
addr := subnet + ".3:8080"
n := overlay.NodeCheckInInfo{
NodeID: storj.NodeID{byte(i)},
Address: &pb.NodeAddress{
Address: addr,
Transport: pb.NodeTransport_TCP_TLS_GRPC,
},
LastNet: subnet,
LastIPPort: addr,
IsUp: true,
Capacity: &pb.NodeCapacity{
FreeDisk: 200 * memory.MiB.Int64(),
},
Version: &pb.NodeVersion{
Version: "v1.1.0",
CommitHash: "",
Timestamp: time.Time{},
Release: true,
},
}
err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeSelectionConfig)
require.NoError(t, err)
// make designated nodes reputable
if i < makeReputable {
stats, err := db.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: storj.NodeID{byte(i)},
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
AuditHistory: testAuditHistoryConfig(),
}, time.Now())
require.NoError(t, err)
require.NotNil(t, stats.VettedAt)
reputableIds = append(reputableIds, storj.NodeID{byte(i)})
}
}
return reputableIds
}
type mockdb struct {
mu sync.Mutex
callCount int
reputable []*overlay.SelectedNode
new []*overlay.SelectedNode
}
func (m *mockdb) SelectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
m.mu.Lock()
defer m.mu.Unlock()
sync2.Sleep(ctx, 500*time.Millisecond)
m.callCount++
reputable = make([]*overlay.SelectedNode, len(m.reputable))
for i, n := range m.reputable {
reputable[i] = n.Clone()
}
new = make([]*overlay.SelectedNode, len(m.new))
for i, n := range m.new {
new[i] = n.Clone()
}
return reputable, new, nil
}
func TestRefreshConcurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// concurrent cache.Refresh with high staleness, where high staleness means the
// cache should only be refreshed the first time we call cache.Refresh
mockDB := mockdb{}
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
nodeSelectionConfig,
)
var group errgroup.Group
group.Go(func() error {
return cache.Refresh(ctx)
})
group.Go(func() error {
return cache.Refresh(ctx)
})
err := group.Wait()
require.NoError(t, err)
require.Equal(t, 1, mockDB.callCount)
// concurrent cache.Refresh with low staleness, where low staleness
// means that the cache will refresh *every time* cache.Refresh is called
mockDB = mockdb{}
cache = overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
lowStaleness,
nodeSelectionConfig,
)
group.Go(func() error {
return cache.Refresh(ctx)
})
group.Go(func() error {
return cache.Refresh(ctx)
})
err = group.Wait()
require.NoError(t, err)
require.Equal(t, 2, mockDB.callCount)
}
func TestGetNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var nodeSelectionConfig = overlay.NodeSelectionConfig{
NewNodeFraction: 0.2,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
DistinctIP: true,
MinimumDiskSpace: 100 * memory.MiB,
}
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
db.OverlayCache(),
lowStaleness,
nodeSelectionConfig,
)
// the cache should have no nodes to start
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add 4 nodes to the database and vet 2
const nodeCount = 4
nodeIds := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 2)
require.Len(t, nodeIds, 2)
// confirm cache.GetNodes returns the correct nodes
selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2})
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 2, new)
require.Equal(t, 2, reputable)
require.Equal(t, 2, len(selectedNodes))
for _, node := range selectedNodes {
require.NotEqual(t, node.ID, "")
require.NotEqual(t, node.Address.Address, "")
require.NotEqual(t, node.LastIPPort, "")
require.NotEqual(t, node.LastNet, "")
require.NotEqual(t, node.LastNet, "")
}
})
}
func TestGetNodesConcurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := []*overlay.SelectedNode{{
ID: storj.NodeID{1},
Address: &pb.NodeAddress{Address: "127.0.0.9"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.9:8000",
}}
newNodes := []*overlay.SelectedNode{{
ID: storj.NodeID{1},
Address: &pb.NodeAddress{Address: "127.0.0.10"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.10:8000",
}}
// concurrent GetNodes with high staleness, where high staleness means the
// cache should only be refreshed the first time we call cache.GetNodes
mockDB := mockdb{
reputable: reputableNodes,
new: newNodes,
}
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
nodeSelectionConfig,
)
var group errgroup.Group
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
err := group.Wait()
require.NoError(t, err)
// expect only one call to the db via cache.GetNodes
require.Equal(t, 1, mockDB.callCount)
// concurrent get nodes with low staleness, where low staleness means that
// the cache will refresh each time cache.GetNodes is called
mockDB = mockdb{
reputable: reputableNodes,
new: newNodes,
}
cache = overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
lowStaleness,
nodeSelectionConfig,
)
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
group.Go(func() error {
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
err = group.Wait()
require.NoError(t, err)
// expect two calls to the db via cache.GetNodes
require.Equal(t, 2, mockDB.callCount)
}
func TestGetNodesDistinct(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := []*overlay.SelectedNode{{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.0.9"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.9:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.0.6"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.6:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.1.7"},
LastNet: "127.0.1",
LastIPPort: "127.0.1.7:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.2.7"},
LastNet: "127.0.2",
LastIPPort: "127.0.2.7:8000",
}}
newNodes := []*overlay.SelectedNode{{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.0.10"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.10:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.1.8"},
LastNet: "127.0.1",
LastIPPort: "127.0.1.8:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.2.8"},
LastNet: "127.0.2",
LastIPPort: "127.0.2.8:8000",
}}
mockDB := mockdb{
reputable: reputableNodes,
new: newNodes,
}
{
// test that distinct ip doesn't return same last net
config := nodeSelectionConfig
config.NewNodeFraction = 0.5
config.DistinctIP = true
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
config,
)
// selecting 3 should be possible
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 3,
})
require.NoError(t, err)
seen := make(map[string]bool)
for _, n := range nodes {
require.False(t, seen[n.LastNet])
seen[n.LastNet] = true
}
// selecting 6 is impossible
_, err = cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 6,
})
require.Error(t, err)
}
{ // test that distinctIP=true allows selecting 6 nodes
config := nodeSelectionConfig
config.NewNodeFraction = 0.5
config.DistinctIP = false
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
config,
)
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: 6,
})
require.NoError(t, err)
}
}
func TestGetNodesError(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
mockDB := mockdb{}
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
nodeSelectionConfig,
)
// there should be 0 nodes in the cache
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// since the cache has no nodes, we should not be able
// to get 2 storage nodes from it and we expect an error
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2})
require.Error(t, err)
}
func TestNewNodeFraction(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
newNodeFraction := 0.2
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 1,
UptimeCount: 1,
NewNodeFraction: newNodeFraction,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
DistinctIP: true,
MinimumDiskSpace: 10 * memory.MiB,
}
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
db.OverlayCache(),
lowStaleness,
nodeSelectionConfig,
)
// the cache should have no nodes to start
err := cache.Refresh(ctx)
require.NoError(t, err)
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add some nodes to the database, some are reputable and some are new nodes
const nodeCount = 10
repIDs := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, 4)
require.Len(t, repIDs, 4)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 6, new)
require.Equal(t, 4, reputable)
// select nodes and confirm correct new node fraction
n, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 5})
require.NoError(t, err)
require.Equal(t, len(n), 5)
var reputableCount int
for _, id := range repIDs {
for _, node := range n {
if id == node.ID {
reputableCount++
}
}
}
require.Equal(t, len(n)-reputableCount, int(5*newNodeFraction)) // 1, 1
})
}