diff --git a/satellite/overlay/nodeselectioncache.go b/satellite/overlay/nodeselectioncache.go index 5a75c2487..492162a1d 100644 --- a/satellite/overlay/nodeselectioncache.go +++ b/satellite/overlay/nodeselectioncache.go @@ -111,13 +111,16 @@ func (cache *NodeSelectionCache) GetNodes(ctx context.Context, req FindStorageNo } } - return cacheData.GetNodes(ctx, req, cache.selectionConfig.NewNodeFraction) + return cacheData.GetNodes(ctx, req, cache.selectionConfig.NewNodeFraction, cache.selectionConfig.DistinctIP) } // GetNodes selects nodes from the cache that will be used to upload a file. +// // If there are new nodes in the cache, we will return a small fraction of those -// and then return mostly reputable nodes -func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesRequest, newNodeFraction float64) (_ []*SelectedNode, err error) { +// and then return mostly reputable nodes. +// +// Distinct determines whether the nodes have to be from distinct networks. +func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesRequest, newNodeFraction float64, distinct bool) (_ []*SelectedNode, err error) { defer mon.Task()(&ctx)(&err) cacheData.mu.RLock() @@ -128,7 +131,7 @@ func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesReques if totalcount <= 0 { totalcount = req.RequestedCount } - newNodeCount := int(float64(req.RequestedCount) * newNodeFraction) + newNodeCount := int(float64(totalcount) * newNodeFraction) var selectedNodeResults = []*SelectedNode{} var distinctNetworks = map[string]struct{}{} @@ -144,12 +147,16 @@ nextNewNode: continue nextNewNode } } - if _, ok := distinctNetworks[currNode.LastNet]; ok { - continue nextNewNode + + if distinct { + // don't select a node if we've already selected another node from the same network + if _, ok := distinctNetworks[currNode.LastNet]; ok { + continue nextNewNode + } + distinctNetworks[currNode.LastNet] = struct{}{} } selectedNodeResults = append(selectedNodeResults, currNode.Clone()) - distinctNetworks[currNode.LastNet] = struct{}{} if len(selectedNodeResults) >= newNodeCount { break } @@ -167,13 +174,15 @@ nextReputableNode: } } - // don't select a node if we've already selected another node from the same network - if _, ok := distinctNetworks[currNode.LastNet]; ok { - continue nextReputableNode + if distinct { + // don't select a node if we've already selected another node from the same network + if _, ok := distinctNetworks[currNode.LastNet]; ok { + continue nextReputableNode + } + distinctNetworks[currNode.LastNet] = struct{}{} } selectedNodeResults = append(selectedNodeResults, currNode.Clone()) - distinctNetworks[currNode.LastNet] = struct{}{} if len(selectedNodeResults) >= totalcount { break } diff --git a/satellite/overlay/nodeselectioncache_test.go b/satellite/overlay/nodeselectioncache_test.go index 7e4cdf123..ff2ae0708 100644 --- a/satellite/overlay/nodeselectioncache_test.go +++ b/satellite/overlay/nodeselectioncache_test.go @@ -19,12 +19,13 @@ import ( "storj.io/common/storj" "storj.io/common/sync2" "storj.io/common/testcontext" + "storj.io/common/testrand" "storj.io/storj/satellite" "storj.io/storj/satellite/overlay" "storj.io/storj/satellite/satellitedb/satellitedbtest" ) -var nodeCfg = overlay.NodeSelectionConfig{ +var nodeSelectionConfig = overlay.NodeSelectionConfig{ AuditCount: 1, UptimeCount: 1, NewNodeFraction: 0.2, @@ -51,7 +52,7 @@ func TestRefresh(t *testing.T) { cache := overlay.NewNodeSelectionCache(zap.NewNop(), db.OverlayCache(), lowStaleness, - nodeCfg, + nodeSelectionConfig, ) // the cache should have no nodes to start err := cache.Refresh(ctx) @@ -98,7 +99,7 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun Release: true, }, } - err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeCfg) + err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeSelectionConfig) require.NoError(t, err) // make half of the nodes reputable @@ -151,7 +152,7 @@ func TestRefreshConcurrent(t *testing.T) { cache := overlay.NewNodeSelectionCache(zap.NewNop(), &mockDB, highStaleness, - nodeCfg, + nodeSelectionConfig, ) var group errgroup.Group @@ -172,7 +173,7 @@ func TestRefreshConcurrent(t *testing.T) { cache = overlay.NewNodeSelectionCache(zap.NewNop(), &mockDB, lowStaleness, - nodeCfg, + nodeSelectionConfig, ) group.Go(func() error { return cache.Refresh(ctx) @@ -188,7 +189,7 @@ func TestRefreshConcurrent(t *testing.T) { func TestGetNodes(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { - var nodeCfg = overlay.NodeSelectionConfig{ + var nodeSelectionConfig = overlay.NodeSelectionConfig{ AuditCount: 0, UptimeCount: 0, NewNodeFraction: 0.2, @@ -200,7 +201,7 @@ func TestGetNodes(t *testing.T) { cache := overlay.NewNodeSelectionCache(zap.NewNop(), db.OverlayCache(), lowStaleness, - nodeCfg, + nodeSelectionConfig, ) // the cache should have no nodes to start reputable, new := cache.Size() @@ -254,7 +255,7 @@ func TestGetNodesConcurrent(t *testing.T) { cache := overlay.NewNodeSelectionCache(zap.NewNop(), &mockDB, highStaleness, - nodeCfg, + nodeSelectionConfig, ) var group errgroup.Group @@ -294,7 +295,7 @@ func TestGetNodesConcurrent(t *testing.T) { cache = overlay.NewNodeSelectionCache(zap.NewNop(), &mockDB, lowStaleness, - nodeCfg, + nodeSelectionConfig, ) group.Go(func() error { @@ -325,6 +326,100 @@ func TestGetNodesConcurrent(t *testing.T) { require.Equal(t, 2, mockDB.callCount) } +func TestGetNodesDistinct(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + reputableNodes := []*overlay.SelectedNode{{ + ID: testrand.NodeID(), + Address: &pb.NodeAddress{Address: "127.0.0.9"}, + LastNet: "127.0.0", + LastIPPort: "127.0.0.9:8000", + }, { + ID: testrand.NodeID(), + Address: &pb.NodeAddress{Address: "127.0.0.6"}, + LastNet: "127.0.0", + LastIPPort: "127.0.0.6:8000", + }, { + ID: testrand.NodeID(), + Address: &pb.NodeAddress{Address: "127.0.1.7"}, + LastNet: "127.0.1", + LastIPPort: "127.0.1.7:8000", + }, { + ID: testrand.NodeID(), + Address: &pb.NodeAddress{Address: "127.0.2.7"}, + LastNet: "127.0.2", + LastIPPort: "127.0.2.7:8000", + }} + + newNodes := []*overlay.SelectedNode{{ + ID: testrand.NodeID(), + Address: &pb.NodeAddress{Address: "127.0.0.10"}, + LastNet: "127.0.0", + LastIPPort: "127.0.0.10:8000", + }, { + ID: testrand.NodeID(), + Address: &pb.NodeAddress{Address: "127.0.1.8"}, + LastNet: "127.0.1", + LastIPPort: "127.0.1.8:8000", + }, { + ID: testrand.NodeID(), + Address: &pb.NodeAddress{Address: "127.0.2.8"}, + LastNet: "127.0.2", + LastIPPort: "127.0.2.8:8000", + }} + + mockDB := mockdb{ + reputable: reputableNodes, + new: newNodes, + } + + { + // test that distinct ip doesn't return same last net + config := nodeSelectionConfig + config.NewNodeFraction = 0.5 + config.DistinctIP = true + cache := overlay.NewNodeSelectionCache(zap.NewNop(), + &mockDB, + highStaleness, + config, + ) + + // selecting 3 should be possible + nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ + MinimumRequiredNodes: 3, + }) + require.NoError(t, err) + seen := make(map[string]bool) + for _, n := range nodes { + require.False(t, seen[n.LastNet]) + seen[n.LastNet] = true + } + + // selecting 6 is impossible + _, err = cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ + MinimumRequiredNodes: 6, + }) + require.Error(t, err) + } + + { // test that distinctIP=true allows selecting 6 nodes + config := nodeSelectionConfig + config.NewNodeFraction = 0.5 + config.DistinctIP = false + cache := overlay.NewNodeSelectionCache(zap.NewNop(), + &mockDB, + highStaleness, + config, + ) + + _, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ + MinimumRequiredNodes: 6, + }) + require.NoError(t, err) + } +} + func TestGetNodesError(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() @@ -333,7 +428,7 @@ func TestGetNodesError(t *testing.T) { cache := overlay.NewNodeSelectionCache(zap.NewNop(), &mockDB, highStaleness, - nodeCfg, + nodeSelectionConfig, ) // there should be 0 nodes in the cache @@ -350,7 +445,7 @@ func TestGetNodesError(t *testing.T) { func TestNewNodeFraction(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { newNodeFraction := 0.2 - var nodeCfg = overlay.NodeSelectionConfig{ + var nodeSelectionConfig = overlay.NodeSelectionConfig{ AuditCount: 1, UptimeCount: 1, NewNodeFraction: newNodeFraction, @@ -362,7 +457,7 @@ func TestNewNodeFraction(t *testing.T) { cache := overlay.NewNodeSelectionCache(zap.NewNop(), db.OverlayCache(), lowStaleness, - nodeCfg, + nodeSelectionConfig, ) // the cache should have no nodes to start err := cache.Refresh(ctx)