satellite/{nodeselection,overlay}: NodeFilters for dynamic placement implementations

Change-Id: Ica3a7b535fa6736cd8fb12066e615b70e1fa65d6
This commit is contained in:
Márton Elek 2023-06-30 12:13:18 +02:00 committed by Storj Robot
parent e3d2f09988
commit ddf1f1c340
13 changed files with 646 additions and 246 deletions

View File

@ -1,56 +0,0 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information
package uploadselection
import (
"storj.io/common/storj"
"storj.io/common/storj/location"
)
// Criteria to filter nodes.
type Criteria struct {
ExcludeNodeIDs []storj.NodeID
AutoExcludeSubnets map[string]struct{} // initialize it with empty map to keep only one node per subnet.
Placement storj.PlacementConstraint
ExcludedCountryCodes []location.CountryCode
}
// MatchInclude returns with true if node is selected.
func (c *Criteria) MatchInclude(node *SelectedNode) bool {
if ContainsID(c.ExcludeNodeIDs, node.ID) {
return false
}
if !c.Placement.AllowedCountry(node.CountryCode) {
return false
}
if c.AutoExcludeSubnets != nil {
if _, excluded := c.AutoExcludeSubnets[node.LastNet]; excluded {
return false
}
c.AutoExcludeSubnets[node.LastNet] = struct{}{}
}
for _, code := range c.ExcludedCountryCodes {
if code == location.None {
continue
}
if node.CountryCode == code {
return false
}
}
return true
}
// ContainsID returns whether ids contain id.
func ContainsID(ids []storj.NodeID, id storj.NodeID) bool {
for _, k := range ids {
if k == id {
return true
}
}
return false
}

View File

@ -1,130 +0,0 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information
package uploadselection
import (
"testing"
"github.com/stretchr/testify/assert"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/testrand"
)
func TestCriteria_AutoExcludeSubnet(t *testing.T) {
criteria := Criteria{
AutoExcludeSubnets: map[string]struct{}{},
}
assert.True(t, criteria.MatchInclude(&SelectedNode{
LastNet: "192.168.0.1",
}))
assert.False(t, criteria.MatchInclude(&SelectedNode{
LastNet: "192.168.0.1",
}))
assert.True(t, criteria.MatchInclude(&SelectedNode{
LastNet: "192.168.1.1",
}))
}
func TestCriteria_ExcludeNodeID(t *testing.T) {
included := testrand.NodeID()
excluded := testrand.NodeID()
criteria := Criteria{
ExcludeNodeIDs: []storj.NodeID{excluded},
}
assert.False(t, criteria.MatchInclude(&SelectedNode{
ID: excluded,
}))
assert.True(t, criteria.MatchInclude(&SelectedNode{
ID: included,
}))
}
func TestCriteria_NodeIDAndSubnet(t *testing.T) {
excluded := testrand.NodeID()
criteria := Criteria{
ExcludeNodeIDs: []storj.NodeID{excluded},
AutoExcludeSubnets: map[string]struct{}{},
}
// due to node id criteria
assert.False(t, criteria.MatchInclude(&SelectedNode{
ID: excluded,
LastNet: "192.168.0.1",
}))
// should be included as previous one excluded and
// not stored for subnet exclusion
assert.True(t, criteria.MatchInclude(&SelectedNode{
ID: testrand.NodeID(),
LastNet: "192.168.0.2",
}))
}
func TestCriteria_Geofencing(t *testing.T) {
eu := Criteria{
Placement: storj.EU,
}
us := Criteria{
Placement: storj.US,
}
cases := []struct {
name string
country location.CountryCode
criteria Criteria
expected bool
}{
{
name: "US matches US selector",
country: location.UnitedStates,
criteria: us,
expected: true,
},
{
name: "Germany is EU",
country: location.Germany,
criteria: eu,
expected: true,
},
{
name: "US is not eu",
country: location.UnitedStates,
criteria: eu,
expected: false,
},
{
name: "Empty country doesn't match region",
country: location.CountryCode(0),
criteria: eu,
expected: false,
},
{
name: "Empty country doesn't match country",
country: location.CountryCode(0),
criteria: us,
expected: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
assert.Equal(t, c.expected, c.criteria.MatchInclude(&SelectedNode{
CountryCode: c.country,
}))
})
}
}

View File

