satellite/overlay: configurable meaning of last_net

Up to now, we have been implementing the DistinctIP preference with code
in two places:

 1. On check-in, the last_net is determined by taking the /24 or /64
    (in ResolveIPAndNetwork()) and we store it with the node record.
 2. On node selection, a preference parameter defines whether to return
    results that are distinct on last_net.

It can be observed that we have never yet had the need to switch from
DistinctIP to !DistinctIP, or from !DistinctIP to DistinctIP, on the
same satellite, and we will probably never need to do so in an automated
way. It can also be observed that this arrangement makes tests more
complicated, because we often have to arrange for test nodes to have IP
addresses in different /24 networks (a particular pain on macOS).

Those two considerations, plus some pending work on the repair framework
that will make repair take last_net into consideration, motivate this
change.

With this change, in the #2 place, we will _always_ return results that
are distinct on last_net. We implement the DistinctIP preference, then,
by making the #1 place (ResolveIPAndNetwork()) more flexible. When
DistinctIP is enabled, last_net will be calculated as it was before. But
when DistinctIP is _off_, last_net can be the same as address (IP and
port). That will effectively implement !DistinctIP because every
record will have a distinct last_net already.

As a side effect, this flexibility will allow us to change the rules
about last_net construction arbitrarily. We can do tests where last_net
is set to the source IP, or to a /30 prefix, or a /16 prefix, etc., and
be able to exercise the production logic without requiring a virtual
network bridge.

This change should be safe to make without any migration code, because
all known production satellite deployments use DistinctIP, and the
associated last_net values will not change for them. They will only
change for satellites with !DistinctIP, which are mostly test
deployments that can be recreated trivially. For those satellites which
are both permanent and !DistinctIP, node selection will suddenly start
acting as though DistinctIP is enabled, until the operator runs a single
SQL update "UPDATE nodes SET last_net = last_ip_port". That can be done
either before or after deploying software with this change.

I also assert that this will not hurt performance for production
deployments. It's true that adding the distinct requirement to node
selection makes things a little slower, but the distinct requirement is
already present for all production deployments, and they will see no
change.

Refs: https://github.com/storj/storj/issues/5391
Change-Id: I0e7e92498c3da768df5b4d5fb213dcd2d4862924
This commit is contained in:
paul cannon 2023-02-28 16:57:39 -06:00 committed by Storj Robot
parent 67ad792d1a
commit 2522ff09b6
14 changed files with 224 additions and 188 deletions

View File

