storj/satellite/satellitedb/nodeselection.go
paul cannon 2522ff09b6 satellite/overlay: configurable meaning of last_net
Up to now, we have been implementing the DistinctIP preference with code
in two places:

 1. On check-in, the last_net is determined by taking the /24 or /64
    (in ResolveIPAndNetwork()) and we store it with the node record.
 2. On node selection, a preference parameter defines whether to return
    results that are distinct on last_net.

It can be observed that we have never yet had the need to switch from
DistinctIP to !DistinctIP, or from !DistinctIP to DistinctIP, on the
same satellite, and we will probably never need to do so in an automated
way. It can also be observed that this arrangement makes tests more
complicated, because we often have to arrange for test nodes to have IP
addresses in different /24 networks (a particular pain on macOS).

Those two considerations, plus some pending work on the repair framework
that will make repair take last_net into consideration, motivate this
change.

With this change, in the #2 place, we will _always_ return results that
are distinct on last_net. We implement the DistinctIP preference, then,
by making the #1 place (ResolveIPAndNetwork()) more flexible. When
DistinctIP is enabled, last_net will be calculated as it was before. But
when DistinctIP is _off_, last_net can be the same as address (IP and
port). That will effectively implement !DistinctIP because every
record will have a distinct last_net already.

As a side effect, this flexibility will allow us to change the rules
about last_net construction arbitrarily. We can do tests where last_net
is set to the source IP, or to a /30 prefix, or a /16 prefix, etc., and
be able to exercise the production logic without requiring a virtual
network bridge.

This change should be safe to make without any migration code, because
all known production satellite deployments use DistinctIP, and the
associated last_net values will not change for them. They will only
change for satellites with !DistinctIP, which are mostly test
deployments that can be recreated trivially. For those satellites which
are both permanent and !DistinctIP, node selection will suddenly start
acting as though DistinctIP is enabled, until the operator runs a single
SQL update "UPDATE nodes SET last_net = last_ip_port". That can be done
either before or after deploying software with this change.

I also assert that this will not hurt performance for production
deployments. It's true that adding the distinct requirement to node
selection makes things a little slower, but the distinct requirement is
already present for all production deployments, and they will see no
change.

Refs: https://github.com/storj/storj/issues/5391
Change-Id: I0e7e92498c3da768df5b4d5fb213dcd2d4862924
2023-03-09 02:20:12 +00:00