@ -0,0 +1,182 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package uploadselection
import (
"bytes"
"storj.io/common/storj"
"storj.io/common/storj/location"
)
// NodeFilter can decide if a Node should be part of the selection or not.
type NodeFilter interface {
MatchInclude(node *SelectedNode) bool
}
// NodeFilters is a collection of multiple node filters (all should vote with true).
type NodeFilters []NodeFilter
// NodeFilterFunc is helper to use func as NodeFilter.
type NodeFilterFunc func(node *SelectedNode) bool
// ExcludeAll will never select any node.
var ExcludeAll = NodeFilters{
NodeFilterFunc(func(node *SelectedNode) bool {
return false
}),
}
// MatchInclude implements NodeFilter interface.
func (n NodeFilterFunc) MatchInclude(node *SelectedNode) bool {
return n(node)
}
// MatchInclude implements NodeFilter interface.
func (n NodeFilters) MatchInclude(node *SelectedNode) bool {
for _, filter := range n {
if !filter.MatchInclude(node) {
return false
}
}
return true
}
// WithCountryFilter is a helper to create a new filter with additional CountryFilter.
func (n NodeFilters) WithCountryFilter(filter func(code location.CountryCode) bool) NodeFilters {
return append(n, CountryFilter{
matchIncludeCountry: filter,
})
}
// WithAutoExcludeSubnets is a helper to create a new filter with additional AutoExcludeSubnets.
func (n NodeFilters) WithAutoExcludeSubnets() NodeFilters {
return append(n, NewAutoExcludeSubnets())
}
// WithExcludedIDs is a helper to create a new filter with additional WithExcludedIDs.
func (n NodeFilters) WithExcludedIDs(ds []storj.NodeID) NodeFilters {
return append(n, ExcludedIDs(ds))
}
var _ NodeFilter = NodeFilters{}
// CountryCodeExclude is a specific CountryFilter which excludes all nodes with the given country code.
type CountryCodeExclude []location.CountryCode
// MatchInclude implements NodeFilter interface.
func (c CountryCodeExclude) MatchInclude(node *SelectedNode) bool {
for _, code := range c {
if code == location.None {
continue
}
if node.CountryCode == code {
return false
}
}
return true
}
var _ NodeFilter = CountryCodeExclude{}
// CountryFilter can select nodes based on the condition of the country code.
type CountryFilter struct {
matchIncludeCountry func(code location.CountryCode) bool
}
// NewCountryFilter creates a new CountryFilter.
func NewCountryFilter(filter func(code location.CountryCode) bool) NodeFilter {
return CountryFilter{
matchIncludeCountry: filter,
}
}
// MatchInclude implements NodeFilter interface.
func (p CountryFilter) MatchInclude(node *SelectedNode) bool {
return p.matchIncludeCountry(node.CountryCode)
}
var _ NodeFilter = CountryFilter{}
// AutoExcludeSubnets pick at most one node from network.
// Stateful!!! should be re-created for each new selection request.
type AutoExcludeSubnets struct {
seenSubnets map[string]struct{}
}
// NewAutoExcludeSubnets creates an initialized AutoExcludeSubnets.
func NewAutoExcludeSubnets() *AutoExcludeSubnets {
return &AutoExcludeSubnets{
seenSubnets: map[string]struct{}{},
}
}
// MatchInclude implements NodeFilter interface.
func (a *AutoExcludeSubnets) MatchInclude(node *SelectedNode) bool {
if _, found := a.seenSubnets[node.LastNet]; found {
return false
}
a.seenSubnets[node.LastNet] = struct{}{}
return true
}
var _ NodeFilter = &AutoExcludeSubnets{}
// ExcludedNetworks will exclude nodes with specified networks.
type ExcludedNetworks []string
// MatchInclude implements NodeFilter interface.
func (e ExcludedNetworks) MatchInclude(node *SelectedNode) bool {
for _, id := range e {
if id == node.LastNet {
return false
}
}
return true
}
var _ NodeFilter = ExcludedNetworks{}
// ExcludedIDs can blacklist NodeIDs.
type ExcludedIDs []storj.NodeID
// MatchInclude implements NodeFilter interface.
func (e ExcludedIDs) MatchInclude(node *SelectedNode) bool {
for _, id := range e {
if id == node.ID {
return false
}
}
return true
}
var _ NodeFilter = ExcludedIDs{}
// TagFilter matches nodes with specific tags.
type TagFilter struct {
signer storj.NodeID
name string
value []byte
}
// NewTagFilter creates a new tag filter.
func NewTagFilter(id storj.NodeID, name string, value []byte) TagFilter {
return TagFilter{
signer: id,
name: name,
value: value,
}
}
// MatchInclude implements NodeFilter interface.
func (t TagFilter) MatchInclude(node *SelectedNode) bool {
for _, tag := range node.Tags {
if tag.Name == t.name && bytes.Equal(tag.Value, t.value) && tag.Signer == t.signer {
return true
}
}
return false
}
var _ NodeFilter = TagFilter{}

