satellite/nodeselection: flexible interface to includes nodes in selection
This commit doesn't change any behavior, just organize the code in different way to make it easier to implement different Criterias to include nodes. Today we use NodeID and Subnet based selection but later Criteria can be extended with different kind of placement rules (like geofencing). The change nodeselection is used by segment allocaton (upload) and repair and excludes nodes from an in-memory selection. Resolves https://github.com/storj/storj/issues/4240 Change-Id: I0c1955fe16a045e3b76d7e50b2e1f4575a7ff095
This commit is contained in:
parent
de38e2e7d8
commit
20d03bebdb
@ -1,4 +1,4 @@
|
|||||||
// Copyright (C) 2020 Storj Labs, Incache.
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
// Package uploadselection implements node selection logic for uploads.
|
// Package uploadselection implements node selection logic for uploads.
|
||||||
|
36
satellite/nodeselection/uploadselection/criteria.go
Normal file
36
satellite/nodeselection/uploadselection/criteria.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
// Copyright (C) 2021 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information
|
||||||
|
|
||||||
|
package uploadselection
|
||||||
|
|
||||||
|
import "storj.io/common/storj"
|
||||||
|
|
||||||
|
// Criteria to filter nodes.
|
||||||
|
type Criteria struct {
|
||||||
|
ExcludeNodeIDs []storj.NodeID
|
||||||
|
AutoExcludeSubnets map[string]struct{} // initialize it with empty map to keep only one node per subnet.
|
||||||
|
}
|
||||||
|
|
||||||
|
// MatchInclude returns with true if node is selected.
|
||||||
|
func (c *Criteria) MatchInclude(node *Node) bool {
|
||||||
|
if ContainsID(c.ExcludeNodeIDs, node.ID) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if c.AutoExcludeSubnets != nil {
|
||||||
|
if _, excluded := c.AutoExcludeSubnets[node.LastNet]; excluded {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
c.AutoExcludeSubnets[node.LastNet] = struct{}{}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContainsID returns whether ids contain id.
|
||||||
|
func ContainsID(ids []storj.NodeID, id storj.NodeID) bool {
|
||||||
|
for _, k := range ids {
|
||||||
|
if k == id {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
83
satellite/nodeselection/uploadselection/criteria_test.go
Normal file
83
satellite/nodeselection/uploadselection/criteria_test.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
// Copyright (C) 2021 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information
|
||||||
|
|
||||||
|
package uploadselection
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"storj.io/common/storj"
|
||||||
|
"storj.io/common/testrand"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCriteria_AutoExcludeSubnet(t *testing.T) {
|
||||||
|
|
||||||
|
criteria := Criteria{
|
||||||
|
AutoExcludeSubnets: map[string]struct{}{},
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.True(t, criteria.MatchInclude(&Node{
|
||||||
|
LastNet: "192.168.0.1",
|
||||||
|
}))
|
||||||
|
|
||||||
|
assert.False(t, criteria.MatchInclude(&Node{
|
||||||
|
LastNet: "192.168.0.1",
|
||||||
|
}))
|
||||||
|
|
||||||
|
assert.True(t, criteria.MatchInclude(&Node{
|
||||||
|
LastNet: "192.168.1.1",
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCriteria_ExcludeNodeID(t *testing.T) {
|
||||||
|
included := testrand.NodeID()
|
||||||
|
excluded := testrand.NodeID()
|
||||||
|
|
||||||
|
criteria := Criteria{
|
||||||
|
ExcludeNodeIDs: []storj.NodeID{excluded},
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.False(t, criteria.MatchInclude(&Node{
|
||||||
|
NodeURL: storj.NodeURL{
|
||||||
|
ID: excluded,
|
||||||
|
Address: "localhost",
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
assert.True(t, criteria.MatchInclude(&Node{
|
||||||
|
NodeURL: storj.NodeURL{
|
||||||
|
ID: included,
|
||||||
|
Address: "localhost",
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCriteria_NodeIDAndSubnet(t *testing.T) {
|
||||||
|
excluded := testrand.NodeID()
|
||||||
|
|
||||||
|
criteria := Criteria{
|
||||||
|
ExcludeNodeIDs: []storj.NodeID{excluded},
|
||||||
|
AutoExcludeSubnets: map[string]struct{}{},
|
||||||
|
}
|
||||||
|
|
||||||
|
// due to node id criteria
|
||||||
|
assert.False(t, criteria.MatchInclude(&Node{
|
||||||
|
NodeURL: storj.NodeURL{
|
||||||
|
ID: excluded,
|
||||||
|
Address: "192.168.0.1",
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
// should be included as previous one excluded and
|
||||||
|
// not stored for subnet exclusion
|
||||||
|
assert.True(t, criteria.MatchInclude(&Node{
|
||||||
|
NodeURL: storj.NodeURL{
|
||||||
|
ID: testrand.NodeID(),
|
||||||
|
Address: "192.168.0.2",
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright (C) 2020 Storj Labs, Incache.
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package uploadselection
|
package uploadselection
|
||||||
|
@ -1,12 +1,10 @@
|
|||||||
// Copyright (C) 2020 Storj Labs, Incache.
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package uploadselection
|
package uploadselection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
mathrand "math/rand" // Using mathrand here because crypto-graphic randomness is not required and simplifies code.
|
mathrand "math/rand" // Using mathrand here because crypto-graphic randomness is not required and simplifies code.
|
||||||
|
|
||||||
"storj.io/common/storj"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// SelectByID implements selection from nodes with every node having equal probability.
|
// SelectByID implements selection from nodes with every node having equal probability.
|
||||||
@ -18,7 +16,7 @@ var _ Selector = (SelectByID)(nil)
|
|||||||
func (nodes SelectByID) Count() int { return len(nodes) }
|
func (nodes SelectByID) Count() int { return len(nodes) }
|
||||||
|
|
||||||
// Select selects upto n nodes.
|
// Select selects upto n nodes.
|
||||||
func (nodes SelectByID) Select(n int, excludedIDs []storj.NodeID, excludedNets map[string]struct{}) []*Node {
|
func (nodes SelectByID) Select(n int, criteria Criteria) []*Node {
|
||||||
if n <= 0 {
|
if n <= 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -27,15 +25,9 @@ func (nodes SelectByID) Select(n int, excludedIDs []storj.NodeID, excludedNets m
|
|||||||
for _, idx := range mathrand.Perm(len(nodes)) {
|
for _, idx := range mathrand.Perm(len(nodes)) {
|
||||||
node := nodes[idx]
|
node := nodes[idx]
|
||||||
|
|
||||||
if ContainsID(excludedIDs, node.ID) {
|
if !criteria.MatchInclude(node) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if excludedNets != nil {
|
|
||||||
if _, excluded := excludedNets[node.LastNet]; excluded {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
excludedNets[node.LastNet] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
selected = append(selected, node.Clone())
|
selected = append(selected, node.Clone())
|
||||||
if len(selected) >= n {
|
if len(selected) >= n {
|
||||||
@ -79,7 +71,7 @@ func SelectBySubnetFromNodes(nodes []*Node) SelectBySubnet {
|
|||||||
func (subnets SelectBySubnet) Count() int { return len(subnets) }
|
func (subnets SelectBySubnet) Count() int { return len(subnets) }
|
||||||
|
|
||||||
// Select selects upto n nodes.
|
// Select selects upto n nodes.
|
||||||
func (subnets SelectBySubnet) Select(n int, excludedIDs []storj.NodeID, excludedNets map[string]struct{}) []*Node {
|
func (subnets SelectBySubnet) Select(n int, criteria Criteria) []*Node {
|
||||||
if n <= 0 {
|
if n <= 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -89,15 +81,9 @@ func (subnets SelectBySubnet) Select(n int, excludedIDs []storj.NodeID, excluded
|
|||||||
subnet := subnets[idx]
|
subnet := subnets[idx]
|
||||||
node := subnet.Nodes[mathrand.Intn(len(subnet.Nodes))]
|
node := subnet.Nodes[mathrand.Intn(len(subnet.Nodes))]
|
||||||
|
|
||||||
if ContainsID(excludedIDs, node.ID) {
|
if !criteria.MatchInclude(node) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if excludedNets != nil {
|
|
||||||
if _, excluded := excludedNets[node.LastNet]; excluded {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
excludedNets[node.LastNet] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
selected = append(selected, node.Clone())
|
selected = append(selected, node.Clone())
|
||||||
if len(selected) >= n {
|
if len(selected) >= n {
|
||||||
@ -107,13 +93,3 @@ func (subnets SelectBySubnet) Select(n int, excludedIDs []storj.NodeID, excluded
|
|||||||
|
|
||||||
return selected
|
return selected
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainsID returns whether ids contains id.
|
|
||||||
func ContainsID(ids []storj.NodeID, id storj.NodeID) bool {
|
|
||||||
for _, k := range ids {
|
|
||||||
if k == id {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright (C) 2020 Storj Labs, Incache.
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package uploadselection_test
|
package uploadselection_test
|
||||||
@ -63,7 +63,7 @@ func TestSelectByID(t *testing.T) {
|
|||||||
|
|
||||||
// perform many node selections that selects 2 nodes
|
// perform many node selections that selects 2 nodes
|
||||||
for i := 0; i < executionCount; i++ {
|
for i := 0; i < executionCount; i++ {
|
||||||
selectedNodes := selector.Select(reqCount, nil, nil)
|
selectedNodes := selector.Select(reqCount, uploadselection.Criteria{})
|
||||||
require.Len(t, selectedNodes, reqCount)
|
require.Len(t, selectedNodes, reqCount)
|
||||||
for _, node := range selectedNodes {
|
for _, node := range selectedNodes {
|
||||||
selectedNodeCount[node.ID]++
|
selectedNodeCount[node.ID]++
|
||||||
@ -132,7 +132,7 @@ func TestSelectBySubnet(t *testing.T) {
|
|||||||
|
|
||||||
// perform many node selections that selects 2 nodes
|
// perform many node selections that selects 2 nodes
|
||||||
for i := 0; i < executionCount; i++ {
|
for i := 0; i < executionCount; i++ {
|
||||||
selectedNodes := selector.Select(reqCount, nil, map[string]struct{}{})
|
selectedNodes := selector.Select(reqCount, uploadselection.Criteria{})
|
||||||
require.Len(t, selectedNodes, reqCount)
|
require.Len(t, selectedNodes, reqCount)
|
||||||
for _, node := range selectedNodes {
|
for _, node := range selectedNodes {
|
||||||
selectedNodeCount[node.ID]++
|
selectedNodeCount[node.ID]++
|
||||||
@ -213,7 +213,7 @@ func TestSelectBySubnetOneAtATime(t *testing.T) {
|
|||||||
|
|
||||||
// perform many node selections that selects 1 node
|
// perform many node selections that selects 1 node
|
||||||
for i := 0; i < executionCount; i++ {
|
for i := 0; i < executionCount; i++ {
|
||||||
selectedNodes := selector.Select(reqCount, nil, map[string]struct{}{})
|
selectedNodes := selector.Select(reqCount, uploadselection.Criteria{})
|
||||||
require.Len(t, selectedNodes, reqCount)
|
require.Len(t, selectedNodes, reqCount)
|
||||||
for _, node := range selectedNodes {
|
for _, node := range selectedNodes {
|
||||||
selectedNodeCount[node.ID]++
|
selectedNodeCount[node.ID]++
|
||||||
@ -238,3 +238,58 @@ func TestSelectBySubnetOneAtATime(t *testing.T) {
|
|||||||
// expect that the single node is selected ~50% of the time
|
// expect that the single node is selected ~50% of the time
|
||||||
assert.InDelta(t, subnetB1Count/total, uniqueSubnet, selectionEpsilon)
|
assert.InDelta(t, subnetB1Count/total, uniqueSubnet, selectionEpsilon)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSelectFiltered(t *testing.T) {
|
||||||
|
|
||||||
|
ctx := testcontext.New(t)
|
||||||
|
defer ctx.Cleanup()
|
||||||
|
|
||||||
|
// create 3 nodes, 2 with same subnet
|
||||||
|
lastNetDuplicate := "1.0.1"
|
||||||
|
firstID := testrand.NodeID()
|
||||||
|
subnetA1 := &uploadselection.Node{
|
||||||
|
NodeURL: storj.NodeURL{
|
||||||
|
ID: firstID,
|
||||||
|
Address: lastNetDuplicate + ".4:8080",
|
||||||
|
},
|
||||||
|
LastNet: lastNetDuplicate,
|
||||||
|
LastIPPort: lastNetDuplicate + ".4:8080",
|
||||||
|
}
|
||||||
|
|
||||||
|
secondID := testrand.NodeID()
|
||||||
|
subnetA2 := &uploadselection.Node{
|
||||||
|
NodeURL: storj.NodeURL{
|
||||||
|
ID: secondID,
|
||||||
|
Address: lastNetDuplicate + ".5:8080",
|
||||||
|
},
|
||||||
|
LastNet: lastNetDuplicate,
|
||||||
|
LastIPPort: lastNetDuplicate + ".5:8080",
|
||||||
|
}
|
||||||
|
|
||||||
|
thirdID := testrand.NodeID()
|
||||||
|
lastNetSingle := "1.0.2"
|
||||||
|
subnetB1 := &uploadselection.Node{
|
||||||
|
NodeURL: storj.NodeURL{
|
||||||
|
ID: thirdID,
|
||||||
|
Address: lastNetSingle + ".5:8080",
|
||||||
|
},
|
||||||
|
LastNet: lastNetSingle,
|
||||||
|
LastIPPort: lastNetSingle + ".5:8080",
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := []*uploadselection.Node{subnetA1, subnetA2, subnetB1}
|
||||||
|
selector := uploadselection.SelectByID(nodes)
|
||||||
|
|
||||||
|
assert.Len(t, selector.Select(3, uploadselection.Criteria{}), 3)
|
||||||
|
assert.Len(t, selector.Select(3, uploadselection.Criteria{ExcludeNodeIDs: []storj.NodeID{firstID}}), 2)
|
||||||
|
assert.Len(t, selector.Select(3, uploadselection.Criteria{}), 3)
|
||||||
|
|
||||||
|
assert.Len(t, selector.Select(3, uploadselection.Criteria{ExcludeNodeIDs: []storj.NodeID{firstID, secondID}}), 1)
|
||||||
|
assert.Len(t, selector.Select(3, uploadselection.Criteria{
|
||||||
|
AutoExcludeSubnets: map[string]struct{}{},
|
||||||
|
}), 2)
|
||||||
|
assert.Len(t, selector.Select(3, uploadselection.Criteria{
|
||||||
|
ExcludeNodeIDs: []storj.NodeID{thirdID},
|
||||||
|
AutoExcludeSubnets: map[string]struct{}{},
|
||||||
|
}), 1)
|
||||||
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright (C) 2020 Storj Labs, Incache.
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package uploadselection
|
package uploadselection
|
||||||
@ -47,9 +47,9 @@ type Stats struct {
|
|||||||
type Selector interface {
|
type Selector interface {
|
||||||
// Count returns the number of maximum number of nodes that it can return.
|
// Count returns the number of maximum number of nodes that it can return.
|
||||||
Count() int
|
Count() int
|
||||||
// Select selects up-to n nodes and excluding the IDs.
|
// Select selects up-to n nodes which are included by the criteria.
|
||||||
// When excludedNets is non-nil it will ensure that selected network is unique.
|
// empty criteria includes all the nodes
|
||||||
Select(n int, excludedIDs []storj.NodeID, excludeNets map[string]struct{}) []*Node
|
Select(n int, criteria Criteria) []*Node
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewState returns a state based on the input.
|
// NewState returns a state based on the input.
|
||||||
@ -100,16 +100,21 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err
|
|||||||
newCount := int(float64(totalCount) * request.NewFraction)
|
newCount := int(float64(totalCount) * request.NewFraction)
|
||||||
|
|
||||||
var selected []*Node
|
var selected []*Node
|
||||||
var excludedNets map[string]struct{}
|
|
||||||
|
|
||||||
var reputableNodes Selector
|
var reputableNodes Selector
|
||||||
var newNodes Selector
|
var newNodes Selector
|
||||||
|
|
||||||
|
var criteria Criteria
|
||||||
|
|
||||||
|
if request.ExcludedIDs != nil {
|
||||||
|
criteria.ExcludeNodeIDs = request.ExcludedIDs
|
||||||
|
}
|
||||||
|
|
||||||
if request.Distinct {
|
if request.Distinct {
|
||||||
excludedNets = map[string]struct{}{}
|
criteria.AutoExcludeSubnets = make(map[string]struct{})
|
||||||
for _, id := range request.ExcludedIDs {
|
for _, id := range request.ExcludedIDs {
|
||||||
if net, ok := state.netByID[id]; ok {
|
if net, ok := state.netByID[id]; ok {
|
||||||
excludedNets[net] = struct{}{}
|
criteria.AutoExcludeSubnets[net] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reputableNodes = state.distinct.Reputable
|
reputableNodes = state.distinct.Reputable
|
||||||
@ -122,12 +127,12 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err
|
|||||||
// Get a random selection of new nodes out of the cache first so that if there aren't
|
// Get a random selection of new nodes out of the cache first so that if there aren't
|
||||||
// enough new nodes on the network, we can fall back to using reputable nodes instead.
|
// enough new nodes on the network, we can fall back to using reputable nodes instead.
|
||||||
selected = append(selected,
|
selected = append(selected,
|
||||||
newNodes.Select(newCount, request.ExcludedIDs, excludedNets)...)
|
newNodes.Select(newCount, criteria)...)
|
||||||
|
|
||||||
// Get all the remaining reputable nodes.
|
// Get all the remaining reputable nodes.
|
||||||
reputableCount := totalCount - len(selected)
|
reputableCount := totalCount - len(selected)
|
||||||
selected = append(selected,
|
selected = append(selected,
|
||||||
reputableNodes.Select(reputableCount, request.ExcludedIDs, excludedNets)...)
|
reputableNodes.Select(reputableCount, criteria)...)
|
||||||
|
|
||||||
if len(selected) < totalCount {
|
if len(selected) < totalCount {
|
||||||
return selected, ErrNotEnoughNodes.New("requested from cache %d, found %d", totalCount, len(selected))
|
return selected, ErrNotEnoughNodes.New("requested from cache %d, found %d", totalCount, len(selected))
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright (C) 2020 Storj Labs, Incache.
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package uploadselection_test
|
package uploadselection_test
|
||||||
|
Loading…
Reference in New Issue
Block a user