312 lines
8.4 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeebo/errs"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/private/dbutil/pgutil"
"storj.io/private/version"
"storj.io/storj/satellite/overlay"
)
// SelectStorageNodes returns up to totalNeededNodes storage nodes matching
// criteria, of which up to newNodeCount are requested as new nodes and the
// remainder as reputable nodes. No two returned nodes share a last_net.
//
// The database is queried at most three times. After every round the nodes
// already accepted (and their networks) are added to the exclusion lists used
// by the next round. If no new node has been accepted so far, the outstanding
// new-node quota is handed over to the reputable quota.
//
// It returns (nil, nil) when totalNeededNodes is zero and an error when
// newNodeCount exceeds totalNeededNodes. Fewer nodes than requested may be
// returned without an error.
func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNodes, newNodeCount int, criteria *overlay.NodeCriteria) (nodes []*overlay.SelectedNode, err error) {
	defer mon.Task()(&ctx)(&err)

	switch {
	case totalNeededNodes == 0:
		return nil, nil
	case newNodeCount > totalNeededNodes:
		return nil, Error.New("requested new node count can't exceed requested total node count")
	}

	// Upper bound on the number of selection rounds.
	const maxAttempts = 3

	var (
		newWanted       = newNodeCount
		reputableWanted = totalNeededNodes - newNodeCount
		newReceived     = 0
		usedNetworks    = make(map[string]struct{})
	)

	// Work on copies so that the slices held by criteria are never modified.
	excludedIDs := append([]storj.NodeID{}, criteria.ExcludedIDs...)
	excludedNetworks := append([]string{}, criteria.ExcludedNetworks...)

	// take accepts node unless its network is already represented in the
	// result. New and reputable nodes come from separate sub-queries, so a
	// collision between the two groups cannot be ruled out by the query itself.
	take := func(node *overlay.SelectedNode) bool {
		if _, taken := usedNetworks[node.LastNet]; taken {
			return false
		}
		usedNetworks[node.LastNet] = struct{}{}
		excludedIDs = append(excludedIDs, node.ID)
		excludedNetworks = append(excludedNetworks, node.LastNet)
		nodes = append(nodes, node)
		return true
	}

	for attempt := 0; attempt < maxAttempts; attempt++ {
		reputable, fresh, err := cache.selectStorageNodesOnce(ctx, reputableWanted, newWanted, criteria, excludedIDs, excludedNetworks)
		if err != nil {
			return nil, err
		}

		for _, node := range fresh {
			if take(node) {
				newWanted--
				newReceived++
			}
		}
		for _, node := range reputable {
			if take(node) {
				reputableWanted--
			}
		}

		// When no new node has been found at all, fill the whole request
		// with reputable nodes instead.
		if newReceived == 0 && newWanted > 0 {
			reputableWanted += newWanted
			newWanted = 0
		}

		if reputableWanted <= 0 && newWanted <= 0 {
			break
		}
	}
	return nodes, nil
}
// selectStorageNodesOnce performs a single round of node selection.
//
// It builds one sub-query for new nodes and one for reputable nodes (see
// nodeSelectionCondition for the filters), each distinct on last_net and
// limited to newNodeCount and reputableNodeCount respectively, combines them
// with unionAll and runs the result as one statement. A sub-query whose limit
// is zero is left out by unionAll.
//
// excludedIDs and excludedNetworks are passed on to nodeSelectionCondition.
// The returned slices hold the scanned nodes split by the "is new" flag that
// each sub-query selects as its last column. Database errors are wrapped with
// Error.
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
	defer mon.Task()(&ctx)(&err)

	newNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, true)
	if err != nil {
		return nil, nil, err
	}
	reputableNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, false)
	if err != nil {
		return nil, nil, err
	}

	var reputableNodeQuery, newNodeQuery partialQuery

	// Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
	// Later, the flag allows us to distinguish if a node is new when scanning the db rows.
	reputableNodeQuery = partialQuery{
		selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, false FROM nodes`,
		condition: reputableNodesCondition,
		distinct:  true,
		limit:     reputableNodeCount,
		orderBy:   "last_net",
	}
	newNodeQuery = partialQuery{
		selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, true FROM nodes`,
		condition: newNodesCondition,
		distinct:  true,
		limit:     newNodeCount,
		orderBy:   "last_net",
	}

	query := unionAll(newNodeQuery, reputableNodeQuery)
	query.query = cache.db.impl.WrapAsOfSystemInterval(query.query, criteria.AsOfSystemInterval)

	rows, err := cache.db.Query(ctx, cache.db.Rebind(query.query), query.args...)
	if err != nil {
		return nil, nil, Error.Wrap(err)
	}
	// Closing the rows may itself fail; fold that into the returned error.
	defer func() { err = errs.Combine(err, rows.Close()) }()

	for rows.Next() {
		var node overlay.SelectedNode
		node.Address = &pb.NodeAddress{}
		var lastIPPort sql.NullString
		var isNew bool
		var noise noiseScanner

		// last_ip_port is scanned through a sql.NullString rather than
		// directly into node.LastIPPort: scanning a NULL column into a plain
		// string makes Scan return an error, which would abort the whole
		// selection because of a single node without a recorded address.
		err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort, &noise.Proto, &noise.PublicKey, &isNew)
		if err != nil {
			return nil, nil, Error.Wrap(err)
		}
		if lastIPPort.Valid {
			node.LastIPPort = lastIPPort.String
		}
		node.Address.NoiseInfo = noise.Convert()

		if isNew {
			newNodes = append(newNodes, &node)
		} else {
			reputableNodes = append(reputableNodes, &node)
		}

		// Stop reading as soon as both quotas are filled.
		if len(newNodes) >= newNodeCount && len(reputableNodes) >= reputableNodeCount {
			break
		}
	}
	return reputableNodes, newNodes, Error.Wrap(rows.Err())
}
// nodeSelectionCondition builds the WHERE condition (SQL text plus bound
// arguments) used to select storage nodes.
//
// Every condition requires a node that is not disqualified, not suspended,
// not exiting, of storage type, with at least criteria.FreeDisk free disk, a
// successful contact within criteria.OnlineWindow and a non-empty last_net.
// isNewNodeQuery chooses between nodes without a vetted_at timestamp (true)
// and nodes with one (false). A minimum version filter and the ID / network
// exclusions are added only when the corresponding input is non-empty.
//
// An error is returned when criteria.MinimumVersion is set but cannot be
// parsed. ctx is accepted but not used by this function.
func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
	var where conditions

	// Status columns that must be unset for a node to be eligible.
	for _, clause := range []string{
		`disqualified IS NULL`,
		`unknown_audit_suspended IS NULL`,
		`offline_suspended IS NULL`,
		`exit_initiated_at IS NULL`,
	} {
		where.add(clause)
	}

	where.add(`type = ?`, int(pb.NodeType_STORAGE))
	where.add(`free_disk >= ?`, criteria.FreeDisk)
	onlineSince := time.Now().UTC().Add(-criteria.OnlineWindow)
	where.add(`last_contact_success > ?`, onlineSince)

	vetting := `vetted_at is NOT NULL`
	if isNewNodeQuery {
		vetting = `vetted_at IS NULL`
	}
	where.add(vetting)

	if minVersion := criteria.MinimumVersion; minVersion != "" {
		semver, err := version.NewSemVer(minVersion)
		if err != nil {
			return condition{}, Error.New("invalid node selection criteria version: %v", err)
		}
		// Compares (major, minor, patch) against the minimum, component by
		// component, and additionally requires the release column to be true.
		where.add(
			`(major > ? OR (major = ? AND (minor > ? OR (minor = ? AND patch >= ?)))) AND release`,
			semver.Major, semver.Major, semver.Minor, semver.Minor, semver.Patch,
		)
	}

	if len(excludedIDs) != 0 {
		where.add(`NOT (id = ANY(?::bytea[]))`, pgutil.NodeIDArray(excludedIDs))
	}
	if len(excludedNetworks) != 0 {
		where.add(`NOT (last_net = ANY(?::text[]))`, pgutil.TextArray(excludedNetworks))
	}

	where.add(`last_net <> ''`)

	return where.combine(), nil
}
// partialQuery describes one randomized SELECT that can be rendered on its
// own or combined with others by unionAll.
//
// With distinct=false it renders as
//
//	$selection WHERE $condition ORDER BY $orderBy, RANDOM() LIMIT $limit
//
// With distinct=true the selection is wrapped in an outer query that
// shuffles the rows again before the limit is applied
//
//	SELECT * FROM ($selection WHERE $condition ORDER BY $orderBy, RANDOM()) filtered ORDER BY RANDOM() LIMIT $limit
//
// When orderBy is empty the inner ordering is just RANDOM().
type partialQuery struct {
	selection string
	condition condition
	distinct  bool
	orderBy   string
	limit     int
}