View File

@ -0,0 +1,185 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package uploadselection
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/identity/testidentity"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/testcontext"
"storj.io/common/testrand"
)
func TestNodeFilter_AutoExcludeSubnet(t *testing.T) {
criteria := NodeFilters{}.WithAutoExcludeSubnets()
assert.True(t, criteria.MatchInclude(&SelectedNode{
LastNet: "192.168.0.1",
}))
assert.False(t, criteria.MatchInclude(&SelectedNode{
LastNet: "192.168.0.1",
}))
assert.True(t, criteria.MatchInclude(&SelectedNode{
LastNet: "192.168.1.1",
}))
}
func TestCriteria_ExcludeNodeID(t *testing.T) {
included := testrand.NodeID()
excluded := testrand.NodeID()
criteria := NodeFilters{}.WithExcludedIDs([]storj.NodeID{excluded})
assert.False(t, criteria.MatchInclude(&SelectedNode{
ID: excluded,
}))
assert.True(t, criteria.MatchInclude(&SelectedNode{
ID: included,
}))
}
func TestCriteria_NodeIDAndSubnet(t *testing.T) {
excluded := testrand.NodeID()
criteria := NodeFilters{}.
WithExcludedIDs([]storj.NodeID{excluded}).
WithAutoExcludeSubnets()
// due to node id criteria
assert.False(t, criteria.MatchInclude(&SelectedNode{
ID: excluded,
LastNet: "192.168.0.1",
}))
// should be included as previous one excluded and
// not stored for subnet exclusion
assert.True(t, criteria.MatchInclude(&SelectedNode{
ID: testrand.NodeID(),
LastNet: "192.168.0.2",
}))
}
func TestCriteria_Geofencing(t *testing.T) {
eu := NodeFilters{}.WithCountryFilter(func(code location.CountryCode) bool {
for _, c := range location.EuCountries {
if c == code {
return true
}
}
return false
})
us := NodeFilters{}.WithCountryFilter(func(code location.CountryCode) bool {
return code == location.UnitedStates
})
cases := []struct {
name string
country location.CountryCode
criteria NodeFilters
expected bool
}{
{
name: "US matches US selector",
country: location.UnitedStates,
criteria: us,
expected: true,
},
{
name: "Germany is EU",
country: location.Germany,
criteria: eu,
expected: true,
},
{
name: "US is not eu",
country: location.UnitedStates,
criteria: eu,
expected: false,
},
{
name: "Empty country doesn't match region",
country: location.CountryCode(0),
criteria: eu,
expected: false,
},
{
name: "Empty country doesn't match country",
country: location.CountryCode(0),
criteria: us,
expected: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
assert.Equal(t, c.expected, c.criteria.MatchInclude(&SelectedNode{
CountryCode: c.country,
}))
})
}
}
// BenchmarkNodeFilterFullTable checks performances of rule evaluation on ALL storage nodes.
func BenchmarkNodeFilterFullTable(b *testing.B) {
filters := NodeFilters{}
filters = append(filters, NodeFilterFunc(func(node *SelectedNode) bool {
return true
}))
filters = append(filters, NodeFilterFunc(func(node *SelectedNode) bool {
return true
}))
filters = append(filters, NodeFilterFunc(func(node *SelectedNode) bool {
return true
}))
filters = filters.WithAutoExcludeSubnets()
benchmarkFilter(b, filters)
}
func benchmarkFilter(b *testing.B, filters NodeFilters) {
nodeNo := 25000
if testing.Short() {
nodeNo = 20
}
nodes := generatedSelectedNodes(b, nodeNo)
b.ResetTimer()
c := 0
for j := 0; j < b.N; j++ {
for n := 0; n < len(nodes); n++ {
if filters.MatchInclude(nodes[n]) {
c++
}
}
}
}
func generatedSelectedNodes(b *testing.B, nodeNo int) []*SelectedNode {
nodes := make([]*SelectedNode, nodeNo)
ctx := testcontext.New(b)
for i := 0; i < nodeNo; i++ {
node := SelectedNode{}
identity, err := testidentity.NewTestIdentity(ctx)
require.NoError(b, err)
node.ID = identity.ID
node.LastNet = fmt.Sprintf("192.168.%d.0", i%256)
node.LastIPPort = fmt.Sprintf("192.168.%d.%d:%d", i%256, i%65536, i%1000+1000)
node.CountryCode = []location.CountryCode{location.None, location.UnitedStates, location.Germany, location.Hungary, location.Austria}[i%5]
nodes[i] = &node
}
return nodes
}

