satellite/satellitedb: node selection code cleanup

Reduce the number of non-methods to reduce funcs in the namespace also
combine a func to slightly condense the code more.

Change-Id: Ifbe728eb8c8ca4c981df648decd259c2097b6b40
This commit is contained in:
Egon Elbre 2020-04-09 18:54:12 +03:00
parent cf80b3caf3
commit ccf4f9ed2d

View File

@ -34,11 +34,8 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
receivedNewNodes := 0
receivedNodeNetworks := make(map[string]struct{})
var excludedIDs []storj.NodeID
excludedIDs = append(excludedIDs, criteria.ExcludedIDs...)
var excludedNetworks []string
excludedNetworks = append(excludedNetworks, criteria.ExcludedNetworks...)
excludedIDs := append([]storj.NodeID{}, criteria.ExcludedIDs...)
excludedNetworks := append([]string{}, criteria.ExcludedNetworks...)
for i := 0; i < 3; i++ {
reputableNodes, newNodes, err := cache.selectStorageNodesOnce(ctx, needReputableNodes, needNewNodes, criteria, excludedIDs, excludedNetworks)
@ -51,8 +48,10 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
if _, ok := receivedNodeNetworks[node.LastNet]; ok {
continue
}
excludedIDs = append(excludedIDs, node.ID)
excludedNetworks = append(excludedNetworks, node.LastNet)
nodes = append(nodes, node)
needNewNodes--
receivedNewNodes++
@ -65,8 +64,10 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
if _, ok := receivedNodeNetworks[node.LastNet]; ok {
continue
}
excludedIDs = append(excludedIDs, node.ID)
excludedNetworks = append(excludedNetworks, node.LastNet)
nodes = append(nodes, node)
needReputableNodes--
@ -75,6 +76,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
}
}
// when we did not find new nodes, then return all as reputable
if needNewNodes > 0 && receivedNewNodes == 0 {
needReputableNodes += needNewNodes
needNewNodes = 0
@ -90,39 +92,48 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
newNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, true)
if err != nil {
return nil, nil, err
}
reputableNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, false)
if err != nil {
return nil, nil, err
}
var reputableNodeQuery, newNodeQuery partialQuery
// Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
// Later, the flag allows us to distinguish if a node is new when scanning the db rows.
newNodeSelection := `SELECT last_net, id, address, last_ip_port, true FROM nodes`
reputableNodeSelection := `SELECT last_net, id, address, last_ip_port, false FROM nodes`
if criteria.DistinctIP {
newNodeSelection = `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, true FROM nodes`
reputableNodeSelection = `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, false FROM nodes`
if !criteria.DistinctIP {
reputableNodeQuery = partialQuery{
selection: `SELECT last_net, id, address, last_ip_port, false FROM nodes`,
condition: reputableNodesCondition,
limit: reputableNodeCount,
}
newNodeQuery = partialQuery{
selection: `SELECT last_net, id, address, last_ip_port, true FROM nodes`,
condition: newNodesCondition,
limit: newNodeCount,
}
} else {
reputableNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, false FROM nodes`,
condition: reputableNodesCondition,
distinct: true,
limit: reputableNodeCount,
orderBy: "last_net",
}
newNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, true FROM nodes`,
condition: newNodesCondition,
distinct: true,
limit: newNodeCount,
orderBy: "last_net",
}
}
newNodesCondition, err := buildConditions(ctx, criteria, excludedIDs, excludedNetworks, true)
if err != nil {
return nil, nil, err
}
reputableNodesCondition, err := buildConditions(ctx, criteria, excludedIDs, excludedNetworks, false)
if err != nil {
return nil, nil, err
}
var newNodeQuery, reputableNodeQuery query
newNodeQuery = assemble(partialQuery{selection: newNodeSelection, condition: newNodesCondition, limit: newNodeCount})
if criteria.DistinctIP {
newNodeQuery = assemble(partialQuery{selection: newNodeSelection, condition: newNodesCondition, orderBy: "last_net", distinct: true, limit: newNodeCount})
newNodeQuery = selectRandomFrom(newNodeQuery, "filteredcandidates", newNodeCount)
}
reputableNodeQuery = assemble(partialQuery{selection: reputableNodeSelection, condition: reputableNodesCondition, limit: reputableNodeCount})
if criteria.DistinctIP {
reputableNodeQuery = assemble(partialQuery{selection: reputableNodeSelection, condition: reputableNodesCondition, orderBy: "last_net", distinct: true, limit: reputableNodeCount})
reputableNodeQuery = selectRandomFrom(reputableNodeQuery, "filteredcandidates", reputableNodeCount)
}
query := union(newNodeQuery, reputableNodeQuery)
query := unionAll(newNodeQuery, reputableNodeQuery)
rows, err := cache.db.Query(ctx, cache.db.Rebind(query.query), query.args...)
if err != nil {
@ -135,18 +146,22 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
var lastIPPort sql.NullString
var isNew bool
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort, &isNew)
if err != nil {
return nil, nil, err
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
if isNew {
newNodes = append(newNodes, &node)
} else {
reputableNodes = append(reputableNodes, &node)
}
if len(newNodes) >= newNodeCount && len(reputableNodes) >= reputableNodeCount {
break
}
@ -154,26 +169,28 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
return reputableNodes, newNodes, Error.Wrap(rows.Err())
}
func buildConditions(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
// nodeSelectionCondition creates a condition with arguments that corresponds to the arguments.
func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
var conds conditions
conds.add(`disqualified IS NULL`)
conds.add(`suspended IS NULL`)
conds.add(`exit_initiated_at IS NULL`)
conds.add(`type = ?`, int(pb.NodeType_STORAGE))
conds.add(`free_disk >= ?`, criteria.FreeDisk)
conds.add(`last_contact_success > ?`, time.Now().Add(-criteria.OnlineWindow))
if isNewNodeQuery {
conds.add(
"(total_audit_count < ? OR total_uptime_count < ?)",
`(total_audit_count < ? OR total_uptime_count < ?)`,
criteria.AuditCount, criteria.UptimeCount,
)
} else {
conds.add(
"total_audit_count >= ? AND total_uptime_count >= ?",
`total_audit_count >= ? AND total_uptime_count >= ?`,
criteria.AuditCount, criteria.UptimeCount,
)
}
conds.add(`disqualified is null
AND suspended is null
AND exit_initiated_at is null`)
conds.add("type = ?", int(pb.NodeType_STORAGE))
conds.add("free_disk >= ?", criteria.FreeDisk)
conds.add("last_contact_success > ?", time.Now().Add(-criteria.OnlineWindow))
if criteria.MinimumVersion != "" {
v, err := version.NewSemVer(criteria.MinimumVersion)
@ -181,43 +198,63 @@ func buildConditions(ctx context.Context, criteria *overlay.NodeCriteria, exclud
return condition{}, Error.New("invalid node selection criteria version: %v", err)
}
conds.add(
"(major > ? OR (major = ? AND (minor > ? OR (minor = ? AND patch >= ?)))) AND release ",
`(major > ? OR (major = ? AND (minor > ? OR (minor = ? AND patch >= ?)))) AND release`,
v.Major, v.Major, v.Minor, v.Minor, v.Patch,
)
}
cond := combine(conds...)
if len(excludedIDs) > 0 {
excludedIDsQuery := " AND id NOT IN (?" + strings.Repeat(", ?", len(excludedIDs)-1) + ") "
cond.addQuery(excludedIDsQuery)
for _, id := range excludedIDs {
cond.addArg(id)
}
conds.addWithIDs(
"id NOT IN (?"+strings.Repeat(", ?", len(excludedIDs)-1)+")",
excludedIDs...,
)
}
if criteria.DistinctIP {
if len(excludedNetworks) > 0 {
excludedNetworksQuery := " AND last_net NOT IN (?" + strings.Repeat(", ?", len(excludedNetworks)-1) + ") "
cond.addQuery(excludedNetworksQuery)
for _, subnet := range excludedNetworks {
cond.addArg(subnet)
}
conds.addWithStrings(
`last_net NOT IN (?`+strings.Repeat(`, ?`, len(excludedNetworks)-1)+`)`,
excludedNetworks...,
)
}
cond.addQuery(" AND last_net <> ''")
conds.add(`last_net <> ''`)
}
return cond, nil
return conds.combine(), nil
}
func assemble(partial partialQuery) query {
// partialQuery corresponds to a query
//
// distinct=false
//
// $selection WHERE $condition ORDER BY $orderBy, RANDOM() LIMIT $limit
//
// distinct=true
//
// SELECT * FROM ($selection WHERE $condition ORDER BY $orderBy, RANDOM()) filtered ORDER BY RANDOM() LIMIT $limit
//
type partialQuery struct {
selection string
condition condition
distinct bool
orderBy string
limit int
}
// isEmpty returns whether the result for the query is definitely empty.
func (partial partialQuery) isEmpty() bool {
return partial.limit == 0
}
// asQuery combines partialQuery parameters into a single select query.
func (partial partialQuery) asQuery() query {
var q strings.Builder
var args []interface{}
if partial.limit == 0 {
return query{}
if partial.distinct {
// For distinct queries we need to redo randomized ordering.
fmt.Fprintf(&q, "SELECT * FROM (")
}
fmt.Fprint(&q, partial.selection)
fmt.Fprintf(&q, " WHERE %s ", partial.condition.query)
fmt.Fprint(&q, partial.selection, " WHERE ", partial.condition.query)
args = append(args, partial.condition.args...)
if partial.orderBy != "" {
@ -227,100 +264,78 @@ func assemble(partial partialQuery) query {
}
if !partial.distinct {
fmt.Fprintf(&q, " LIMIT ? ")
fmt.Fprint(&q, " LIMIT ? ")
args = append(args, partial.limit)
} else {
fmt.Fprint(&q, ") filtered ORDER BY RANDOM() LIMIT ?")
args = append(args, partial.limit)
}
return query{
query: q.String(),
args: args,
}
return query{query: q.String(), args: args}
}
func selectRandomFrom(q query, alias string, limit int) query {
if limit == 0 {
// unionAll combines multiple partial queries into a single query.
func unionAll(partials ...partialQuery) query {
var queries []string
var args []interface{}
for _, partial := range partials {
if partial.isEmpty() {
continue
}
q := partial.asQuery()
queries = append(queries, q.query)
args = append(args, q.args...)
}
if len(queries) == 0 {
return query{}
}
queryString := fmt.Sprintf("SELECT * FROM (%s) %s ORDER BY RANDOM() LIMIT ?", q.query, alias)
newArgs := make([]interface{}, len(q.args)+1)
copy(newArgs[:len(q.args)], q.args)
newArgs[len(q.args)] = limit
if len(queries) == 1 {
return query{query: queries[0], args: args}
}
return query{
query: queryString,
args: newArgs,
}
}
func isEmpty(q query) bool {
if q.query == "" && len(q.args) == 0 {
return true
}
return false
}
func union(q1, q2 query) query {
if isEmpty(q1) {
return q2
}
if isEmpty(q2) {
return q1
}
finalQuery := "(" + q1.query + ") UNION ALL (" + q2.query + ")"
var args []interface{}
args = append(args, q1.args...)
args = append(args, q2.args...)
return query{
query: finalQuery,
query: "(" + strings.Join(queries, ") UNION ALL (") + ")",
args: args,
}
}
type partialQuery struct {
selection string
condition condition
orderBy string
limit int
distinct bool
}
type condition struct {
query string
args []interface{}
}
func (cond *condition) addQuery(q string) {
cond.query += q
}
func (cond *condition) addArg(arg interface{}) {
cond.args = append(cond.args, arg)
}
type conditions []condition
func (xs *conditions) add(q string, args ...interface{}) {
*xs = append(*xs, cond(q, args...))
func (conds *conditions) add(q string, args ...interface{}) {
*conds = append(*conds, condition{query: q, args: args})
}
func cond(query string, args ...interface{}) condition {
return condition{
query: query,
args: args,
func (conds *conditions) addWithStrings(q string, args ...string) {
var values []interface{}
for _, arg := range args {
values = append(values, arg)
}
*conds = append(*conds, condition{query: q, args: values})
}
func combine(conditions ...condition) condition {
func (conds *conditions) addWithIDs(q string, args ...storj.NodeID) {
var values []interface{}
for _, arg := range args {
values = append(values, arg)
}
*conds = append(*conds, condition{query: q, args: values})
}
func (conds conditions) combine() condition {
var qs []string
var args []interface{}
for _, c := range conditions {
for _, c := range conds {
qs = append(qs, c.query)
args = append(args, c.args...)
}
return cond(strings.Join(qs, " AND "), args...)
return condition{query: " " + strings.Join(qs, " AND ") + " ", args: args}
}
type query struct {