// isEmpty reports whether the query is certain to produce no rows, which is
// the case when its limit is zero.
func (p partialQuery) isEmpty() bool {
	return p.limit == 0
}

// asQuery renders the partial query into a complete SELECT statement. The
// returned arguments are the condition's arguments followed by the limit.
func (p partialQuery) asQuery() query {
	var sb strings.Builder

	// A distinct selection has to be ordered by its distinct column first,
	// so it becomes a sub-select whose rows are shuffled again outside.
	if p.distinct {
		sb.WriteString("SELECT * FROM (")
	}

	sb.WriteString(p.selection)
	sb.WriteString(" WHERE ")
	sb.WriteString(p.condition.query)

	if p.orderBy == "" {
		sb.WriteString(" ORDER BY RANDOM() ")
	} else {
		fmt.Fprintf(&sb, " ORDER BY %s, RANDOM() ", p.orderBy)
	}

	if p.distinct {
		sb.WriteString(") filtered ORDER BY RANDOM() LIMIT ?")
	} else {
		sb.WriteString(" LIMIT ? ")
	}

	var args []interface{}
	args = append(args, p.condition.args...)
	args = append(args, p.limit)

	return query{query: sb.String(), args: args}
}
// unionAll renders every non-empty partial query and joins the results with
// UNION ALL, concatenating their arguments in the same order.
//
// Partials for which isEmpty reports true are skipped. If none remain the
// zero query is returned; if exactly one remains it is returned as rendered,
// without surrounding parentheses.
func unionAll(partials ...partialQuery) query {
	var (
		parts []string
		args  []interface{}
	)
	for _, partial := range partials {
		if !partial.isEmpty() {
			sub := partial.asQuery()
			parts = append(parts, sub.query)
			args = append(args, sub.args...)
		}
	}

	switch len(parts) {
	case 0:
		return query{}
	case 1:
		return query{query: parts[0], args: args}
	default:
		return query{
			query: "(" + strings.Join(parts, ") UNION ALL (") + ")",
			args:  args,
		}
	}
}
// condition is a fragment of SQL boolean expression together with the
// arguments for the placeholders it contains.
type condition struct {
	query string
	args  []interface{}
}

// conditions is an ordered list of condition fragments that are meant to be
// AND-ed together by combine.
type conditions []condition

// add appends the fragment q and its arguments to the list.
func (cs *conditions) add(q string, args ...interface{}) {
	fragment := condition{query: q, args: args}
	*cs = append(*cs, fragment)
}

// combine joins all fragments with " AND " into a single condition whose
// query is padded with one space on each side. The arguments of the
// fragments are concatenated in list order.
func (cs conditions) combine() condition {
	var (
		joined strings.Builder
		args   []interface{}
	)

	joined.WriteString(" ")
	for i, c := range cs {
		if i > 0 {
			joined.WriteString(" AND ")
		}
		joined.WriteString(c.query)
		args = append(args, c.args...)
	}
	joined.WriteString(" ")

	return condition{query: joined.String(), args: args}
}
// query is a complete SQL statement together with the arguments for its
// placeholders, as produced by partialQuery.asQuery and unionAll.
type query struct {
// query is the SQL text; placeholders are written as `?`.
query string
// args holds the placeholder values in the order they appear in query.
args []interface{}
}