View File

@ -42,16 +42,28 @@ type SelectedNode struct {
LastNet string
LastIPPort string
CountryCode location.CountryCode
Tags NodeTags
}
// Clone returns a deep clone of the selected node.
func (node *SelectedNode) Clone() *SelectedNode {
copy := pb.CopyNode(&pb.Node{Id: node.ID, Address: node.Address})
tags := make([]NodeTag, len(node.Tags))
for ix, tag := range node.Tags {
tags[ix] = NodeTag{
NodeID: tag.NodeID,
SignedAt: tag.SignedAt,
Signer: tag.Signer,
Name: tag.Name,
Value: tag.Value,
}
}
return &SelectedNode{
ID: copy.Id,
Address: copy.Address,
LastNet: node.LastNet,
LastIPPort: node.LastIPPort,
CountryCode: node.CountryCode,
Tags: tags,
}
}

View File

@ -16,7 +16,7 @@ var _ Selector = (SelectByID)(nil)
func (nodes SelectByID) Count() int { return len(nodes) }
// Select selects upto n nodes.
func (nodes SelectByID) Select(n int, criteria Criteria) []*SelectedNode {
func (nodes SelectByID) Select(n int, nodeFilter NodeFilter) []*SelectedNode {
if n <= 0 {
return nil
}
@ -25,7 +25,7 @@ func (nodes SelectByID) Select(n int, criteria Criteria) []*SelectedNode {
for _, idx := range mathrand.Perm(len(nodes)) {
node := nodes[idx]
if !criteria.MatchInclude(node) {
if !nodeFilter.MatchInclude(node) {
continue
}
@ -71,7 +71,7 @@ func SelectBySubnetFromNodes(nodes []*SelectedNode) SelectBySubnet {
func (subnets SelectBySubnet) Count() int { return len(subnets) }
// Select selects upto n nodes.
func (subnets SelectBySubnet) Select(n int, criteria Criteria) []*SelectedNode {
func (subnets SelectBySubnet) Select(n int, filter NodeFilter) []*SelectedNode {
if n <= 0 {
return nil
}
@ -81,7 +81,7 @@ func (subnets SelectBySubnet) Select(n int, criteria Criteria) []*SelectedNode {
subnet := subnets[idx]
node := subnet.Nodes[mathrand.Intn(len(subnet.Nodes))]
if !criteria.MatchInclude(node) {
if !filter.MatchInclude(node) {
continue
}

View File

@ -54,7 +54,7 @@ func TestSelectByID(t *testing.T) {
// perform many node selections that selects 2 nodes
for i := 0; i < executionCount; i++ {
selectedNodes := selector.Select(reqCount, uploadselection.Criteria{})
selectedNodes := selector.Select(reqCount, uploadselection.NodeFilters{})
require.Len(t, selectedNodes, reqCount)
for _, node := range selectedNodes {
selectedNodeCount[node.ID]++
@ -114,7 +114,7 @@ func TestSelectBySubnet(t *testing.T) {
// perform many node selections that selects 2 nodes
for i := 0; i < executionCount; i++ {
selectedNodes := selector.Select(reqCount, uploadselection.Criteria{})
selectedNodes := selector.Select(reqCount, uploadselection.NodeFilters{})
require.Len(t, selectedNodes, reqCount)
for _, node := range selectedNodes {
selectedNodeCount[node.ID]++
@ -186,7 +186,7 @@ func TestSelectBySubnetOneAtATime(t *testing.T) {
// perform many node selections that selects 1 node
for i := 0; i < executionCount; i++ {
selectedNodes := selector.Select(reqCount, uploadselection.Criteria{})
selectedNodes := selector.Select(reqCount, uploadselection.NodeFilters{})
require.Len(t, selectedNodes, reqCount)
for _, node := range selectedNodes {
selectedNodeCount[node.ID]++
@ -244,16 +244,11 @@ func TestSelectFiltered(t *testing.T) {
nodes := []*uploadselection.SelectedNode{subnetA1, subnetA2, subnetB1}
selector := uploadselection.SelectByID(nodes)
assert.Len(t, selector.Select(3, uploadselection.Criteria{}), 3)
assert.Len(t, selector.Select(3, uploadselection.Criteria{ExcludeNodeIDs: []storj.NodeID{firstID}}), 2)
assert.Len(t, selector.Select(3, uploadselection.Criteria{}), 3)
assert.Len(t, selector.Select(3, uploadselection.NodeFilters{}), 3)
assert.Len(t, selector.Select(3, uploadselection.NodeFilters{}.WithAutoExcludeSubnets()), 2)
assert.Len(t, selector.Select(3, uploadselection.NodeFilters{}), 3)
assert.Len(t, selector.Select(3, uploadselection.Criteria{ExcludeNodeIDs: []storj.NodeID{firstID, secondID}}), 1)
assert.Len(t, selector.Select(3, uploadselection.Criteria{
AutoExcludeSubnets: map[string]struct{}{},
}), 2)
assert.Len(t, selector.Select(3, uploadselection.Criteria{
ExcludeNodeIDs: []storj.NodeID{thirdID},
AutoExcludeSubnets: map[string]struct{}{},
}), 1)
assert.Len(t, selector.Select(3, uploadselection.NodeFilters{}.WithExcludedIDs([]storj.NodeID{firstID, secondID})), 1)
assert.Len(t, selector.Select(3, uploadselection.NodeFilters{}.WithAutoExcludeSubnets()), 2)
assert.Len(t, selector.Select(3, uploadselection.NodeFilters{}.WithExcludedIDs([]storj.NodeID{thirdID}).WithAutoExcludeSubnets()), 1)
}

View File

@ -10,7 +10,6 @@ import (
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/common/storj/location"
)
// ErrNotEnoughNodes is when selecting nodes failed with the given parameters.
@ -42,7 +41,7 @@ type Selector interface {
Count() int
// Select selects up-to n nodes which are included by the criteria.
// empty criteria includes all the nodes
Select(n int, criteria Criteria) []*SelectedNode
Select(n int, nodeFilter NodeFilter) []*SelectedNode
}
// NewState returns a state based on the input.
@ -70,11 +69,9 @@ func NewState(reputableNodes, newNodes []*SelectedNode) *State {
// Request contains arguments for State.Request.
type Request struct {
Count int
NewFraction float64
ExcludedIDs []storj.NodeID
Placement storj.PlacementConstraint
ExcludedCountryCodes []string
Count int
NewFraction float64
NodeFilters NodeFilters
}
// Select selects requestedCount nodes where there will be newFraction nodes.
@ -92,36 +89,18 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*SelectedN
var reputableNodes Selector
var newNodes Selector
var criteria Criteria
if request.ExcludedIDs != nil {
criteria.ExcludeNodeIDs = request.ExcludedIDs
}
for _, code := range request.ExcludedCountryCodes {
criteria.ExcludedCountryCodes = append(criteria.ExcludedCountryCodes, location.ToCountryCode(code))
}
criteria.Placement = request.Placement
criteria.AutoExcludeSubnets = make(map[string]struct{})
for _, id := range request.ExcludedIDs {
if net, ok := state.netByID[id]; ok {
criteria.AutoExcludeSubnets[net] = struct{}{}
}
}
reputableNodes = state.distinct.Reputable
newNodes = state.distinct.New
// Get a random selection of new nodes out of the cache first so that if there aren't
// enough new nodes on the network, we can fall back to using reputable nodes instead.
selected = append(selected,
newNodes.Select(newCount, criteria)...)
newNodes.Select(newCount, request.NodeFilters)...)
// Get all the remaining reputable nodes.
reputableCount := totalCount - len(selected)
selected = append(selected,
reputableNodes.Select(reputableCount, criteria)...)
reputableNodes.Select(reputableCount, request.NodeFilters)...)
if len(selected) < totalCount {
return selected, ErrNotEnoughNodes.New("requested from cache %d, found %d", totalCount, len(selected))
@ -136,3 +115,19 @@ func (state *State) Stats() Stats {
return state.stats
}
// ExcludeNetworksBasedOnNodes will create a NodeFilter which exclude all nodes which shares subnet with the specified ones.
func (state *State) ExcludeNetworksBasedOnNodes(ds []storj.NodeID) NodeFilter {
uniqueExcludedNet := make(map[string]struct{}, len(ds))
for _, id := range ds {
net := state.netByID[id]
uniqueExcludedNet[net] = struct{}{}
}
excludedNet := make([]string, len(uniqueExcludedNet))
i := 0
for net := range uniqueExcludedNet {
excludedNet[i] = net
i++
}
return ExcludedNetworks(excludedNet)
}

View File

@ -39,7 +39,6 @@ func TestState_SelectNonDistinct(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
@ -51,7 +50,6 @@ func TestState_SelectNonDistinct(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: newFraction,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
@ -65,7 +63,6 @@ func TestState_SelectNonDistinct(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: newFraction,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
@ -98,7 +95,6 @@ func TestState_SelectDistinct(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
@ -109,7 +105,6 @@ func TestState_SelectDistinct(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0,
ExcludedIDs: nil,
})
require.Error(t, err)
require.Len(t, selected, 2)
@ -121,7 +116,6 @@ func TestState_SelectDistinct(t *testing.T) {
selected, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: newFraction,
ExcludedIDs: nil,
})
require.NoError(t, err)
require.Len(t, selected, selectCount)
@ -151,7 +145,6 @@ func TestState_Select_Concurrent(t *testing.T) {
nodes, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0.5,
ExcludedIDs: nil,
})
require.Len(t, nodes, selectCount)
return err
@ -162,7 +155,6 @@ func TestState_Select_Concurrent(t *testing.T) {
nodes, err := state.Select(ctx, uploadselection.Request{
Count: selectCount,
NewFraction: 0.5,
ExcludedIDs: nil,
})
require.Len(t, nodes, selectCount)
return err

View File

@ -0,0 +1,85 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/storj/satellite/nodeselection/uploadselection"
)
// PlacementRules can crate filter based on the placement identifier.
type PlacementRules func(constraint storj.PlacementConstraint) (filter uploadselection.NodeFilters)
// ConfigurablePlacementRule can include the placement definitions for each known identifier.
type ConfigurablePlacementRule struct {
placements map[storj.PlacementConstraint]uploadselection.NodeFilters
}
// NewPlacementRules creates a fully initialized NewPlacementRules.
func NewPlacementRules() *ConfigurablePlacementRule {
return &ConfigurablePlacementRule{
placements: map[storj.PlacementConstraint]uploadselection.NodeFilters{},
}
}
// AddLegacyStaticRules initializes all the placement rules defined earlier in static golang code.
func (d *ConfigurablePlacementRule) AddLegacyStaticRules() {
d.placements[storj.EEA] = uploadselection.NodeFilters{}.WithCountryFilter(func(isoCountryCode location.CountryCode) bool {
for _, c := range location.EeaNonEuCountries {
if c == isoCountryCode {
return true
}
}
for _, c := range location.EuCountries {
if c == isoCountryCode {
return true
}
}
return false
})
d.placements[storj.EU] = uploadselection.NodeFilters{}.WithCountryFilter(func(isoCountryCode location.CountryCode) bool {
for _, c := range location.EuCountries {
if c == isoCountryCode {
return true
}
}
return false
})
d.placements[storj.US] = uploadselection.NodeFilters{}.WithCountryFilter(func(isoCountryCode location.CountryCode) bool {
return isoCountryCode == location.UnitedStates
})
d.placements[storj.DE] = uploadselection.NodeFilters{}.WithCountryFilter(func(isoCountryCode location.CountryCode) bool {
return isoCountryCode == location.Germany
})
d.placements[storj.NR] = uploadselection.NodeFilters{}.WithCountryFilter(func(isoCountryCode location.CountryCode) bool {
return isoCountryCode != location.Russia && isoCountryCode != location.Belarus
})
}
// AddPlacementRule registers a new placement.
func (d *ConfigurablePlacementRule) AddPlacementRule(id storj.PlacementConstraint, filters uploadselection.NodeFilters) {
d.placements[id] = filters
}
// CreateFilters implements PlacementCondition.
func (d *ConfigurablePlacementRule) CreateFilters(constraint storj.PlacementConstraint) (filter uploadselection.NodeFilters) {
if constraint == 0 {
return uploadselection.NodeFilters{}
}
if filters, found := d.placements[constraint]; found {
return filters
}
return uploadselection.ExcludeAll
}
// CreateDefaultPlacementRules returns with a default set of configured placement rules.
func CreateDefaultPlacementRules(satelliteID storj.NodeID) PlacementRules {
placement := NewPlacementRules()
placement.AddLegacyStaticRules()
placement.AddPlacementRule(10, uploadselection.NodeFilters{
uploadselection.NewTagFilter(satelliteID, "selection", []byte("true")),
})
return placement.CreateFilters
}

View File

@ -324,8 +324,25 @@ func NewService(log *zap.Logger, db DB, nodeEvents nodeevents.DB, satelliteAddr,
}
}
defaultSelection := uploadselection.NodeFilters{}
if len(config.Node.UploadExcludedCountryCodes) > 0 {
defaultSelection = defaultSelection.WithCountryFilter(func(code location.CountryCode) bool {
for _, nodeCountry := range config.Node.UploadExcludedCountryCodes {
if nodeCountry == code.String() {
return false
}
}
return true
})
}
// TODO: this supposed to be configurable
placementRules := NewPlacementRules()
placementRules.AddLegacyStaticRules()
uploadSelectionCache, err := NewUploadSelectionCache(log, db,
config.NodeSelectionCache.Staleness, config.Node,
defaultSelection, placementRules.CreateFilters,
)
if err != nil {
return nil, errs.Wrap(err)

View File

@ -36,14 +36,19 @@ type UploadSelectionCache struct {
selectionConfig NodeSelectionConfig
cache sync2.ReadCacheOf[*uploadselection.State]
defaultFilters uploadselection.NodeFilters
placementRules PlacementRules
}
// NewUploadSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to store data.
func NewUploadSelectionCache(log *zap.Logger, db UploadSelectionDB, staleness time.Duration, config NodeSelectionConfig) (*UploadSelectionCache, error) {
func NewUploadSelectionCache(log *zap.Logger, db UploadSelectionDB, staleness time.Duration, config NodeSelectionConfig, defaultFilter uploadselection.NodeFilters, placementRules PlacementRules) (*UploadSelectionCache, error) {
cache := &UploadSelectionCache{
log: log,
db: db,
selectionConfig: config,
defaultFilters: defaultFilter,
placementRules: placementRules,
}
return cache, cache.cache.Init(staleness/2, staleness, cache.read)
}
@ -91,17 +96,22 @@ func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorage
return nil, Error.Wrap(err)
}
filters := cache.placementRules(req.Placement)
if len(req.ExcludedIDs) > 0 {
filters = append(filters, state.ExcludeNetworksBasedOnNodes(req.ExcludedIDs))
}
filters = append(filters, cache.defaultFilters)
filters = filters.WithAutoExcludeSubnets()
selected, err := state.Select(ctx, uploadselection.Request{
Count: req.RequestedCount,
NewFraction: cache.selectionConfig.NewNodeFraction,
ExcludedIDs: req.ExcludedIDs,
Placement: req.Placement,
ExcludedCountryCodes: cache.selectionConfig.UploadExcludedCountryCodes,
Count: req.RequestedCount,
NewFraction: cache.selectionConfig.NewNodeFraction,
NodeFilters: filters,
})
if uploadselection.ErrNotEnoughNodes.Has(err) {
err = ErrNotEnoughNodes.Wrap(err)
}
return selected, err
}

View File

@ -5,7 +5,10 @@ package overlay_test
import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -14,9 +17,11 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
@ -58,6 +63,8 @@ func TestRefresh(t *testing.T) {
db.OverlayCache(),
lowStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -160,6 +167,8 @@ func TestRefreshConcurrent(t *testing.T) {
&mockDB,
highStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -185,6 +194,8 @@ func TestRefreshConcurrent(t *testing.T) {
&mockDB,
lowStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
ctx.Go(func() error { return cache.Run(cacheCtx) })
@ -213,6 +224,8 @@ func TestGetNodes(t *testing.T) {
db.OverlayCache(),
lowStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -299,6 +312,8 @@ func TestGetNodesConcurrent(t *testing.T) {
&mockDB,
highStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -344,6 +359,8 @@ func TestGetNodesConcurrent(t *testing.T) {
&mockDB,
lowStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -434,6 +451,8 @@ func TestGetNodesDistinct(t *testing.T) {
&mockDB,
highStaleness,
config,
uploadselection.NodeFilters{}.WithAutoExcludeSubnets(),
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -473,6 +492,8 @@ func TestGetNodesDistinct(t *testing.T) {
&mockDB,
highStaleness,
config,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -496,6 +517,8 @@ func TestGetNodesError(t *testing.T) {
&mockDB,
highStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -529,6 +552,8 @@ func TestNewNodeFraction(t *testing.T) {
db.OverlayCache(),
lowStaleness,
nodeSelectionConfig,
uploadselection.NodeFilters{},
overlay.NewPlacementRules().CreateFilters,
)
require.NoError(t, err)
@ -571,3 +596,91 @@ func TestNewNodeFraction(t *testing.T) {
require.Equal(t, len(n)-reputableCount, int(5*newNodeFraction)) // 1, 1
})
}
func BenchmarkGetNodes(b *testing.B) {
newNodes := 2000
oldNodes := 18000
required := 110
if testing.Short() {
newNodes = 10
oldNodes = 50
required = 2
}
ctx, cancel := context.WithCancel(testcontext.New(b))
defer cancel()
log, err := zap.NewDevelopment()
require.NoError(b, err)
placement := overlay.NewPlacementRules()
placement.AddLegacyStaticRules()
defaultFilter := uploadselection.NodeFilters{}
db := NewMockUploadSelectionDb(
generatedSelectedNodes(b, oldNodes),
generatedSelectedNodes(b, newNodes),
)
cache, err := overlay.NewUploadSelectionCache(log, db, 10*time.Minute, overlay.NodeSelectionConfig{
NewNodeFraction: 0.1,
}, defaultFilter, placement.CreateFilters)
require.NoError(b, err)
go func() {
_ = cache.Run(ctx)
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: required,
Placement: storj.US,
})
require.NoError(b, err)
}
}
// MockUploadSelection implements overlay.UploadSelectionDB with a static list.
type MockUploadSelectionDB struct {
new []*uploadselection.SelectedNode
reputable []*uploadselection.SelectedNode
}
// NewMockUploadSelectionDb creates a MockUploadSelectionDB with the given reputable and new nodes.
func NewMockUploadSelectionDb(reputable, new []*uploadselection.SelectedNode) *MockUploadSelectionDB {
return &MockUploadSelectionDB{
new: new,
reputable: reputable,
}
}
// SelectAllStorageNodesUpload implements overlay.UploadSelectionDB.
func (m MockUploadSelectionDB) SelectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*uploadselection.SelectedNode, err error) {
return m.reputable, m.new, nil
}
var _ overlay.UploadSelectionDB = &MockUploadSelectionDB{}
func generatedSelectedNodes(b *testing.B, nodeNo int) []*uploadselection.SelectedNode {
nodes := make([]*uploadselection.SelectedNode, nodeNo)
ctx := testcontext.New(b)
for i := 0; i < nodeNo; i++ {
node := uploadselection.SelectedNode{}
identity, err := testidentity.NewTestIdentity(ctx)
require.NoError(b, err)
node.ID = identity.ID
// with 5% percentage chance, we re-use an existing IP address.
if rand.Intn(100) < 5 && i > 0 {
prevParts := strings.Split(nodes[rand.Intn(i)].LastIPPort, ":")
node.LastIPPort = fmt.Sprintf("%s:%d", prevParts[0], rand.Int31n(10000)+1000)
} else {
node.LastIPPort = fmt.Sprintf("%d.%d.%d.%d:%d", 10+i/256/256%256, i/256%256, i%256, 1, rand.Int31n(10000)+1000)
}
parts := strings.Split(node.LastIPPort, ".")
node.LastNet = fmt.Sprintf("%s.%s.%s.0", parts[0], parts[1], parts[2])
node.CountryCode = []location.CountryCode{location.None, location.UnitedStates, location.Germany, location.Hungary, location.Austria}[i%5]
nodes[i] = &node
}
return nodes
}