@ -57,6 +57,7 @@ type Config struct {
MultinodeCount int
IdentityVersion *storj.IDVersion
LastNetFunc overlay.LastNetFunc
Reconfigure Reconfigure
Name string

View File

@ -533,6 +533,10 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, errs.Wrap(err)
}
if planet.config.LastNetFunc != nil {
peer.Overlay.Service.LastNetFunc = planet.config.LastNetFunc
}
err = db.Testing().TestMigrateToLatest(ctx)
if err != nil {
return nil, errs.Wrap(err)

View File

@ -72,7 +72,7 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
return nil, rpcstatus.Error(rpcstatus.FailedPrecondition, errCheckInIdentity.New("failed to add peer identity entry for ID: %v", err).Error())
}
resolvedIP, port, resolvedNetwork, err := overlay.ResolveIPAndNetwork(ctx, req.Address)
resolvedIP, port, resolvedNetwork, err := endpoint.service.overlay.ResolveIPAndNetwork(ctx, req.Address)
if err != nil {
endpoint.log.Info("failed to resolve IP from address", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, errCheckInNetwork.New("failed to resolve IP from address: %s, err: %v", req.Address, err).Error())
@ -205,7 +205,7 @@ func (endpoint *Endpoint) PingMe(ctx context.Context, req *pb.PingMeRequest) (_
Address: req.Address,
}
resolvedIP, _, _, err := overlay.ResolveIPAndNetwork(ctx, req.Address)
resolvedIP, _, _, err := endpoint.service.overlay.ResolveIPAndNetwork(ctx, req.Address)
if err != nil {
endpoint.log.Info("failed to resolve IP from address", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, errCheckInNetwork.New("failed to resolve IP from address: %s, err: %v", req.Address, err).Error())

View File

@ -23,12 +23,7 @@ type State struct {
stats Stats
// netByID returns subnet based on storj.NodeID
netByID map[storj.NodeID]string
// nonDistinct contains selectors for non-distinct selection.
nonDistinct struct {
Reputable SelectByID
New SelectByID
}
// distinct contains selectors for distinct slection.
// distinct contains selectors for distinct selection.
distinct struct {
Reputable SelectBySubnet
New SelectBySubnet
@ -39,9 +34,6 @@ type State struct {
type Stats struct {
New int
Reputable int
NewDistinct int
ReputableDistinct int
}
// Selector defines interface for selecting nodes.
@ -65,18 +57,12 @@ func NewState(reputableNodes, newNodes []*Node) *State {
state.netByID[node.ID] = node.LastNet
}
state.nonDistinct.Reputable = SelectByID(reputableNodes)
state.nonDistinct.New = SelectByID(newNodes)
state.distinct.Reputable = SelectBySubnetFromNodes(reputableNodes)
state.distinct.New = SelectBySubnetFromNodes(newNodes)
state.stats = Stats{
New: state.nonDistinct.New.Count(),
Reputable: state.nonDistinct.Reputable.Count(),
NewDistinct: state.distinct.New.Count(),
ReputableDistinct: state.distinct.Reputable.Count(),
New: state.distinct.New.Count(),
Reputable: state.distinct.Reputable.Count(),
}
return state
@ -86,7 +72,6 @@ func NewState(reputableNodes, newNodes []*Node) *State {
type Request struct {
Count int
NewFraction float64
Distinct bool
ExcludedIDs []storj.NodeID
Placement storj.PlacementConstraint
ExcludedCountryCodes []string
@ -119,19 +104,14 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err
criteria.Placement = request.Placement
if request.Distinct {
criteria.AutoExcludeSubnets = make(map[string]struct{})
for _, id := range request.ExcludedIDs {
if net, ok := state.netByID[id]; ok {
criteria.AutoExcludeSubnets[net] = struct{}{}
}
criteria.AutoExcludeSubnets = make(map[string]struct{})
for _, id := range request.ExcludedIDs {
if net, ok := state.netByID[id]; ok {
criteria.AutoExcludeSubnets[net] = struct{}{}
}
reputableNodes = state.distinct.Reputable
newNodes = state.distinct.New
} else {
reputableNodes = state.nonDistinct.Reputable
newNodes = state.nonDistinct.New
}
reputableNodes = state.distinct.Reputable
newNodes = state.distinct.New
// Get a random selection of new nodes out of the cache first so that if there aren't
// enough new nodes on the network, we can fall back to using reputable nodes instead.

View File

@ -16,25 +16,23 @@ import (
"storj.io/storj/satellite/nodeselection/uploadselection"
)
func TestState_Select(t *testing.T) {
func TestState_SelectNonDistinct(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := joinNodes(
createRandomNodes(2, "1.0.1"),
createRandomNodes(3, "1.0.2"),
createRandomNodes(2, "1.0.1", false),
createRandomNodes(3, "1.0.2", false),
)
newNodes := joinNodes(
createRandomNodes(2, "1.0.3"),
createRandomNodes(3, "1.0.4"),
createRandomNodes(2, "1.0.3", false),
createRandomNodes(3, "1.0.4", false),
)
state := uploadselection.NewState(reputableNodes, newNodes)
require.Equal(t, uploadselection.Stats{
New: 5,
Reputable: 5,
NewDistinct: 2,
ReputableDistinct: 2,
New: 5,
Reputable: 5,
}, state.Stats())
{ // select 5 non-distinct subnet reputable nodes
@ -42,59 +40,18 @@ func TestState_Select(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
Distinct: false,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
}
{ // select 2 distinct subnet reputable nodes
const selectCount = 2
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
Distinct: true,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
}
{ // try to select 5 distinct subnet reputable nodes, but there are only two 2 in the state
const selectCount = 5
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
Distinct: true,
ExcludedIDs: nil,
})
require.Error(t, err)
require.Len(t, selected, 2)
}
{ // select 6 non-distinct subnet reputable and new nodes (50%)
const selectCount = 6
const newFraction = 0.5
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: newFraction,
Distinct: false,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
require.Len(t, intersectLists(selected, reputableNodes), selectCount*(1-newFraction))
require.Len(t, intersectLists(selected, newNodes), selectCount*newFraction)
}
{ // select 4 distinct subnet reputable and new nodes (50%)
const selectCount = 4
const newFraction = 0.5
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: newFraction,
Distinct: true,
ExcludedIDs: nil,
})
require.NoError(t, err)
@ -109,7 +66,6 @@ func TestState_Select(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: newFraction,
Distinct: false,
ExcludedIDs: nil,
})
require.NoError(t, err)
@ -119,17 +75,73 @@ func TestState_Select(t *testing.T) {
}
}
func TestState_SelectDistinct(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := joinNodes(
createRandomNodes(2, "1.0.1", true),
createRandomNodes(3, "1.0.2", true),
)
newNodes := joinNodes(
createRandomNodes(2, "1.0.3", true),
createRandomNodes(3, "1.0.4", true),
)
state := uploadselection.NewState(reputableNodes, newNodes)
require.Equal(t, uploadselection.Stats{
New: 2,
Reputable: 2,
}, state.Stats())
{ // select 2 distinct subnet reputable nodes
const selectCount = 2
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
}
{ // try to select 5 distinct subnet reputable nodes, but there are only two 2 in the state
const selectCount = 5
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
ExcludedIDs: nil,
})
require.Error(t, err)
require.Len(t, selected, 2)
}
{ // select 4 distinct subnet reputable and new nodes (50%)
const selectCount = 4
const newFraction = 0.5
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: newFraction,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
require.Len(t, intersectLists(selected, reputableNodes), selectCount*(1-newFraction))
require.Len(t, intersectLists(selected, newNodes), selectCount*newFraction)
}
}
func TestState_Select_Concurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := joinNodes(
createRandomNodes(2, "1.0.1"),
createRandomNodes(3, "1.0.2"),
createRandomNodes(2, "1.0.1", false),
createRandomNodes(3, "1.0.2", false),
)
newNodes := joinNodes(
createRandomNodes(2, "1.0.3"),
createRandomNodes(3, "1.0.4"),
createRandomNodes(2, "1.0.3", false),
createRandomNodes(3, "1.0.4", false),
)
state := uploadselection.NewState(reputableNodes, newNodes)
@ -140,7 +152,6 @@ func TestState_Select_Concurrent(t *testing.T) {
nodes, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0.5,
Distinct: false,
ExcludedIDs: nil,
})
require.Len(t, nodes, selectCount)
@ -152,7 +163,6 @@ func TestState_Select_Concurrent(t *testing.T) {
nodes, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0.5,
Distinct: true,
ExcludedIDs: nil,
})
require.Len(t, nodes, selectCount)
@ -162,7 +172,7 @@ func TestState_Select_Concurrent(t *testing.T) {
}
// createRandomNodes creates n random nodes all in the subnet.
func createRandomNodes(n int, subnet string) []*uploadselection.Node {
func createRandomNodes(n int, subnet string, shareNets bool) []*uploadselection.Node {
xs := make([]*uploadselection.Node, n)
for i := range xs {
addr := subnet + "." + strconv.Itoa(i) + ":8080"
@ -171,9 +181,13 @@ func createRandomNodes(n int, subnet string) []*uploadselection.Node {
ID: testrand.NodeID(),
Address: addr,
},
LastNet: subnet,
LastIPPort: addr,
}
if shareNets {
xs[i].LastNet = subnet
} else {
xs[i].LastNet = addr
}
}
return xs
}

View File

@ -290,7 +290,6 @@ func BenchmarkNodeSelection(b *testing.B) {
ExcludedNetworks: nil,
MinimumVersion: "v1.0.0",
OnlineWindow: time.Hour,
DistinctIP: false,
AsOfSystemInterval: -time.Microsecond,
}
excludedCriteria := &overlay.NodeCriteria{
@ -299,7 +298,6 @@ func BenchmarkNodeSelection(b *testing.B) {
ExcludedNetworks: excludedNets,
MinimumVersion: "v1.0.0",
OnlineWindow: time.Hour,
DistinctIP: false,
AsOfSystemInterval: -time.Microsecond,
}

View File

@ -40,11 +40,13 @@ type AsOfSystemTimeConfig struct {
// NodeSelectionConfig is a configuration struct to determine the minimum
// values for nodes to select.
type NodeSelectionConfig struct {
NewNodeFraction float64 `help:"the fraction of new nodes allowed per request" releaseDefault:"0.05" devDefault:"1"`
MinimumVersion string `help:"the minimum node software version for node selection queries" default:""`
OnlineWindow time.Duration `help:"the amount of time without seeing a node before its considered offline" default:"4h" testDefault:"1m"`
DistinctIP bool `help:"require distinct IPs when choosing nodes for upload" releaseDefault:"true" devDefault:"false"`
MinimumDiskSpace memory.Size `help:"how much disk space a node at minimum must have to be selected for upload" default:"500.00MB" testDefault:"100.00MB"`
NewNodeFraction float64 `help:"the fraction of new nodes allowed per request" releaseDefault:"0.05" devDefault:"1"`
MinimumVersion string `help:"the minimum node software version for node selection queries" default:""`
OnlineWindow time.Duration `help:"the amount of time without seeing a node before its considered offline" default:"4h" testDefault:"1m"`
DistinctIP bool `help:"require distinct IPs when choosing nodes for upload" releaseDefault:"true" devDefault:"false"`
NetworkPrefixIPv4 int `help:"the prefix to use in determining 'network' for IPv4 addresses" default:"24" hidden:"true"`
NetworkPrefixIPv6 int `help:"the prefix to use in determining 'network' for IPv6 addresses" default:"64" hidden:"true"`
MinimumDiskSpace memory.Size `help:"how much disk space a node at minimum must have to be selected for upload" default:"500.00MB" testDefault:"100.00MB"`
AsOfSystemTime AsOfSystemTimeConfig

View File

@ -6,6 +6,7 @@ package overlay_test
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"runtime"
"strings"
@ -204,7 +205,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
t.Run("request 5, where 1 new", func(t *testing.T) {
requestedCount, newCount := 5, 1
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(newNodeFraction, false)
preferences := testNodeSelectionConfig(newNodeFraction)
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}
@ -217,7 +218,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
t.Run("request 5, all new", func(t *testing.T) {
requestedCount, newCount := 5, 5
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(newNodeFraction, false)
preferences := testNodeSelectionConfig(newNodeFraction)
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}
@ -242,8 +243,8 @@ func TestEnsureMinimumRequested(t *testing.T) {
t.Run("no new nodes", func(t *testing.T) {
requestedCount, newCount := 5, 1.0
newNodeFraction := newCount / float64(requestedCount)
preferences := testNodeSelectionConfig(newNodeFraction, false)
satellite.Config.Overlay.Node = testNodeSelectionConfig(newNodeFraction, false)
preferences := testNodeSelectionConfig(newNodeFraction)
satellite.Config.Overlay.Node = testNodeSelectionConfig(newNodeFraction)
nodes, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
@ -360,7 +361,7 @@ func TestNodeSelection(t *testing.T) {
require.NoError(t, err)
}
}
config := testNodeSelectionConfig(tt.newNodeFraction, false)
config := testNodeSelectionConfig(tt.newNodeFraction)
response, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{RequestedCount: tt.requestCount, ExcludedIDs: excludedNodes}, &config)
if tt.shouldFailWith != nil {
require.Error(t, err)
@ -434,34 +435,34 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
for i, tt := range []test{
{ // reputable and new nodes, happy path
Preferences: testNodeSelectionConfig(0.5, false),
Preferences: testNodeSelectionConfig(0.5),
RequestCount: 5,
ExpectedCount: 5, // 2 new + 3 vetted
},
{ // all reputable nodes, happy path
Preferences: testNodeSelectionConfig(0, false),
Preferences: testNodeSelectionConfig(0),
RequestCount: 3,
ExpectedCount: 3,
},
{ // all new nodes, happy path
Preferences: testNodeSelectionConfig(1, false),
Preferences: testNodeSelectionConfig(1),
RequestCount: 2,
ExpectedCount: 2,
},
{ // reputable and new nodes, requested too many
Preferences: testNodeSelectionConfig(0.5, false),
Preferences: testNodeSelectionConfig(0.5),
RequestCount: 10,
ExpectedCount: 5, // 2 new + 3 vetted
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all reputable nodes, requested too many
Preferences: testNodeSelectionConfig(0, false),
Preferences: testNodeSelectionConfig(0),
RequestCount: 10,
ExpectedCount: 3,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // all new nodes, requested too many
Preferences: testNodeSelectionConfig(1, false),
Preferences: testNodeSelectionConfig(1),
RequestCount: 10,
ExpectedCount: 2,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
@ -640,6 +641,7 @@ func TestDistinctIPs(t *testing.T) {
config.Reputation.UnknownAuditDQ = 0.5
config.Reputation.AuditHistory = testAuditHistoryConfig()
config.Reputation.AuditCount = 1
config.Overlay.Node.DistinctIP = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@ -671,6 +673,7 @@ func TestDistinctIPsWithBatch(t *testing.T) {
config.Reputation.UnknownAuditDQ = 0.5
config.Reputation.AuditHistory = testAuditHistoryConfig()
config.Reputation.AuditCount = 1
config.Overlay.Node.DistinctIP = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@ -696,17 +699,13 @@ func testDistinctIPs(t *testing.T, ctx *testcontext.Context, planet *testplanet.
{ // test only distinct IPs with half new nodes
// expect 2 new and 2 vetted
requestCount: 4,
preferences: testNodeSelectionConfig(0.5, true),
preferences: testNodeSelectionConfig(0.5),
},
{ // test not enough distinct IPs
requestCount: 5, // expect 3 new, 2 old but fails because only 4 distinct IPs, not 5
preferences: testNodeSelectionConfig(0.6, true),
preferences: testNodeSelectionConfig(0.6),
shouldFailWith: &overlay.ErrNotEnoughNodes,
},
{ // test distinct flag false allows duplicate IP addresses
requestCount: 5, // expect 3 new, 2 old
preferences: testNodeSelectionConfig(0.6, false),
},
}
for _, tt := range tests {
@ -740,17 +739,28 @@ func TestAddrtoNetwork_Conversion(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
ip := "8.8.8.8:28967"
resolvedIP, port, network, err := overlay.ResolveIPAndNetwork(ctx, ip)
require.Equal(t, "8.8.8.0", network)
require.Equal(t, ip, net.JoinHostPort(resolvedIP.String(), port))
require.NoError(t, err)
runTest := func(t *testing.T, ipAddr, port string, distinctIPEnabled bool, ipv4Mask, ipv6Mask int, expectedNetwork string) {
t.Run(fmt.Sprintf("%s-%s-%v-%d-%d", ipAddr, port, distinctIPEnabled, ipv4Mask, ipv6Mask), func(t *testing.T) {
ipAndPort := net.JoinHostPort(ipAddr, port)
config := overlay.NodeSelectionConfig{
DistinctIP: distinctIPEnabled,
NetworkPrefixIPv4: ipv4Mask,
NetworkPrefixIPv6: ipv6Mask,
}
resolvedIP, resolvedPort, network, err := overlay.ResolveIPAndNetwork(ctx, ipAndPort, config, overlay.MaskOffLastNet)
require.NoError(t, err)
assert.Equal(t, expectedNetwork, network)
assert.Equal(t, ipAddr, resolvedIP.String())
assert.Equal(t, port, resolvedPort)
})
}
ipv6 := "[fc00::1:200]:28967"
resolvedIP, port, network, err = overlay.ResolveIPAndNetwork(ctx, ipv6)
require.Equal(t, "fc00::", network)
require.Equal(t, ipv6, net.JoinHostPort(resolvedIP.String(), port))
require.NoError(t, err)
runTest(t, "8.8.255.8", "28967", true, 17, 128, "8.8.128.0")
runTest(t, "8.8.255.8", "28967", false, 0, 0, "8.8.255.8:28967")
runTest(t, "fc00::1:200", "28967", true, 0, 64, "fc00::")
runTest(t, "fc00::1:200", "28967", true, 0, 128-16, "fc00::1:0")
runTest(t, "fc00::1:200", "28967", false, 0, 0, "[fc00::1:200]:28967")
}
func TestCacheSelectionVsDBSelection(t *testing.T) {

View File

@ -5,7 +5,7 @@ package overlay
import (
"context"
"errors"
"fmt"
"net"
"time"
@ -192,7 +192,6 @@ type NodeCriteria struct {
ExcludedNetworks []string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes
MinimumVersion string // semver or empty
OnlineWindow time.Duration
DistinctIP bool
AsOfSystemInterval time.Duration // only used for CRDB queries
ExcludedCountries []string
}
@ -319,8 +318,12 @@ type Service struct {
GeoIP geoip.IPToCountry
UploadSelectionCache *UploadSelectionCache
DownloadSelectionCache *DownloadSelectionCache
LastNetFunc LastNetFunc
}
// LastNetFunc is the type of a function that will be used to derive a network from an ip and port.
type LastNetFunc func(config NodeSelectionConfig, ip net.IP, port string) (string, error)
// NewService returns a new Service.
func NewService(log *zap.Logger, db DB, nodeEvents nodeevents.DB, mailService *mailservice.Service, satelliteAddr, satelliteName string, config Config) (*Service, error) {
err := config.Node.AsOfSystemTime.isValid()
@ -364,6 +367,7 @@ func NewService(log *zap.Logger, db DB, nodeEvents nodeevents.DB, mailService *m
UploadSelectionCache: uploadSelectionCache,
DownloadSelectionCache: downloadSelectionCache,
LastNetFunc: MaskOffLastNet,
}, nil
}
@ -473,10 +477,9 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
totalNeededNodes := req.RequestedCount
excludedIDs := req.ExcludedIDs
// if distinctIP is enabled, keep track of the network
// to make sure we only select nodes from different networks
// keep track of the network to make sure we only select nodes from different networks
var excludedNetworks []string
if preferences.DistinctIP && len(excludedIDs) > 0 {
if len(excludedIDs) > 0 {
excludedNetworks, err = service.db.GetNodesNetwork(ctx, excludedIDs)
if err != nil {
return nil, Error.Wrap(err)
@ -494,7 +497,6 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
ExcludedNetworks: excludedNetworks,
MinimumVersion: preferences.MinimumVersion,
OnlineWindow: preferences.OnlineWindow,
DistinctIP: preferences.DistinctIP,
AsOfSystemInterval: req.AsOfSystemInterval,
}
nodes, err = service.db.SelectStorageNodes(ctx, totalNeededNodes, newNodeCount, &criteria)
@ -835,8 +837,14 @@ func (service *Service) SelectAllStorageNodesDownload(ctx context.Context, onlin
return service.db.SelectAllStorageNodesDownload(ctx, onlineWindow, asOf)
}
// ResolveIPAndNetwork resolves the target address and determines its IP and /24 subnet IPv4 or /64 subnet IPv6.
func ResolveIPAndNetwork(ctx context.Context, target string) (ip net.IP, port, network string, err error) {
// ResolveIPAndNetwork resolves the target address and determines its IP and appropriate subnet IPv4 or subnet IPv6.
func (service *Service) ResolveIPAndNetwork(ctx context.Context, target string) (ip net.IP, port, network string, err error) {
// LastNetFunc is MaskOffLastNet, unless changed for a test.
return ResolveIPAndNetwork(ctx, target, service.config.Node, service.LastNetFunc)
}
// ResolveIPAndNetwork resolves the target address and determines its IP and appropriate last_net, as indicated.
func ResolveIPAndNetwork(ctx context.Context, target string, config NodeSelectionConfig, lastNetFunc LastNetFunc) (ip net.IP, port, network string, err error) {
defer mon.Task()(&ctx)(&err)
host, port, err := net.SplitHostPort(target)
@ -848,19 +856,39 @@ func ResolveIPAndNetwork(ctx context.Context, target string) (ip net.IP, port, n
return nil, "", "", err
}
// If addr can be converted to 4byte notation, it is an IPv4 address, else its an IPv6 address
if ipv4 := ipAddr.IP.To4(); ipv4 != nil {
// Filter all IPv4 Addresses into /24 Subnet's
mask := net.CIDRMask(24, 32)
return ipAddr.IP, port, ipv4.Mask(mask).String(), nil
}
if ipv6 := ipAddr.IP.To16(); ipv6 != nil {
// Filter all IPv6 Addresses into /64 Subnet's
mask := net.CIDRMask(64, 128)
return ipAddr.IP, port, ipv6.Mask(mask).String(), nil
network, err = lastNetFunc(config, ipAddr.IP, port)
if err != nil {
return nil, "", "", err
}
return nil, "", "", errors.New("unable to get network for address " + ipAddr.String())
return ipAddr.IP, port, network, nil
}
// MaskOffLastNet truncates the target address to the configured CIDR ipv6Cidr or ipv6Cidr prefix,
// if DistinctIP is enabled in the config. Otherwise, it returns the joined IP and port.
func MaskOffLastNet(config NodeSelectionConfig, addr net.IP, port string) (string, error) {
if config.DistinctIP {
// Filter all IPv4 Addresses into /24 subnets, and filter all IPv6 Addresses into /64 subnets
return truncateIPToNet(addr, config.NetworkPrefixIPv4, config.NetworkPrefixIPv6)
}
// The "network" here will be the full IP and port; that is, every node will be considered to
// be on a separate network, even if they all come from one IP (such as localhost).
return net.JoinHostPort(addr.String(), port), nil
}
// truncateIPToNet truncates the target address to the given CIDR ipv4Cidr or ipv6Cidr prefix,
// according to which type of IP it is.
func truncateIPToNet(ipAddr net.IP, ipv4Cidr, ipv6Cidr int) (network string, err error) {
// If addr can be converted to 4byte notation, it is an IPv4 address, else its an IPv6 address
if ipv4 := ipAddr.To4(); ipv4 != nil {
mask := net.CIDRMask(ipv4Cidr, 32)
return ipv4.Mask(mask).String(), nil
}
if ipv6 := ipAddr.To16(); ipv6 != nil {
mask := net.CIDRMask(ipv6Cidr, 128)
return ipv6.Mask(mask).String(), nil
}
return "", fmt.Errorf("unable to get network for address %s", ipAddr.String())
}
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier.

View File

@ -37,11 +37,10 @@ func TestCache_Database(t *testing.T) {
}
// returns a NodeSelectionConfig with sensible test values.
func testNodeSelectionConfig(newNodeFraction float64, distinctIP bool) overlay.NodeSelectionConfig {
func testNodeSelectionConfig(newNodeFraction float64) overlay.NodeSelectionConfig {
return overlay.NodeSelectionConfig{
NewNodeFraction: newNodeFraction,
OnlineWindow: time.Hour,
DistinctIP: distinctIP,
}
}
@ -63,7 +62,7 @@ func testCache(ctx *testcontext.Context, t *testing.T, store overlay.DB, nodeEve
address := &pb.NodeAddress{Address: "127.0.0.1:0"}
lastNet := "127.0.0"
nodeSelectionConfig := testNodeSelectionConfig(0, false)
nodeSelectionConfig := testNodeSelectionConfig(0)
serviceConfig := overlay.Config{
Node: nodeSelectionConfig,
NodeSelectionCache: overlay.UploadSelectionCacheConfig{
@ -283,7 +282,7 @@ func TestRandomizedSelectionCache(t *testing.T) {
for i := 0; i < totalNodes; i++ {
newID := testrand.NodeID()
address := fmt.Sprintf("127.0.%d.0:8080", i)
lastNet := fmt.Sprintf("127.0.%d", i)
lastNet := address
n := overlay.NodeCheckInInfo{
NodeID: newID,

View File

@ -96,7 +96,6 @@ func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorage
selected, err := state.Select(ctx, uploadselection.Request{
Count: req.RequestedCount,
NewFraction: cache.selectionConfig.NewNodeFraction,
Distinct: cache.selectionConfig.DistinctIP,
ExcludedIDs: req.ExcludedIDs,
Placement: req.Placement,
ExcludedCountryCodes: cache.selectionConfig.UploadExcludedCountryCodes,

View File

@ -459,6 +459,12 @@ func TestGetNodesDistinct(t *testing.T) {
}
{ // test that distinctIP=true allows selecting 6 nodes
// emulate DistinctIP=false behavior by filling in LastNets with unique addresses
for _, nodeList := range [][]*overlay.SelectedNode{reputableNodes, newNodes} {
for i := range nodeList {
nodeList[i].LastNet = nodeList[i].LastIPPort
}
}
config := nodeSelectionConfig
config.NewNodeFraction = 0.5
config.DistinctIP = false

View File

@ -57,9 +57,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
needNewNodes--
receivedNewNodes++
if criteria.DistinctIP {
receivedNodeNetworks[node.LastNet] = struct{}{}
}
receivedNodeNetworks[node.LastNet] = struct{}{}
}
for _, node := range reputableNodes {
if _, ok := receivedNodeNetworks[node.LastNet]; ok {
@ -72,9 +70,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
nodes = append(nodes, node)
needReputableNodes--
if criteria.DistinctIP {
receivedNodeNetworks[node.LastNet] = struct{}{}
}
receivedNodeNetworks[node.LastNet] = struct{}{}
}
// when we did not find new nodes, then return all as reputable
@ -106,32 +102,19 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
// Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
// Later, the flag allows us to distinguish if a node is new when scanning the db rows.
if !criteria.DistinctIP {
reputableNodeQuery = partialQuery{
selection: `SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key, false FROM nodes`,
condition: reputableNodesCondition,
limit: reputableNodeCount,
}
newNodeQuery = partialQuery{
selection: `SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key, true FROM nodes`,
condition: newNodesCondition,
limit: newNodeCount,
}
} else {
reputableNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, false FROM nodes`,
condition: reputableNodesCondition,
distinct: true,
limit: reputableNodeCount,
orderBy: "last_net",
}
newNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, true FROM nodes`,
condition: newNodesCondition,
distinct: true,
limit: newNodeCount,
orderBy: "last_net",
}
reputableNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, false FROM nodes`,
condition: reputableNodesCondition,
distinct: true,
limit: reputableNodeCount,
orderBy: "last_net",
}
newNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, true FROM nodes`,
condition: newNodesCondition,
distinct: true,
limit: newNodeCount,
orderBy: "last_net",
}
query := unionAll(newNodeQuery, reputableNodeQuery)
@ -212,15 +195,13 @@ func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria,
pgutil.NodeIDArray(excludedIDs),
)
}
if criteria.DistinctIP {
if len(excludedNetworks) > 0 {
conds.add(
`NOT (last_net = ANY(?::text[]))`,
pgutil.TextArray(excludedNetworks),
)
}
conds.add(`last_net <> ''`)
if len(excludedNetworks) > 0 {
conds.add(
`NOT (last_net = ANY(?::text[]))`,
pgutil.TextArray(excludedNetworks),
)
}
conds.add(`last_net <> ''`)
return conds.combine(), nil
}

View File

@ -164,6 +164,20 @@ sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" "$(storj-s
sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" "$(storj-sim network env STORAGENODE_8_DIR)"/config.yaml
sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" "$(storj-sim network env STORAGENODE_9_DIR)"/config.yaml
# For cases where the release predates changeset I0e7e92498c3da768df5b4d5fb213dcd2d4862924,
# adjust all last_net values for future compatibility. this migration step is only necessary for
# satellites which existed before the aforementioned changeset and use dev defaults (to be specific,
# DistinctIP is off). This is a harmless change for any other satellites using dev defaults.
if [ "${STORJ_SIM_POSTGRES#cockroach:}" != "$STORJ_SIM_POSTGRES" ]; then
schema_set=
pgurl="${STORJ_SIM_POSTGRES/cockroach:/postgres:}"
pgurl="${pgurl%?sslmode=disable}/satellite/0?sslmode=disable"
else
schema_set='set search_path to "satellite/0"; '
pgurl="$STORJ_SIM_POSTGRES"
fi
psql "$pgurl" -c "${schema_set}update nodes set last_net = last_ip_port"
# Run with 9 nodes to exercise more code paths with one node being offline.
STORJ_NUM_NODES=9