satellite/overlay: support DistinctIP=false in selection cache

Most of our tests and storj-sim are using DistinctIP.

Also fix a bug in the newNodeCount calculation.

Change-Id: I1a6d0efe7074908896a3322d19f487b929f0f0fc
This commit is contained in:
Egon Elbre 2020-05-06 20:22:54 +03:00 committed by Stefan Benten
parent d8bdc60e19
commit ce6a500b0c
2 changed files with 127 additions and 23 deletions

View File

@ -111,13 +111,16 @@ func (cache *NodeSelectionCache) GetNodes(ctx context.Context, req FindStorageNo
}
}
return cacheData.GetNodes(ctx, req, cache.selectionConfig.NewNodeFraction)
return cacheData.GetNodes(ctx, req, cache.selectionConfig.NewNodeFraction, cache.selectionConfig.DistinctIP)
}
// GetNodes selects nodes from the cache that will be used to upload a file.
//
// If there are new nodes in the cache, we will return a small fraction of those
// and then return mostly reputable nodes
func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesRequest, newNodeFraction float64) (_ []*SelectedNode, err error) {
// and then return mostly reputable nodes.
//
// Distinct determines whether the nodes have to be from distinct networks.
func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesRequest, newNodeFraction float64, distinct bool) (_ []*SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
cacheData.mu.RLock()
@ -128,7 +131,7 @@ func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesReques
if totalcount <= 0 {
totalcount = req.RequestedCount
}
newNodeCount := int(float64(req.RequestedCount) * newNodeFraction)
newNodeCount := int(float64(totalcount) * newNodeFraction)
var selectedNodeResults = []*SelectedNode{}
var distinctNetworks = map[string]struct{}{}
@ -144,12 +147,16 @@ nextNewNode:
continue nextNewNode
}
}
if distinct {
// don't select a node if we've already selected another node from the same network
if _, ok := distinctNetworks[currNode.LastNet]; ok {
continue nextNewNode
}
distinctNetworks[currNode.LastNet] = struct{}{}
}
selectedNodeResults = append(selectedNodeResults, currNode.Clone())
distinctNetworks[currNode.LastNet] = struct{}{}
if len(selectedNodeResults) >= newNodeCount {
break
}
@ -167,13 +174,15 @@ nextReputableNode:
}
}
if distinct {
// don't select a node if we've already selected another node from the same network
if _, ok := distinctNetworks[currNode.LastNet]; ok {
continue nextReputableNode
}
distinctNetworks[currNode.LastNet] = struct{}{}
}
selectedNodeResults = append(selectedNodeResults, currNode.Clone())
distinctNetworks[currNode.LastNet] = struct{}{}
if len(selectedNodeResults) >= totalcount {
break
}

View File

