satellite/overlay: combine SelectStorageNodes and SelectNewStorageNodes (#3831)

This commit is contained in:
Natalie Villasana 2020-04-09 11:19:44 -04:00 committed by GitHub
parent d658a6a6ec
commit cf80b3caf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 346 additions and 267 deletions

View File

@ -302,7 +302,7 @@ func BenchmarkNodeSelection(b *testing.B) {
b.Run("SelectStorageNodes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := overlaydb.SelectStorageNodes(ctx, SelectCount, criteria)
selected, err := overlaydb.SelectStorageNodes(ctx, SelectCount, 0, criteria)
require.NoError(b, err)
require.NotEmpty(b, selected)
}
@ -310,7 +310,7 @@ func BenchmarkNodeSelection(b *testing.B) {
b.Run("SelectNewStorageNodes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := overlaydb.SelectNewStorageNodes(ctx, SelectCount, criteria)
selected, err := overlaydb.SelectStorageNodes(ctx, SelectCount, SelectCount, criteria)
require.NoError(b, err)
require.NotEmpty(b, selected)
}
@ -318,7 +318,7 @@ func BenchmarkNodeSelection(b *testing.B) {
b.Run("SelectStorageNodesExclusion", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := overlaydb.SelectStorageNodes(ctx, SelectCount, excludedCriteria)
selected, err := overlaydb.SelectStorageNodes(ctx, SelectCount, 0, excludedCriteria)
require.NoError(b, err)
require.NotEmpty(b, selected)
}
@ -326,7 +326,7 @@ func BenchmarkNodeSelection(b *testing.B) {
b.Run("SelectNewStorageNodesExclusion", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := overlaydb.SelectNewStorageNodes(ctx, SelectCount, excludedCriteria)
selected, err := overlaydb.SelectStorageNodes(ctx, SelectCount, SelectCount, excludedCriteria)
require.NoError(b, err)
require.NotEmpty(b, selected)
}

View File

@ -39,10 +39,7 @@ type DB interface {
// GetOnlineNodesForGetDelete returns a map of nodes for the supplied nodeIDs
GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (map[storj.NodeID]*SelectedNode, error)
// SelectStorageNodes looks up nodes based on criteria
SelectStorageNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*SelectedNode, error)
// SelectNewStorageNodes looks up nodes based on new node criteria
SelectNewStorageNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*SelectedNode, error)
SelectStorageNodes(ctx context.Context, totalNeededNodes, newNodeCount int, criteria *NodeCriteria) ([]*SelectedNode, error)
// Get looks up the node by nodeID
Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
// KnownOffline filters a set of nodes to offline nodes
@ -287,9 +284,9 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
// TODO: add sanity limits to requested node count
// TODO: add sanity limits to excluded nodes
reputableNodeCount := req.MinimumRequiredNodes
if reputableNodeCount <= 0 {
reputableNodeCount = req.RequestedCount
totalNeededNodes := req.MinimumRequiredNodes
if totalNeededNodes <= 0 {
totalNeededNodes = req.RequestedCount
}
excludedIDs := req.ExcludedIDs
@ -305,31 +302,7 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
newNodeCount := 0
if preferences.NewNodeFraction > 0 {
newNodeCount = int(float64(reputableNodeCount) * preferences.NewNodeFraction)
}
var newNodes []*SelectedNode
if newNodeCount > 0 {
newNodes, err = service.db.SelectNewStorageNodes(ctx, newNodeCount, &NodeCriteria{
FreeDisk: preferences.MinimumDiskSpace.Int64(),
AuditCount: preferences.AuditCount,
ExcludedIDs: excludedIDs,
MinimumVersion: preferences.MinimumVersion,
OnlineWindow: preferences.OnlineWindow,
DistinctIP: preferences.DistinctIP,
ExcludedNetworks: excludedNetworks,
})
if err != nil {
return nil, Error.Wrap(err)
}
}
// add selected new nodes ID and network to the excluded lists for reputable node selection
for _, newNode := range newNodes {
excludedIDs = append(excludedIDs, newNode.ID)
if preferences.DistinctIP {
excludedNetworks = append(excludedNetworks, newNode.LastNet)
}
newNodeCount = int(float64(totalNeededNodes) * preferences.NewNodeFraction)
}
criteria := NodeCriteria{
@ -342,16 +315,13 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
OnlineWindow: preferences.OnlineWindow,
DistinctIP: preferences.DistinctIP,
}
reputableNodes, err := service.db.SelectStorageNodes(ctx, reputableNodeCount-len(newNodes), &criteria)
nodes, err = service.db.SelectStorageNodes(ctx, totalNeededNodes, newNodeCount, &criteria)
if err != nil {
return nil, Error.Wrap(err)
}
nodes = append(nodes, newNodes...)
nodes = append(nodes, reputableNodes...)
if len(nodes) < reputableNodeCount {
return nodes, ErrNotEnoughNodes.New("requested %d found %d; %+v ", reputableNodeCount, len(nodes), criteria)
if len(nodes) < totalNeededNodes {
return nodes, ErrNotEnoughNodes.New("requested %d found %d; %+v ", totalNeededNodes, len(nodes), criteria)
}
return nodes, nil

View File

@ -190,13 +190,13 @@ func TestRandomizedSelection(t *testing.T) {
var err error
if i%2 == 0 {
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, &overlay.NodeCriteria{
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, 0, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
require.NoError(t, err)
} else {
nodes, err = cache.SelectNewStorageNodes(ctx, numNodesToSelect, &overlay.NodeCriteria{
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, numNodesToSelect, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
@ -589,7 +589,7 @@ func TestGetSuccesfulNodesNotCheckedInSince(t *testing.T) {
})
}
// TestSuspendedSelection ensures that suspended nodes are not selected by SelectStorageNodes or SelectNewStorageNodes
// TestSuspendedSelection ensures that suspended nodes are not selected by SelectStorageNodes
func TestSuspendedSelection(t *testing.T) {
totalNodes := 10
@ -640,7 +640,7 @@ func TestSuspendedSelection(t *testing.T) {
numNodesToSelect := 10
// select 10 vetted nodes - 5 vetted, 2 suspended, so expect 3
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, &overlay.NodeCriteria{
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, 0, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
@ -651,7 +651,7 @@ func TestSuspendedSelection(t *testing.T) {
}
// select 10 new nodes - 5 new, 2 suspended, so expect 3
nodes, err = cache.SelectNewStorageNodes(ctx, numNodesToSelect, &overlay.NodeCriteria{
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, numNodesToSelect, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})

View File

@ -0,0 +1,329 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeebo/errs"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/private/version"
"storj.io/storj/satellite/overlay"
)
func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNodes, newNodeCount int, criteria *overlay.NodeCriteria) (nodes []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
if totalNeededNodes == 0 {
return nil, nil
}
if newNodeCount > totalNeededNodes {
return nil, Error.New("requested new node count can't exceed requested total node count")
}
needNewNodes := newNodeCount
needReputableNodes := totalNeededNodes - needNewNodes
receivedNewNodes := 0
receivedNodeNetworks := make(map[string]struct{})
var excludedIDs []storj.NodeID
excludedIDs = append(excludedIDs, criteria.ExcludedIDs...)
var excludedNetworks []string
excludedNetworks = append(excludedNetworks, criteria.ExcludedNetworks...)
for i := 0; i < 3; i++ {
reputableNodes, newNodes, err := cache.selectStorageNodesOnce(ctx, needReputableNodes, needNewNodes, criteria, excludedIDs, excludedNetworks)
if err != nil {
return nil, err
}
for _, node := range newNodes {
// checking for last net collision among reputable and new nodes since we can't check within the query
if _, ok := receivedNodeNetworks[node.LastNet]; ok {
continue
}
excludedIDs = append(excludedIDs, node.ID)
excludedNetworks = append(excludedNetworks, node.LastNet)
nodes = append(nodes, node)
needNewNodes--
receivedNewNodes++
if criteria.DistinctIP {
receivedNodeNetworks[node.LastNet] = struct{}{}
}
}
for _, node := range reputableNodes {
if _, ok := receivedNodeNetworks[node.LastNet]; ok {
continue
}
excludedIDs = append(excludedIDs, node.ID)
excludedNetworks = append(excludedNetworks, node.LastNet)
nodes = append(nodes, node)
needReputableNodes--
if criteria.DistinctIP {
receivedNodeNetworks[node.LastNet] = struct{}{}
}
}
if needNewNodes > 0 && receivedNewNodes == 0 {
needReputableNodes += needNewNodes
needNewNodes = 0
}
if needReputableNodes <= 0 && needNewNodes <= 0 {
break
}
}
return nodes, nil
}
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
// Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
// Later, the flag allows us to distinguish if a node is new when scanning the db rows.
newNodeSelection := `SELECT last_net, id, address, last_ip_port, true FROM nodes`
reputableNodeSelection := `SELECT last_net, id, address, last_ip_port, false FROM nodes`
if criteria.DistinctIP {
newNodeSelection = `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, true FROM nodes`
reputableNodeSelection = `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, false FROM nodes`
}
newNodesCondition, err := buildConditions(ctx, criteria, excludedIDs, excludedNetworks, true)
if err != nil {
return nil, nil, err
}
reputableNodesCondition, err := buildConditions(ctx, criteria, excludedIDs, excludedNetworks, false)
if err != nil {
return nil, nil, err
}
var newNodeQuery, reputableNodeQuery query
newNodeQuery = assemble(partialQuery{selection: newNodeSelection, condition: newNodesCondition, limit: newNodeCount})
if criteria.DistinctIP {
newNodeQuery = assemble(partialQuery{selection: newNodeSelection, condition: newNodesCondition, orderBy: "last_net", distinct: true, limit: newNodeCount})
newNodeQuery = selectRandomFrom(newNodeQuery, "filteredcandidates", newNodeCount)
}
reputableNodeQuery = assemble(partialQuery{selection: reputableNodeSelection, condition: reputableNodesCondition, limit: reputableNodeCount})
if criteria.DistinctIP {
reputableNodeQuery = assemble(partialQuery{selection: reputableNodeSelection, condition: reputableNodesCondition, orderBy: "last_net", distinct: true, limit: reputableNodeCount})
reputableNodeQuery = selectRandomFrom(reputableNodeQuery, "filteredcandidates", reputableNodeCount)
}
query := union(newNodeQuery, reputableNodeQuery)
rows, err := cache.db.Query(ctx, cache.db.Rebind(query.query), query.args...)
if err != nil {
return nil, nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
var lastIPPort sql.NullString
var isNew bool
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort, &isNew)
if err != nil {
return nil, nil, err
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
if isNew {
newNodes = append(newNodes, &node)
} else {
reputableNodes = append(reputableNodes, &node)
}
if len(newNodes) >= newNodeCount && len(reputableNodes) >= reputableNodeCount {
break
}
}
return reputableNodes, newNodes, Error.Wrap(rows.Err())
}
func buildConditions(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
var conds conditions
if isNewNodeQuery {
conds.add(
"(total_audit_count < ? OR total_uptime_count < ?)",
criteria.AuditCount, criteria.UptimeCount,
)
} else {
conds.add(
"total_audit_count >= ? AND total_uptime_count >= ?",
criteria.AuditCount, criteria.UptimeCount,
)
}
conds.add(`disqualified is null
AND suspended is null
AND exit_initiated_at is null`)
conds.add("type = ?", int(pb.NodeType_STORAGE))
conds.add("free_disk >= ?", criteria.FreeDisk)
conds.add("last_contact_success > ?", time.Now().Add(-criteria.OnlineWindow))
if criteria.MinimumVersion != "" {
v, err := version.NewSemVer(criteria.MinimumVersion)
if err != nil {
return condition{}, Error.New("invalid node selection criteria version: %v", err)
}
conds.add(
"(major > ? OR (major = ? AND (minor > ? OR (minor = ? AND patch >= ?)))) AND release ",
v.Major, v.Major, v.Minor, v.Minor, v.Patch,
)
}
cond := combine(conds...)
if len(excludedIDs) > 0 {
excludedIDsQuery := " AND id NOT IN (?" + strings.Repeat(", ?", len(excludedIDs)-1) + ") "
cond.addQuery(excludedIDsQuery)
for _, id := range excludedIDs {
cond.addArg(id)
}
}
if criteria.DistinctIP {
if len(excludedNetworks) > 0 {
excludedNetworksQuery := " AND last_net NOT IN (?" + strings.Repeat(", ?", len(excludedNetworks)-1) + ") "
cond.addQuery(excludedNetworksQuery)
for _, subnet := range excludedNetworks {
cond.addArg(subnet)
}
}
cond.addQuery(" AND last_net <> ''")
}
return cond, nil
}
func assemble(partial partialQuery) query {
var q strings.Builder
var args []interface{}
if partial.limit == 0 {
return query{}
}
fmt.Fprint(&q, partial.selection)
fmt.Fprintf(&q, " WHERE %s ", partial.condition.query)
args = append(args, partial.condition.args...)
if partial.orderBy != "" {
fmt.Fprintf(&q, " ORDER BY %s, RANDOM() ", partial.orderBy)
} else {
fmt.Fprint(&q, " ORDER BY RANDOM() ")
}
if !partial.distinct {
fmt.Fprintf(&q, " LIMIT ? ")
args = append(args, partial.limit)
}
return query{
query: q.String(),
args: args,
}
}
func selectRandomFrom(q query, alias string, limit int) query {
if limit == 0 {
return query{}
}
queryString := fmt.Sprintf("SELECT * FROM (%s) %s ORDER BY RANDOM() LIMIT ?", q.query, alias)
newArgs := make([]interface{}, len(q.args)+1)
copy(newArgs[:len(q.args)], q.args)
newArgs[len(q.args)] = limit
return query{
query: queryString,
args: newArgs,
}
}
func isEmpty(q query) bool {
if q.query == "" && len(q.args) == 0 {
return true
}
return false
}
func union(q1, q2 query) query {
if isEmpty(q1) {
return q2
}
if isEmpty(q2) {
return q1
}
finalQuery := "(" + q1.query + ") UNION ALL (" + q2.query + ")"
var args []interface{}
args = append(args, q1.args...)
args = append(args, q2.args...)
return query{
query: finalQuery,
args: args,
}
}
type partialQuery struct {
selection string
condition condition
orderBy string
limit int
distinct bool
}
type condition struct {
query string
args []interface{}
}
func (cond *condition) addQuery(q string) {
cond.query += q
}
func (cond *condition) addArg(arg interface{}) {
cond.args = append(cond.args, arg)
}
type conditions []condition
func (xs *conditions) add(q string, args ...interface{}) {
*xs = append(*xs, cond(q, args...))
}
func cond(query string, args ...interface{}) condition {
return condition{
query: query,
args: args,
}
}
func combine(conditions ...condition) condition {
var qs []string
var args []interface{}
for _, c := range conditions {
qs = append(qs, c.query)
args = append(args, c.args...)
}
return cond(strings.Join(qs, " AND "), args...)
}
type query struct {
query string
args []interface{}
}

View File

@ -9,7 +9,6 @@ import (
"encoding/hex"
"fmt"
"sort"
"strings"
"time"
"github.com/lib/pq"
@ -33,114 +32,6 @@ type overlaycache struct {
db *satelliteDB
}
func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) (nodes []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
nodeType := int(pb.NodeType_STORAGE)
safeQuery := `
WHERE disqualified IS NULL
AND suspended IS NULL
AND exit_initiated_at IS NULL
AND type = ?
AND free_disk >= ?
AND total_audit_count >= ?
AND total_uptime_count >= ?
AND last_contact_success > ?`
args := append(make([]interface{}, 0, 13),
nodeType, criteria.FreeDisk, criteria.AuditCount,
criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow))
if criteria.MinimumVersion != "" {
v, err := version.NewSemVer(criteria.MinimumVersion)
if err != nil {
return nil, Error.New("invalid node selection criteria version: %v", err)
}
safeQuery += `
AND (major > ? OR (major = ? AND (minor > ? OR (minor = ? AND patch >= ?))))
AND release`
args = append(args, v.Major, v.Major, v.Minor, v.Minor, v.Patch)
}
if !criteria.DistinctIP {
nodes, err = cache.queryNodes(ctx, criteria.ExcludedIDs, count, safeQuery, args...)
if err != nil {
return nil, err
}
return nodes, nil
}
for i := 0; i < 3; i++ {
moreNodes, err := cache.queryNodesDistinct(ctx, criteria.ExcludedIDs, criteria.ExcludedNetworks, count-len(nodes), safeQuery, criteria.DistinctIP, args...)
if err != nil {
return nil, err
}
for _, n := range moreNodes {
nodes = append(nodes, n)
criteria.ExcludedIDs = append(criteria.ExcludedIDs, n.ID)
criteria.ExcludedNetworks = append(criteria.ExcludedNetworks, n.LastNet)
}
if len(nodes) == count {
break
}
}
return nodes, nil
}
func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) (nodes []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
nodeType := int(pb.NodeType_STORAGE)
safeQuery := `
WHERE disqualified IS NULL
AND suspended IS NULL
AND exit_initiated_at IS NULL
AND type = ?
AND free_disk >= ?
AND (total_audit_count < ? OR total_uptime_count < ?)
AND last_contact_success > ?`
args := append(make([]interface{}, 0, 10),
nodeType, criteria.FreeDisk, criteria.AuditCount, criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow))
if criteria.MinimumVersion != "" {
v, err := version.NewSemVer(criteria.MinimumVersion)
if err != nil {
return nil, Error.New("invalid node selection criteria version: %v", err)
}
safeQuery += `
AND (major > ? OR (major = ? AND (minor > ? OR (minor = ? AND patch >= ?))))
AND release`
args = append(args, v.Major, v.Major, v.Minor, v.Minor, v.Patch)
}
if !criteria.DistinctIP {
nodes, err = cache.queryNodes(ctx, criteria.ExcludedIDs, count, safeQuery, args...)
if err != nil {
return nil, err
}
return nodes, nil
}
for i := 0; i < 3; i++ {
moreNodes, err := cache.queryNodesDistinct(ctx, criteria.ExcludedIDs, criteria.ExcludedNetworks, count-len(nodes), safeQuery, criteria.DistinctIP, args...)
if err != nil {
return nil, err
}
for _, n := range moreNodes {
nodes = append(nodes, n)
criteria.ExcludedIDs = append(criteria.ExcludedIDs, n.ID)
criteria.ExcludedNetworks = append(criteria.ExcludedNetworks, n.LastNet)
}
if len(nodes) == count {
break
}
}
return nodes, nil
}
// GetNodesNetwork returns the /24 subnet for each storage node, order is not guaranteed.
func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) {
defer mon.Task()(&ctx)(&err)
@ -167,117 +58,6 @@ func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.
return nodeNets, Error.Wrap(rows.Err())
}
func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj.NodeID, count int, safeQuery string, args ...interface{}) (_ []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
if count == 0 {
return nil, nil
}
safeExcludeNodes := ""
if len(excludedNodes) > 0 {
safeExcludeNodes = ` AND id NOT IN (?` + strings.Repeat(", ?", len(excludedNodes)-1) + `)`
for _, id := range excludedNodes {
args = append(args, id.Bytes())
}
}
args = append(args, count)
var rows *sql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT last_net, id, address, last_ip_port
FROM nodes
`+safeQuery+safeExcludeNodes+`
ORDER BY RANDOM()
LIMIT ?`), args...)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var nodes []*overlay.SelectedNode
for rows.Next() {
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
var lastIPPort sql.NullString
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort)
if err != nil {
return nil, err
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
nodes = append(nodes, &node)
}
return nodes, Error.Wrap(rows.Err())
}
func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedIDs []storj.NodeID, excludedNodeNetworks []string, count int, safeQuery string, distinctIP bool, args ...interface{}) (_ []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
if count == 0 {
return nil, nil
}
safeExcludeNodes := ""
if len(excludedIDs) > 0 {
safeExcludeNodes = ` AND id NOT IN (?` + strings.Repeat(", ?", len(excludedIDs)-1) + `)`
for _, id := range excludedIDs {
args = append(args, id.Bytes())
}
}
safeExcludeNetworks := ""
if len(excludedNodeNetworks) > 0 {
safeExcludeNetworks = ` AND last_net NOT IN (?` + strings.Repeat(", ?", len(excludedNodeNetworks)-1) + `)`
for _, ip := range excludedNodeNetworks {
args = append(args, ip)
}
}
args = append(args, count)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT *
FROM (
SELECT DISTINCT ON (last_net) last_net, -- choose at most 1 node from this network
id, address, last_ip_port
FROM nodes
`+safeQuery+safeExcludeNodes+safeExcludeNetworks+`
AND last_net <> '' -- select nodes with a network set
ORDER BY last_net, RANDOM() -- equal chance of choosing any qualified node at this network
) filteredcandidates
ORDER BY RANDOM() -- do the actual node selection from filtered pool
LIMIT ?`), args...)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var nodes []*overlay.SelectedNode
for rows.Next() {
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
var lastIPPort sql.NullString
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort)
if err != nil {
return nil, err
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
nodes = append(nodes, &node)
}
return nodes, Error.Wrap(rows.Err())
}
// Get looks up the node by nodeID
func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (_ *overlay.NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)