bbdb351e5e
What: Use the github.com/jackc/pgx postgresql driver in place of github.com/lib/pq. Why: github.com/lib/pq has some problems with error handling and context cancellations (i.e. it might even issue queries or DML statements more than once! see https://github.com/lib/pq/issues/939). The github.com/jackx/pgx library appears not to have these problems, and also appears to be better engineered and implemented (in particular, it doesn't use "exceptions by panic"). It should also give us some performance improvements in some cases, and even more so if we can use it directly instead of going through the database/sql layer. Change-Id: Ia696d220f340a097dee9550a312d37de14ed2044
330 lines
8.8 KiB
Go
330 lines
8.8 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
"storj.io/private/version"
|
|
"storj.io/storj/private/dbutil/pgutil"
|
|
"storj.io/storj/satellite/overlay"
|
|
)
|
|
|
|
func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNodes, newNodeCount int, criteria *overlay.NodeCriteria) (nodes []*overlay.SelectedNode, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
if totalNeededNodes == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
if newNodeCount > totalNeededNodes {
|
|
return nil, Error.New("requested new node count can't exceed requested total node count")
|
|
}
|
|
|
|
needNewNodes := newNodeCount
|
|
needReputableNodes := totalNeededNodes - needNewNodes
|
|
|
|
receivedNewNodes := 0
|
|
receivedNodeNetworks := make(map[string]struct{})
|
|
|
|
excludedIDs := append([]storj.NodeID{}, criteria.ExcludedIDs...)
|
|
excludedNetworks := append([]string{}, criteria.ExcludedNetworks...)
|
|
|
|
for i := 0; i < 3; i++ {
|
|
reputableNodes, newNodes, err := cache.selectStorageNodesOnce(ctx, needReputableNodes, needNewNodes, criteria, excludedIDs, excludedNetworks)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, node := range newNodes {
|
|
// checking for last net collision among reputable and new nodes since we can't check within the query
|
|
if _, ok := receivedNodeNetworks[node.LastNet]; ok {
|
|
continue
|
|
}
|
|
|
|
excludedIDs = append(excludedIDs, node.ID)
|
|
excludedNetworks = append(excludedNetworks, node.LastNet)
|
|
|
|
nodes = append(nodes, node)
|
|
needNewNodes--
|
|
receivedNewNodes++
|
|
|
|
if criteria.DistinctIP {
|
|
receivedNodeNetworks[node.LastNet] = struct{}{}
|
|
}
|
|
}
|
|
for _, node := range reputableNodes {
|
|
if _, ok := receivedNodeNetworks[node.LastNet]; ok {
|
|
continue
|
|
}
|
|
|
|
excludedIDs = append(excludedIDs, node.ID)
|
|
excludedNetworks = append(excludedNetworks, node.LastNet)
|
|
|
|
nodes = append(nodes, node)
|
|
needReputableNodes--
|
|
|
|
if criteria.DistinctIP {
|
|
receivedNodeNetworks[node.LastNet] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// when we did not find new nodes, then return all as reputable
|
|
if needNewNodes > 0 && receivedNewNodes == 0 {
|
|
needReputableNodes += needNewNodes
|
|
needNewNodes = 0
|
|
}
|
|
|
|
if needReputableNodes <= 0 && needNewNodes <= 0 {
|
|
break
|
|
}
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
newNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, true)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
reputableNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, false)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
var reputableNodeQuery, newNodeQuery partialQuery
|
|
|
|
// Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
|
|
// Later, the flag allows us to distinguish if a node is new when scanning the db rows.
|
|
if !criteria.DistinctIP {
|
|
reputableNodeQuery = partialQuery{
|
|
selection: `SELECT last_net, id, address, last_ip_port, false FROM nodes`,
|
|
condition: reputableNodesCondition,
|
|
limit: reputableNodeCount,
|
|
}
|
|
newNodeQuery = partialQuery{
|
|
selection: `SELECT last_net, id, address, last_ip_port, true FROM nodes`,
|
|
condition: newNodesCondition,
|
|
limit: newNodeCount,
|
|
}
|
|
} else {
|
|
reputableNodeQuery = partialQuery{
|
|
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, false FROM nodes`,
|
|
condition: reputableNodesCondition,
|
|
distinct: true,
|
|
limit: reputableNodeCount,
|
|
orderBy: "last_net",
|
|
}
|
|
newNodeQuery = partialQuery{
|
|
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, true FROM nodes`,
|
|
condition: newNodesCondition,
|
|
distinct: true,
|
|
limit: newNodeCount,
|
|
orderBy: "last_net",
|
|
}
|
|
}
|
|
|
|
query := unionAll(newNodeQuery, reputableNodeQuery)
|
|
|
|
rows, err := cache.db.Query(ctx, cache.db.Rebind(query.query), query.args...)
|
|
if err != nil {
|
|
return nil, nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
for rows.Next() {
|
|
var node overlay.SelectedNode
|
|
node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
|
|
var lastIPPort sql.NullString
|
|
var isNew bool
|
|
|
|
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort, &isNew)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if lastIPPort.Valid {
|
|
node.LastIPPort = lastIPPort.String
|
|
}
|
|
|
|
if isNew {
|
|
newNodes = append(newNodes, &node)
|
|
} else {
|
|
reputableNodes = append(reputableNodes, &node)
|
|
}
|
|
|
|
if len(newNodes) >= newNodeCount && len(reputableNodes) >= reputableNodeCount {
|
|
break
|
|
}
|
|
}
|
|
return reputableNodes, newNodes, Error.Wrap(rows.Err())
|
|
}
|
|
|
|
// nodeSelectionCondition creates a condition with arguments that corresponds to the arguments.
|
|
func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
|
|
var conds conditions
|
|
conds.add(`disqualified IS NULL`)
|
|
conds.add(`unknown_audit_suspended IS NULL`)
|
|
conds.add(`exit_initiated_at IS NULL`)
|
|
|
|
conds.add(`type = ?`, int(pb.NodeType_STORAGE))
|
|
conds.add(`free_disk >= ?`, criteria.FreeDisk)
|
|
conds.add(`last_contact_success > ?`, time.Now().Add(-criteria.OnlineWindow))
|
|
|
|
if isNewNodeQuery {
|
|
conds.add(
|
|
`(total_audit_count < ? OR total_uptime_count < ?)`,
|
|
criteria.AuditCount, criteria.UptimeCount,
|
|
)
|
|
} else {
|
|
conds.add(
|
|
`total_audit_count >= ? AND total_uptime_count >= ?`,
|
|
criteria.AuditCount, criteria.UptimeCount,
|
|
)
|
|
}
|
|
|
|
if criteria.MinimumVersion != "" {
|
|
v, err := version.NewSemVer(criteria.MinimumVersion)
|
|
if err != nil {
|
|
return condition{}, Error.New("invalid node selection criteria version: %v", err)
|
|
}
|
|
conds.add(
|
|
`(major > ? OR (major = ? AND (minor > ? OR (minor = ? AND patch >= ?)))) AND release`,
|
|
v.Major, v.Major, v.Minor, v.Minor, v.Patch,
|
|
)
|
|
}
|
|
|
|
if len(excludedIDs) > 0 {
|
|
conds.add(
|
|
`not (id = any(?::bytea[]))`,
|
|
pgutil.NodeIDArray(excludedIDs),
|
|
)
|
|
}
|
|
if criteria.DistinctIP {
|
|
if len(excludedNetworks) > 0 {
|
|
conds.add(
|
|
`not (last_net = any(?::text[]))`,
|
|
pgutil.StringArray(excludedNetworks),
|
|
)
|
|
}
|
|
conds.add(`last_net <> ''`)
|
|
}
|
|
return conds.combine(), nil
|
|
}
|
|
|
|
// partialQuery corresponds to a query
|
|
//
|
|
// distinct=false
|
|
//
|
|
// $selection WHERE $condition ORDER BY $orderBy, RANDOM() LIMIT $limit
|
|
//
|
|
// distinct=true
|
|
//
|
|
// SELECT * FROM ($selection WHERE $condition ORDER BY $orderBy, RANDOM()) filtered ORDER BY RANDOM() LIMIT $limit
|
|
//
|
|
type partialQuery struct {
|
|
selection string
|
|
condition condition
|
|
distinct bool
|
|
orderBy string
|
|
limit int
|
|
}
|
|
|
|
// isEmpty returns whether the result for the query is definitely empty.
|
|
func (partial partialQuery) isEmpty() bool {
|
|
return partial.limit == 0
|
|
}
|
|
|
|
// asQuery combines partialQuery parameters into a single select query.
|
|
func (partial partialQuery) asQuery() query {
|
|
var q strings.Builder
|
|
var args []interface{}
|
|
|
|
if partial.distinct {
|
|
// For distinct queries we need to redo randomized ordering.
|
|
fmt.Fprintf(&q, "SELECT * FROM (")
|
|
}
|
|
|
|
fmt.Fprint(&q, partial.selection, " WHERE ", partial.condition.query)
|
|
args = append(args, partial.condition.args...)
|
|
|
|
if partial.orderBy != "" {
|
|
fmt.Fprintf(&q, " ORDER BY %s, RANDOM() ", partial.orderBy)
|
|
} else {
|
|
fmt.Fprint(&q, " ORDER BY RANDOM() ")
|
|
}
|
|
|
|
if !partial.distinct {
|
|
fmt.Fprint(&q, " LIMIT ? ")
|
|
args = append(args, partial.limit)
|
|
} else {
|
|
fmt.Fprint(&q, ") filtered ORDER BY RANDOM() LIMIT ?")
|
|
args = append(args, partial.limit)
|
|
}
|
|
|
|
return query{query: q.String(), args: args}
|
|
}
|
|
|
|
// unionAll combines multiple partial queries into a single query.
|
|
func unionAll(partials ...partialQuery) query {
|
|
var queries []string
|
|
var args []interface{}
|
|
for _, partial := range partials {
|
|
if partial.isEmpty() {
|
|
continue
|
|
}
|
|
|
|
q := partial.asQuery()
|
|
queries = append(queries, q.query)
|
|
args = append(args, q.args...)
|
|
}
|
|
|
|
if len(queries) == 0 {
|
|
return query{}
|
|
}
|
|
if len(queries) == 1 {
|
|
return query{query: queries[0], args: args}
|
|
}
|
|
|
|
return query{
|
|
query: "(" + strings.Join(queries, ") UNION ALL (") + ")",
|
|
args: args,
|
|
}
|
|
}
|
|
|
|
type condition struct {
|
|
query string
|
|
args []interface{}
|
|
}
|
|
|
|
type conditions []condition
|
|
|
|
func (conds *conditions) add(q string, args ...interface{}) {
|
|
*conds = append(*conds, condition{query: q, args: args})
|
|
}
|
|
|
|
func (conds conditions) combine() condition {
|
|
var qs []string
|
|
var args []interface{}
|
|
for _, c := range conds {
|
|
qs = append(qs, c.query)
|
|
args = append(args, c.args...)
|
|
}
|
|
return condition{query: " " + strings.Join(qs, " AND ") + " ", args: args}
|
|
}
|
|
|
|
type query struct {
|
|
query string
|
|
args []interface{}
|
|
}
|