@ -19,12 +19,13 @@ import (
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
var nodeCfg = overlay.NodeSelectionConfig{
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 1,
UptimeCount: 1,
NewNodeFraction: 0.2,
@ -51,7 +52,7 @@ func TestRefresh(t *testing.T) {
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
db.OverlayCache(),
lowStaleness,
nodeCfg,
nodeSelectionConfig,
)
// the cache should have no nodes to start
err := cache.Refresh(ctx)
@ -98,7 +99,7 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun
Release: true,
},
}
err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeCfg)
err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeSelectionConfig)
require.NoError(t, err)
// make half of the nodes reputable
@ -151,7 +152,7 @@ func TestRefreshConcurrent(t *testing.T) {
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
nodeCfg,
nodeSelectionConfig,
)
var group errgroup.Group
@ -172,7 +173,7 @@ func TestRefreshConcurrent(t *testing.T) {
cache = overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
lowStaleness,
nodeCfg,
nodeSelectionConfig,
)
group.Go(func() error {
return cache.Refresh(ctx)
@ -188,7 +189,7 @@ func TestRefreshConcurrent(t *testing.T) {
func TestGetNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var nodeCfg = overlay.NodeSelectionConfig{
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 0,
UptimeCount: 0,
NewNodeFraction: 0.2,
@ -200,7 +201,7 @@ func TestGetNodes(t *testing.T) {
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
db.OverlayCache(),
lowStaleness,
nodeCfg,
nodeSelectionConfig,
)
// the cache should have no nodes to start
reputable, new := cache.Size()
@ -254,7 +255,7 @@ func TestGetNodesConcurrent(t *testing.T) {
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
nodeCfg,
nodeSelectionConfig,
)
var group errgroup.Group
@ -294,7 +295,7 @@ func TestGetNodesConcurrent(t *testing.T) {
cache = overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
lowStaleness,
nodeCfg,
nodeSelectionConfig,
)
group.Go(func() error {
@ -325,6 +326,100 @@ func TestGetNodesConcurrent(t *testing.T) {
require.Equal(t, 2, mockDB.callCount)
}
// TestGetNodesDistinct verifies the DistinctIP behavior of the node selection
// cache: with DistinctIP=true at most one node per /24 network (LastNet) may be
// selected, while with DistinctIP=false nodes sharing a network may all be used.
//
// The fixture has 7 nodes (4 reputable, 3 new) spanning only 3 distinct
// LastNet values (127.0.0, 127.0.1, 127.0.2), so a distinct selection of 3 is
// possible but 6 is not; without the distinct requirement all 6+ can be chosen.
func TestGetNodesDistinct(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// Reputable nodes: two share the 127.0.0 network, the others are on
// 127.0.1 and 127.0.2.
reputableNodes := []*overlay.SelectedNode{{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.0.9"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.9:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.0.6"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.6:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.1.7"},
LastNet: "127.0.1",
LastIPPort: "127.0.1.7:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.2.7"},
LastNet: "127.0.2",
LastIPPort: "127.0.2.7:8000",
}}
// New nodes: one on each of the same three networks as the reputable set.
newNodes := []*overlay.SelectedNode{{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.0.10"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.10:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.1.8"},
LastNet: "127.0.1",
LastIPPort: "127.0.1.8:8000",
}, {
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: "127.0.2.8"},
LastNet: "127.0.2",
LastIPPort: "127.0.2.8:8000",
}}
mockDB := mockdb{
reputable: reputableNodes,
new: newNodes,
}
{
// test that distinct ip doesn't return same last net
config := nodeSelectionConfig
config.NewNodeFraction = 0.5
config.DistinctIP = true
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
config,
)
// selecting 3 should be possible (there are exactly 3 distinct networks)
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 3,
})
require.NoError(t, err)
// no two selected nodes may share a LastNet
seen := make(map[string]bool)
for _, n := range nodes {
require.False(t, seen[n.LastNet])
seen[n.LastNet] = true
}
// selecting 6 is impossible
_, err = cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 6,
})
require.Error(t, err)
}
{ // test that distinctIP=false allows selecting 6 nodes
config := nodeSelectionConfig
config.NewNodeFraction = 0.5
config.DistinctIP = false
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
config,
)
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 6,
})
require.NoError(t, err)
}
}
func TestGetNodesError(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -333,7 +428,7 @@ func TestGetNodesError(t *testing.T) {
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
nodeCfg,
nodeSelectionConfig,
)
// there should be 0 nodes in the cache
@ -350,7 +445,7 @@ func TestGetNodesError(t *testing.T) {
func TestNewNodeFraction(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
newNodeFraction := 0.2
var nodeCfg = overlay.NodeSelectionConfig{
var nodeSelectionConfig = overlay.NodeSelectionConfig{
AuditCount: 1,
UptimeCount: 1,
NewNodeFraction: newNodeFraction,
@ -362,7 +457,7 @@ func TestNewNodeFraction(t *testing.T) {
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
db.OverlayCache(),
lowStaleness,
nodeCfg,
nodeSelectionConfig,
)
// the cache should have no nodes to start
err := cache.Refresh(ctx)