satellite/overlay: remove/deprecate NodeSelectionCache.Disabled

Once upon a time, at the dawn of the Storj implementation, all nodes were read directly from the database, every time.

After a while, for performance reasons, this was changed for upload and download: all nodes are read periodically, cached for a short period, and selected from memory.

This cached implementation is the one that was recently improved to support advanced node selection using placements.

But we still have an old configuration value, `service.config.NodeSelectionCache.Disabled`, and the DB-based implementation: `service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)`.

For safety, we need to remove this option to make sure we always use the cache, which has the advanced features.
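
To illustrate, here is a minimal sketch of the cache-only upload path (within package overlay, simplified from the satellite/overlay/service.go hunk further below; metrics recording on the selected nodes is omitted):

    // FindStorageNodesForUpload searches for nodes in the cache that meet
    // the provided requirements for upload. There is no DB-based fallback
    // and no NodeSelectionCache.Disabled check anymore.
    func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindStorageNodesRequest) (_ []*nodeselection.SelectedNode, err error) {
        defer mon.Task()(&ctx)(&err)

        if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 {
            req.AsOfSystemInterval = service.config.Node.AsOfSystemTime.DefaultInterval
        }

        // The cache periodically reads all eligible nodes from the DB and
        // selects from memory, which is where the placement-aware selection lives.
        return service.UploadSelectionCache.GetNodes(ctx, req)
    }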

This patch was supposed to be very small (just removing a method and a config option: https://review.dev.storj.io/c/storj/storj/+/11074/1/satellite/overlay/service.go), but it turned out that a lot of unit tests needed to be updated.

These unit tests used the old implementation (which is no longer used in production).

The tests which used both implementations are updated to use only the new one.
The tests which used only the old implementation are refactored (keeping the test cases) into real unit tests: no database, fast, and working on macOS. A sketch of the new pattern is shown below.
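
For reference, a minimal sketch of that pattern, assuming the runServiceWithDB, overlayDefaultConfig, defaultNodes, countCommon, and mockdb helpers introduced in the diff below (the test name and the 20% new-node fraction are illustrative):

    package overlay_test

    import (
        "testing"

        "github.com/stretchr/testify/require"
        "go.uber.org/zap/zaptest"

        "storj.io/common/testcontext"
        "storj.io/storj/satellite/overlay"
    )

    func TestUploadSelectionWithoutDB(t *testing.T) {
        ctx := testcontext.New(t)

        // 5 reputable + 5 new nodes, held purely in memory by mockdb;
        // no real database or testplanet is involved.
        service, db, cleanup := runServiceWithDB(ctx, zaptest.NewLogger(t), 5, 5,
            overlayDefaultConfig(0.2), defaultNodes)
        defer cleanup()

        nodes, err := service.FindStorageNodesForUpload(ctx,
            overlay.FindStorageNodesRequest{RequestedCount: 5})
        require.NoError(t, err)
        require.Len(t, nodes, 5)

        // With a 20% new-node fraction, 4 of the 5 selected nodes
        // should come from the reputable set.
        require.Equal(t, 4, countCommon(db.reputable, nodes))
    }

It mirrors the "request 5, where 1 new" case in the refactored TestEnsureMinimumRequested.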

Closes https://github.com/storj/storj/issues/6217

Change-Id: I023f92c7e34235665cf8474513e67b2fcc4763eb
Márton Elek 2023-08-28 10:56:45 +02:00 committed by Elek, Márton
parent 00194f54a2
commit ca0ea50cba
5 changed files with 466 additions and 515 deletions

View File

@ -369,32 +369,6 @@ func BenchmarkNodeSelection(b *testing.B) {
defer func() { require.NoError(b, background.Wait()) }()
defer func() { serviceCancel(); _ = service.Close() }()
b.Run("FindStorageNodesWithPreference", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
RequestedCount: SelectCount,
ExcludedIDs: nil,
MinimumVersion: "v1.0.0",
AsOfSystemInterval: -time.Microsecond,
}, &nodeSelectionConfig)
require.NoError(b, err)
require.NotEmpty(b, selected)
}
})
b.Run("FindStorageNodesWithPreferenceExclusion", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
RequestedCount: SelectCount,
ExcludedIDs: excludedIDs,
MinimumVersion: "v1.0.0",
AsOfSystemInterval: -time.Microsecond,
}, &nodeSelectionConfig)
require.NoError(b, err)
require.NotEmpty(b, selected)
}
})
b.Run("FindStorageNodes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := service.FindStorageNodesForUpload(ctx, overlay.FindStorageNodesRequest{

View File

@ -4,9 +4,11 @@
package overlay_test
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"math/rand"
"net"
"runtime"
"strings"
@ -17,8 +19,10 @@ import (
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/exp/slices"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/rpc/rpcpeer"
@ -28,7 +32,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/reputation"
)
func TestMinimumDiskSpace(t *testing.T) {
@ -47,7 +50,6 @@ func TestMinimumDiskSpace(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
saOverlay := planet.Satellites[0].Overlay
nodeConfig := planet.Satellites[0].Config.Overlay.Node
node0 := planet.StorageNodes[0]
node0.Contact.Chore.Pause(ctx)
@ -87,9 +89,6 @@ func TestMinimumDiskSpace(t *testing.T) {
require.Error(t, err)
require.True(t, overlay.ErrNotEnoughNodes.Has(err))
require.Equal(t, len(n2), len(n1))
n3, err := saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
require.Error(t, err)
require.Equal(t, len(n3), len(n1))
// report disk space greater than minimum
_, err = planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
@ -106,10 +105,7 @@ func TestMinimumDiskSpace(t *testing.T) {
n1, err = planet.Satellites[0].Overlay.Service.FindStorageNodesForUpload(ctx, req)
require.NoError(t, err)
require.Equal(t, 2, len(n1))
n2, err = saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
require.NoError(t, err)
require.Equal(t, len(n1), len(n2))
n3, err = saOverlay.Service.UploadSelectionCache.GetNodes(ctx, req)
n3, err := saOverlay.Service.UploadSelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Equal(t, len(n1), len(n3))
})
@ -157,127 +153,79 @@ func TestOnlineOffline(t *testing.T) {
})
}
var defaultNodes = func(i int, node *nodeselection.SelectedNode) {}
func overlayDefaultConfig(newNodeFraction float64) overlay.Config {
return overlay.Config{
Node: overlay.NodeSelectionConfig{
NewNodeFraction: newNodeFraction,
},
NodeSelectionCache: overlay.UploadSelectionCacheConfig{
Staleness: 10 * time.Hour,
},
}
}
func TestEnsureMinimumRequested(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 5,
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.MinimumDiskSpace = 10 * memory.MB
config.Reputation.InitialAlpha = 1
config.Reputation.AuditLambda = 1
config.Reputation.UnknownAuditLambda = 1
config.Reputation.AuditWeight = 1
config.Reputation.AuditDQ = 0.5
config.Reputation.UnknownAuditDQ = 0.5
config.Reputation.AuditCount = 1
config.Reputation.AuditHistory = testAuditHistoryConfig()
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// pause chores that might update node data
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
for _, node := range planet.StorageNodes {
node.Contact.Chore.Pause(ctx)
}
service := satellite.Overlay.Service
repService := satellite.Reputation.Service
reputable := map[storj.NodeID]bool{}
countReputable := func(selected []*nodeselection.SelectedNode) (count int) {
for _, n := range selected {
if reputable[n.ID] {
count++
}
}
return count
}
// update half of nodes to be reputable
for i := 0; i < 5; i++ {
node := planet.StorageNodes[i]
reputable[node.ID()] = true
err := repService.ApplyAudit(ctx, node.ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
require.NoError(t, err)
}
err := repService.TestFlushAllNodeInfo(ctx)
require.NoError(t, err)
ctx := testcontext.New(t)
t.Run("request 5, where 1 new", func(t *testing.T) {
t.Parallel()
requestedCount, newCount := 5, 1
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(newNodeFraction)
service, db, cleanup := runServiceWithDB(ctx, zaptest.NewLogger(t), 5, 5, overlayDefaultConfig(newNodeFraction), defaultNodes)
defer cleanup()
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}
nodes, err := service.FindStorageNodesWithPreferences(ctx, req, &preferences)
nodes, err := service.FindStorageNodesForUpload(ctx, req)
require.NoError(t, err)
require.Len(t, nodes, requestedCount)
require.Equal(t, requestedCount-newCount, countReputable(nodes))
require.Equal(t, requestedCount-newCount, countCommon(db.reputable, nodes))
})
t.Run("request 5, all new", func(t *testing.T) {
t.Parallel()
requestedCount, newCount := 5, 5
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(newNodeFraction)
service, db, cleanup := runServiceWithDB(ctx, zaptest.NewLogger(t), 5, 5, overlayDefaultConfig(newNodeFraction), defaultNodes)
defer cleanup()
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}
nodes, err := service.FindStorageNodesWithPreferences(ctx, req, &preferences)
nodes, err := service.FindStorageNodesForUpload(ctx, req)
require.NoError(t, err)
require.Len(t, nodes, requestedCount)
require.Equal(t, 0, countReputable(nodes))
require.Equal(t, 0, countCommon(db.reputable, nodes))
n2, err := service.UploadSelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Equal(t, requestedCount, len(n2))
})
// update all of them to be reputable
for i := 5; i < 10; i++ {
node := planet.StorageNodes[i]
reputable[node.ID()] = true
err := repService.ApplyAudit(ctx, node.ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
require.NoError(t, err)
}
t.Run("no new nodes", func(t *testing.T) {
t.Parallel()
requestedCount, newCount := 5, 1.0
newNodeFraction := newCount / float64(requestedCount)
preferences := testNodeSelectionConfig(newNodeFraction)
satellite.Config.Overlay.Node = testNodeSelectionConfig(newNodeFraction)
nodes, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
service, db, cleanup := runServiceWithDB(ctx, zaptest.NewLogger(t), 10, 0, overlayDefaultConfig(newNodeFraction), defaultNodes)
defer cleanup()
nodes, err := service.FindStorageNodesForUpload(ctx, overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}, &preferences)
})
require.NoError(t, err)
require.Len(t, nodes, requestedCount)
// all of them should be reputable because there are no new nodes
require.Equal(t, 5, countReputable(nodes))
})
require.Equal(t, 5, countCommon(db.reputable, nodes))
})
}
func TestNodeSelection(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Reputation.AuditHistory = testAuditHistoryConfig()
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service
errNotEnoughNodes := &overlay.ErrNotEnoughNodes
tests := []struct {
description string
@ -286,7 +234,7 @@ func TestNodeSelection(t *testing.T) {
reputableNodes int
expectedCount int
shouldFailWith *errs.Class
exclude func() (excludedNodes []storj.NodeID)
exclude int
}{
{
description: "all reputable nodes, only reputable nodes requested",
@ -346,33 +294,28 @@ func TestNodeSelection(t *testing.T) {
newNodeFraction: 0,
reputableNodes: 6,
expectedCount: 1,
exclude: 5,
shouldFailWith: errNotEnoughNodes,
exclude: func() (excludedNodes []storj.NodeID) {
for _, storageNode := range planet.StorageNodes[:5] {
excludedNodes = append(excludedNodes, storageNode.ID())
}
return excludedNodes
},
},
}
ctx := testcontext.New(t)
for _, tt := range tests {
t.Log(tt.description)
t.Run(tt.description, func(t *testing.T) {
service, db, cleanup := runServiceWithDB(ctx, zaptest.NewLogger(t), tt.reputableNodes, 6, overlayDefaultConfig(tt.newNodeFraction), defaultNodes)
defer cleanup()
var excludedNodes []storj.NodeID
if tt.exclude != nil {
excludedNodes = tt.exclude()
if tt.exclude > 0 {
for i := 0; i < tt.exclude; i++ {
excludedNodes = append(excludedNodes, db.reputable[i].ID)
}
for i, node := range planet.StorageNodes {
if i < tt.reputableNodes {
_, err := satellite.Overlay.Service.TestVetNode(ctx, node.ID())
require.NoError(t, err)
} else {
err := satellite.Overlay.Service.TestUnvetNode(ctx, node.ID())
require.NoError(t, err)
}
}
config := testNodeSelectionConfig(tt.newNodeFraction)
response, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{RequestedCount: tt.requestCount, ExcludedIDs: excludedNodes}, &config)
response, err := service.FindStorageNodesForUpload(ctx, overlay.FindStorageNodesRequest{RequestedCount: tt.requestCount, ExcludedIDs: excludedNodes})
if tt.shouldFailWith != nil {
require.Error(t, err)
assert.True(t, tt.shouldFailWith.Has(err))
@ -387,56 +330,17 @@ func TestNodeSelection(t *testing.T) {
}
}
require.Equal(t, tt.expectedCount, len(response))
}
})
}
}
func TestNodeSelectionGracefulExit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.MinimumDiskSpace = 10 * memory.MB
config.Reputation.InitialAlpha = 1
config.Reputation.AuditLambda = 1
config.Reputation.UnknownAuditLambda = 1
config.Reputation.AuditWeight = 1
config.Reputation.AuditDQ = 0.5
config.Reputation.UnknownAuditDQ = 0.5
config.Reputation.AuditHistory = testAuditHistoryConfig()
config.Reputation.AuditCount = 5 // need 5 audits to be vetted
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
exitingNodes := make(map[storj.NodeID]bool)
// This sets audit counts of 0, 1, 2, 3, ... 9
// so that we can fine-tune how many nodes are considered new or reputable
// by modifying the audit count cutoff passed into FindStorageNodesWithPreferences
// nodes at indices 0, 2, 4, 6, 8 are gracefully exiting
for i, node := range planet.StorageNodes {
for k := 0; k < i; k++ {
err := satellite.Reputation.Service.ApplyAudit(ctx, node.ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
require.NoError(t, err)
}
// make half the nodes gracefully exiting
if i%2 == 0 {
_, err := satellite.DB.OverlayCache().UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: node.ID(),
ExitInitiatedAt: time.Now(),
})
require.NoError(t, err)
exitingNodes[node.ID()] = true
}
}
// There are now 5 new nodes, and 5 reputable (vetted) nodes. 3 of the
// new nodes are gracefully exiting, and 2 of the reputable nodes.
type test struct {
Preferences overlay.NodeSelectionConfig
NewNodeFraction float64
ExcludeCount int
RequestCount int
ExpectedCount int
@ -445,63 +349,62 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
for i, tt := range []test{
{ // reputable and new nodes, happy path
Preferences: testNodeSelectionConfig(0.5),
NewNodeFraction: 0.5,
RequestCount: 5,
ExpectedCount: 5, // 2 new + 3 vetted
},
{ // all reputable nodes, happy path
Preferences: testNodeSelectionConfig(0),
NewNodeFraction: 0,
RequestCount: 3,
ExpectedCount: 3,
},
{ // all new nodes, happy path
Preferences: testNodeSelectionConfig(1),
NewNodeFraction: 1,
RequestCount: 2,
ExpectedCount: 2,
},
{ // reputable and new nodes, requested too many
Preferences: testNodeSelectionConfig(0.5),
NewNodeFraction: 0.5,
RequestCount: 10,
ExpectedCount: 5, // 2 new + 3 vetted
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all reputable nodes, requested too many
Preferences: testNodeSelectionConfig(0),
NewNodeFraction: 0,
RequestCount: 10,
ExpectedCount: 3,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all new nodes, requested too many
Preferences: testNodeSelectionConfig(1),
NewNodeFraction: 1,
RequestCount: 10,
ExpectedCount: 2,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
} {
t.Logf("#%2d. %+v", i, tt)
t.Run(fmt.Sprintf("#%2d. %+v", i, tt), func(t *testing.T) {
ctx := testcontext.New(t)
service, _, cleanup := runServiceWithDB(ctx, zaptest.NewLogger(t), 5, 0, overlayDefaultConfig(tt.NewNodeFraction), defaultNodes)
defer cleanup()
response, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx,
overlay.FindStorageNodesRequest{
response, err := service.FindStorageNodesForGracefulExit(ctx, overlay.FindStorageNodesRequest{
RequestedCount: tt.RequestCount,
AsOfSystemInterval: -time.Microsecond,
}, &tt.Preferences)
})
t.Log(len(response), err)
if tt.ShouldFailWith != nil {
assert.Error(t, err)
assert.True(t, tt.ShouldFailWith.Has(err))
} else {
assert.NoError(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.ExpectedCount, len(response))
// expect no exiting nodes in selection
for _, node := range response {
assert.False(t, exitingNodes[node.ID])
}
}
})
}
}
func TestFindStorageNodesDistinctNetworks(t *testing.T) {
@ -553,32 +456,19 @@ func TestFindStorageNodesDistinctNetworks(t *testing.T) {
require.NotEqual(t, n2[0].LastIPPort, n2[1].LastIPPort)
require.NotEqual(t, n2[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, n2[1].LastIPPort, excludedNodeAddr)
n3, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, req, &satellite.Config.Overlay.Node)
require.NoError(t, err)
require.Len(t, n3, 2)
require.NotEqual(t, n3[0].LastIPPort, n3[1].LastIPPort)
require.NotEqual(t, n3[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, n3[1].LastIPPort, excludedNodeAddr)
req = overlay.FindStorageNodesRequest{
RequestedCount: 4,
ExcludedIDs: excludedNodes,
}
n, err := satellite.Overlay.Service.FindStorageNodesForUpload(ctx, req)
_, err = satellite.Overlay.Service.FindStorageNodesForUpload(ctx, req)
require.Error(t, err)
n1, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, req, &satellite.Config.Overlay.Node)
_, err = satellite.Overlay.Service.UploadSelectionCache.GetNodes(ctx, req)
require.Error(t, err)
require.Equal(t, len(n), len(n1))
n2, err = satellite.Overlay.Service.UploadSelectionCache.GetNodes(ctx, req)
require.Error(t, err)
require.Equal(t, len(n1), len(n2))
})
}
func TestSelectNewStorageNodesExcludedIPs(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
@ -625,7 +515,7 @@ func TestSelectNewStorageNodesExcludedIPs(t *testing.T) {
require.NotEqual(t, n2[0].LastIPPort, n2[1].LastIPPort)
require.NotEqual(t, n2[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, n2[1].LastIPPort, excludedNodeAddr)
n3, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, req, &satellite.Config.Overlay.Node)
n3, err := satellite.Overlay.Service.UploadSelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Len(t, n3, 2)
require.NotEqual(t, n3[0].LastIPPort, n3[1].LastIPPort)
@ -635,113 +525,59 @@ func TestSelectNewStorageNodesExcludedIPs(t *testing.T) {
}
func TestDistinctIPs(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 3,
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Reputation.InitialAlpha = 1
config.Reputation.AuditLambda = 1
config.Reputation.UnknownAuditLambda = 1
config.Reputation.AuditWeight = 1
config.Reputation.AuditDQ = 0.5
config.Reputation.UnknownAuditDQ = 0.5
config.Reputation.AuditHistory = testAuditHistoryConfig()
config.Reputation.AuditCount = 1
config.Overlay.Node.DistinctIP = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// Vets nodes[8] and nodes[9].
for i := 9; i > 7; i-- {
err := satellite.Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[i].ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
assert.NoError(t, err)
}
testDistinctIPs(t, ctx, planet)
})
}
func TestDistinctIPsWithBatch(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 3, // creates 3 additional unique ip addresses, totaling to 4 IPs
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.UpdateStatsBatchSize = 1
config.Reputation.InitialAlpha = 1
config.Reputation.AuditLambda = 1
config.Reputation.UnknownAuditLambda = 1
config.Reputation.AuditWeight = 1
config.Reputation.AuditDQ = 0.5
config.Reputation.UnknownAuditDQ = 0.5
config.Reputation.AuditHistory = testAuditHistoryConfig()
config.Reputation.AuditCount = 1
config.Overlay.Node.DistinctIP = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// Vets nodes[8] and nodes[9].
for i := 9; i > 7; i-- {
err := satellite.Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[i].ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
assert.NoError(t, err)
}
testDistinctIPs(t, ctx, planet)
})
}
func testDistinctIPs(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service
tests := []struct {
requestCount int
preferences overlay.NodeSelectionConfig
newNodeFraction float64
shouldFailWith *errs.Class
}{
{ // test only distinct IPs with half new nodes
// expect 2 new and 2 vetted
requestCount: 4,
preferences: testNodeSelectionConfig(0.5),
newNodeFraction: 0.5,
},
{ // test not enough distinct IPs
requestCount: 5, // expect 3 new, 2 old but fails because only 4 distinct IPs, not 5
preferences: testNodeSelectionConfig(0.6),
newNodeFraction: 0.6,
shouldFailWith: &overlay.ErrNotEnoughNodes,
},
}
for _, tt := range tests {
response, err := service.FindStorageNodesWithPreferences(ctx,
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ctx := testcontext.New(t)
config := overlayDefaultConfig(tt.newNodeFraction)
config.Node.DistinctIP = true
service, _, cleanup := runServiceWithDB(ctx, zaptest.NewLogger(t), 8, 2, config, func(i int, node *nodeselection.SelectedNode) {
if i < 7 {
node.LastIPPort = fmt.Sprintf("54.0.0.1:%d", rand.Intn(30000)+1000)
node.LastNet = "54.0.0.0"
}
})
defer cleanup()
response, err := service.FindStorageNodesForUpload(ctx,
overlay.FindStorageNodesRequest{
RequestedCount: tt.requestCount,
AsOfSystemInterval: -time.Microsecond,
}, &tt.preferences)
})
if tt.shouldFailWith != nil {
assert.Error(t, err)
assert.True(t, tt.shouldFailWith.Has(err))
continue
} else {
require.NoError(t, err)
return
}
require.NoError(t, err)
// assert all IPs are unique
if tt.preferences.DistinctIP {
ips := make(map[string]bool)
for _, n := range response {
assert.False(t, ips[n.LastIPPort])
ips[n.LastIPPort] = true
}
}
assert.Equal(t, tt.requestCount, len(response))
})
}
}
@ -773,28 +609,40 @@ func TestAddrtoNetwork_Conversion(t *testing.T) {
runTest(t, "fc00::1:200", "28967", false, 0, 0, "[fc00::1:200]:28967")
}
func TestCacheSelectionVsDBSelection(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
func countCommon(reference []*nodeselection.SelectedNode, selected []*nodeselection.SelectedNode) (count int) {
for _, r := range reference {
for _, n := range selected {
if r.ID == n.ID {
count++
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 5,
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.StorageNodes[0].Storage2.Monitor.Loop.Pause()
saOverlay := planet.Satellites[0].Overlay
nodeConfig := planet.Satellites[0].Config.Overlay.Node
req := overlay.FindStorageNodesRequest{RequestedCount: 5}
n1, err := saOverlay.Service.FindStorageNodesForUpload(ctx, req)
require.NoError(t, err)
n2, err := saOverlay.Service.UploadSelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Equal(t, len(n2), len(n1))
n3, err := saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
require.NoError(t, err)
require.Equal(t, len(n3), len(n2))
})
}
}
return count
}
func runServiceWithDB(ctx *testcontext.Context, log *zap.Logger, reputable int, new int, config overlay.Config, nodeCustomization func(i int, node *nodeselection.SelectedNode)) (*overlay.Service, *mockdb, func()) {
db := &mockdb{}
for i := 0; i < reputable+new; i++ {
node := nodeselection.SelectedNode{
ID: testidentity.MustPregeneratedIdentity(i, storj.LatestIDVersion()).ID,
LastNet: fmt.Sprintf("10.9.%d.0", i),
Address: &pb.NodeAddress{
Address: fmt.Sprintf("10.9.%d.1:9999", i),
},
LastIPPort: fmt.Sprintf("10.9.%d.1:9999", i),
}
nodeCustomization(i, &node)
if i >= reputable {
db.new = append(db.new, &node)
} else {
db.reputable = append(db.reputable, &node)
}
}
service, _ := overlay.NewService(log, db, nil, overlay.NewPlacementRules().CreateFilters, "", "", config)
serviceCtx, cancel := context.WithCancel(ctx)
ctx.Go(func() error {
return service.Run(serviceCtx)
})
return service, db, cancel
}

View File

@ -424,21 +424,13 @@ func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req
return service.UploadSelectionCache.GetNodes(ctx, req)
}
// FindStorageNodesForUpload searches the overlay network for nodes that meet the provided requirements for upload.
//
// When enabled it uses the cache to select nodes.
// When the node selection from the cache fails, it falls back to the old implementation.
// FindStorageNodesForUpload searches the for nodes in the cache that meet the provided requirements for upload.
func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindStorageNodesRequest) (_ []*nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 {
req.AsOfSystemInterval = service.config.Node.AsOfSystemTime.DefaultInterval
}
// TODO excluding country codes on upload if cache is disabled is not implemented
if service.config.NodeSelectionCache.Disabled {
return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
}
selectedNodes, err := service.UploadSelectionCache.GetNodes(ctx, req)
if err != nil {
return selectedNodes, err
@ -461,50 +453,6 @@ func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindS
return selectedNodes, err
}
// FindStorageNodesWithPreferences searches the overlay network for nodes that meet the provided criteria.
//
// This does not use a cache.
func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req FindStorageNodesRequest, preferences *NodeSelectionConfig) (nodes []*nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
// TODO: add sanity limits to requested node count
// TODO: add sanity limits to excluded nodes
totalNeededNodes := req.RequestedCount
excludedIDs := req.ExcludedIDs
// keep track of the network to make sure we only select nodes from different networks
var excludedNetworks []string
if len(excludedIDs) > 0 {
excludedNetworks, err = service.db.GetNodesNetwork(ctx, excludedIDs)
if err != nil {
return nil, Error.Wrap(err)
}
}
newNodeCount := 0
if preferences.NewNodeFraction > 0 {
newNodeCount = int(float64(totalNeededNodes) * preferences.NewNodeFraction)
}
criteria := NodeCriteria{
FreeDisk: preferences.MinimumDiskSpace.Int64(),
ExcludedIDs: excludedIDs,
ExcludedNetworks: excludedNetworks,
MinimumVersion: preferences.MinimumVersion,
OnlineWindow: preferences.OnlineWindow,
AsOfSystemInterval: req.AsOfSystemInterval,
}
nodes, err = service.db.SelectStorageNodes(ctx, totalNeededNodes, newNodeCount, &criteria)
if err != nil {
return nil, Error.Wrap(err)
}
if len(nodes) < totalNeededNodes {
return nodes, ErrNotEnoughNodes.New("requested %d found %d; %+v ", totalNeededNodes, len(nodes), criteria)
}
return nodes, nil
}
// InsertOfflineNodeEvents inserts offline events into node events.
func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown time.Duration, cutoff time.Duration, limit int) (count int, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -23,7 +23,7 @@ type UploadSelectionDB interface {
// UploadSelectionCacheConfig is a configuration for upload selection cache.
type UploadSelectionCacheConfig struct {
Disabled bool `help:"disable node cache" default:"false"`
Disabled bool `help:"disable node cache" default:"false" deprecated:"true"`
Staleness time.Duration `help:"how stale the node selection cache can be" releaseDefault:"3m" devDefault:"5m" testDefault:"3m"`
}

View File

@ -25,6 +25,7 @@ import (
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/private/version"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/nodeselection"
@ -746,3 +747,183 @@ func generatedSelectedNodes(b *testing.B, nodeNo int) []*nodeselection.SelectedN
}
return nodes
}
// GetOnlineNodesForAuditRepair satisfies nodeevents.DB interface.
func (m *mockdb) GetOnlineNodesForAuditRepair(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (map[storj.NodeID]*overlay.NodeReputation, error) {
panic("implement me")
}
// SelectStorageNodes satisfies nodeevents.DB interface.
func (m *mockdb) SelectStorageNodes(ctx context.Context, totalNeededNodes, newNodeCount int, criteria *overlay.NodeCriteria) ([]*nodeselection.SelectedNode, error) {
panic("implement me")
}
// SelectAllStorageNodesDownload satisfies nodeevents.DB interface.
func (m *mockdb) SelectAllStorageNodesDownload(ctx context.Context, onlineWindow time.Duration, asOf overlay.AsOfSystemTimeConfig) ([]*nodeselection.SelectedNode, error) {
panic("implement me")
}
// Get satisfies nodeevents.DB interface.
func (m *mockdb) Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDossier, error) {
panic("implement me")
}
// KnownReliable satisfies nodeevents.DB interface.
func (m *mockdb) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) {
panic("implement me")
}
// Reliable satisfies nodeevents.DB interface.
func (m *mockdb) Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) {
panic("implement me")
}
// UpdateReputation satisfies nodeevents.DB interface.
func (m *mockdb) UpdateReputation(ctx context.Context, id storj.NodeID, request overlay.ReputationUpdate) error {
panic("implement me")
}
// UpdateNodeInfo satisfies nodeevents.DB interface.
func (m *mockdb) UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *overlay.InfoResponse) (stats *overlay.NodeDossier, err error) {
panic("implement me")
}
// UpdateCheckIn satisfies nodeevents.DB interface.
func (m *mockdb) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, config overlay.NodeSelectionConfig) (err error) {
panic("implement me")
}
// SetNodeContained satisfies nodeevents.DB interface.
func (m *mockdb) SetNodeContained(ctx context.Context, node storj.NodeID, contained bool) (err error) {
panic("implement me")
}
// SetAllContainedNodes satisfies nodeevents.DB interface.
func (m *mockdb) SetAllContainedNodes(ctx context.Context, containedNodes []storj.NodeID) (err error) {
panic("implement me")
}
// AllPieceCounts satisfies nodeevents.DB interface.
func (m *mockdb) AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) {
panic("implement me")
}
// UpdatePieceCounts satisfies nodeevents.DB interface.
func (m *mockdb) UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int64) (err error) {
panic("implement me")
}
// UpdateExitStatus satisfies nodeevents.DB interface.
func (m *mockdb) UpdateExitStatus(ctx context.Context, request *overlay.ExitStatusRequest) (_ *overlay.NodeDossier, err error) {
panic("implement me")
}
// GetExitingNodes satisfies nodeevents.DB interface.
func (m *mockdb) GetExitingNodes(ctx context.Context) (exitingNodes []*overlay.ExitStatus, err error) {
panic("implement me")
}
// GetGracefulExitCompletedByTimeFrame satisfies nodeevents.DB interface.
func (m *mockdb) GetGracefulExitCompletedByTimeFrame(ctx context.Context, begin, end time.Time) (exitedNodes storj.NodeIDList, err error) {
panic("implement me")
}
// GetGracefulExitIncompleteByTimeFrame satisfies nodeevents.DB interface.
func (m *mockdb) GetGracefulExitIncompleteByTimeFrame(ctx context.Context, begin, end time.Time) (exitingNodes storj.NodeIDList, err error) {
panic("implement me")
}
// GetExitStatus satisfies nodeevents.DB interface.
func (m *mockdb) GetExitStatus(ctx context.Context, nodeID storj.NodeID) (exitStatus *overlay.ExitStatus, err error) {
panic("implement me")
}
// GetNodesNetwork satisfies nodeevents.DB interface.
func (m *mockdb) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) {
panic("implement me")
}
// GetNodesNetworkInOrder satisfies nodeevents.DB interface.
func (m *mockdb) GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) {
panic("implement me")
}
// DisqualifyNode satisfies nodeevents.DB interface.
func (m *mockdb) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (email string, err error) {
panic("implement me")
}
// GetOfflineNodesForEmail satisfies nodeevents.DB interface.
func (m *mockdb) GetOfflineNodesForEmail(ctx context.Context, offlineWindow time.Duration, cutoff time.Duration, cooldown time.Duration, limit int) (nodes map[storj.NodeID]string, err error) {
panic("implement me")
}
// UpdateLastOfflineEmail satisfies nodeevents.DB interface.
func (m *mockdb) UpdateLastOfflineEmail(ctx context.Context, nodeIDs storj.NodeIDList, timestamp time.Time) (err error) {
panic("implement me")
}
// DQNodesLastSeenBefore satisfies nodeevents.DB interface.
func (m *mockdb) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time, limit int) (nodeEmails map[storj.NodeID]string, count int, err error) {
panic("implement me")
}
// TestSuspendNodeUnknownAudit satisfies nodeevents.DB interface.
func (m *mockdb) TestSuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
panic("implement me")
}
// TestUnsuspendNodeUnknownAudit satisfies nodeevents.DB interface.
func (m *mockdb) TestUnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error) {
panic("implement me")
}
// TestVetNode satisfies nodeevents.DB interface.
func (m *mockdb) TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error) {
panic("implement me")
}
// TestUnvetNode satisfies nodeevents.DB interface.
func (m *mockdb) TestUnvetNode(ctx context.Context, nodeID storj.NodeID) (err error) {
panic("implement me")
}
// TestSuspendNodeOffline satisfies nodeevents.DB interface.
func (m *mockdb) TestSuspendNodeOffline(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
panic("implement me")
}
// TestNodeCountryCode satisfies nodeevents.DB interface.
func (m *mockdb) TestNodeCountryCode(ctx context.Context, nodeID storj.NodeID, countryCode string) (err error) {
panic("implement me")
}
// TestUpdateCheckInDirectUpdate satisfies nodeevents.DB interface.
func (m *mockdb) TestUpdateCheckInDirectUpdate(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, semVer version.SemVer, walletFeatures string) (updated bool, err error) {
panic("implement me")
}
// OneTimeFixLastNets satisfies nodeevents.DB interface.
func (m *mockdb) OneTimeFixLastNets(ctx context.Context) error {
panic("implement me")
}
// IterateAllContactedNodes satisfies nodeevents.DB interface.
func (m *mockdb) IterateAllContactedNodes(ctx context.Context, f func(context.Context, *nodeselection.SelectedNode) error) error {
panic("implement me")
}
// IterateAllNodeDossiers satisfies nodeevents.DB interface.
func (m *mockdb) IterateAllNodeDossiers(ctx context.Context, f func(context.Context, *overlay.NodeDossier) error) error {
panic("implement me")
}
// UpdateNodeTags satisfies nodeevents.DB interface.
func (m *mockdb) UpdateNodeTags(ctx context.Context, tags nodeselection.NodeTags) error {
panic("implement me")
}
// GetNodeTags satisfies nodeevents.DB interface.
func (m *mockdb) GetNodeTags(ctx context.Context, id storj.NodeID) (nodeselection.NodeTags, error) {
panic("implement me")
}