satellite/nodeselection: expand SelectedNode

In the repair subsystem, it is necessary to acquire several extra
properties of nodes that hold pieces or may be selected to hold
pieces. We need to know whether a node is 'online' (the
definition of "online" may change somewhat depending on the situation),
whether a node is in the process of graceful exit, and whether a node is
suspended. We can't simply filter out nodes with any of these properties,
because sometimes we need to know properties of nodes even when the
nodes are suspended or gracefully exiting.

I thought the best way to do this was to add fields to SelectedNode,
and (to avoid any confusion) arrange for the added fields to be
populated wherever SelectedNode is returned, whether or not the new
fields are necessarily going to be used.

If people would rather I use a separate type from SelectedNode, I can do
that instead.

Change-Id: I7804a0e0a15cfe34c8ff47a227175ea5862a4ebc
This commit is contained in:
paul cannon 2023-07-31 17:18:32 -05:00 committed by Michał Niewrzał
parent 0b02a48a10
commit 6e46a926bb
4 changed files with 56 additions and 39 deletions

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/zeebo/errs"
"golang.org/x/exp/slices"
"storj.io/common/pb"
"storj.io/common/storj"
@@ -42,28 +43,16 @@ type SelectedNode struct {
LastNet string
LastIPPort string
CountryCode location.CountryCode
Exiting bool
Suspended bool
Online bool
Tags NodeTags
}
// Clone returns a deep clone of the selected node.
func (node *SelectedNode) Clone() *SelectedNode {
copy := pb.CopyNode(&pb.Node{Id: node.ID, Address: node.Address})
tags := make([]NodeTag, len(node.Tags))
for ix, tag := range node.Tags {
tags[ix] = NodeTag{
NodeID: tag.NodeID,
SignedAt: tag.SignedAt,
Signer: tag.Signer,
Name: tag.Name,
Value: tag.Value,
}
}
return &SelectedNode{
ID: copy.Id,
Address: copy.Address,
LastNet: node.LastNet,
LastIPPort: node.LastIPPort,
CountryCode: node.CountryCode,
Tags: tags,
}
newNode := *node
newNode.Address = pb.CopyNodeAddress(node.Address)
newNode.Tags = slices.Clone(node.Tags)
return &newNode
}

View File

@@ -104,14 +104,14 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
// Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
// Later, the flag allows us to distinguish if a node is new when scanning the db rows.
reputableNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, false FROM nodes`,
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, country_code, noise_proto, noise_public_key, debounce_limit, features, false FROM nodes`,
condition: reputableNodesCondition,
distinct: true,
limit: reputableNodeCount,
orderBy: "last_net",
}
newNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, true FROM nodes`,
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, country_code, noise_proto, noise_public_key, debounce_limit, features, true FROM nodes`,
condition: newNodesCondition,
distinct: true,
limit: newNodeCount,
@@ -134,7 +134,7 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
var isNew bool
var noise noiseScanner
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort, &noise.Proto, &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &isNew)
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort, &node.CountryCode, &noise.Proto, &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &isNew)
if err != nil {
return nil, nil, err
}
@@ -149,6 +149,9 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
} else {
reputableNodes = append(reputableNodes, &node)
}
// node.Exiting and node.Suspended are always false here, as we filter those nodes out unconditionally in nodeSelectionCondition.
// By similar logic, all nodes selected here are "online" in terms of the specified criteria (specifically, OnlineWindow).
node.Online = true
if len(newNodes) >= newNodeCount && len(reputableNodes) >= reputableNodeCount {
break

View File

@@ -118,6 +118,10 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele
node.LastIPPort = lastIPPort.String
}
node.Address.NoiseInfo = noise.Convert()
// node.Exiting and node.Suspended are always false here, as we filter them out unconditionally above.
// By similar logic, all nodes selected here are "online" in terms of the specified selectionCfg
// (specifically, OnlineWindow).
node.Online = true
if vettedAt == nil {
newNodes = append(newNodes, &node)
@@ -155,7 +159,8 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on
defer mon.Task()(&ctx)(&err)
query := `
SELECT id, address, last_net, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, country_code
SELECT id, address, last_net, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, country_code,
exit_initiated_at IS NOT NULL AS exiting, (unknown_audit_suspended IS NOT NULL OR offline_suspended IS NOT NULL) AS suspended
FROM nodes
` + cache.db.impl.AsOfSystemInterval(asOfConfig.Interval()) + `
WHERE disqualified IS NULL
@@ -180,7 +185,8 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on
var lastIPPort sql.NullString
var noise noiseScanner
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &noise.Proto,
&noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode)
&noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode,
&node.Exiting, &node.Suspended)
if err != nil {
return nil, err
}
@@ -188,6 +194,8 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on
node.LastIPPort = lastIPPort.String
}
node.Address.NoiseInfo = noise.Convert()
// we consider all nodes in the download selection cache to be online.
node.Online = true
nodes = append(nodes, &node)
}
return nodes, Error.Wrap(rows.Err())
@@ -410,7 +418,7 @@ func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.Node
}
err = withRows(cache.db.Query(ctx, `
SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $2 as online
SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $2 as online, exit_initiated_at IS NOT NULL as exiting
FROM nodes
`+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+`
WHERE id = any($1::bytea[])
@@ -421,12 +429,12 @@ func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.Node
`, pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
))(func(rows tagsql.Rows) error {
for rows.Next() {
node, onlineNode, err := scanSelectedNode(rows)
node, err := scanSelectedNode(rows)
if err != nil {
return err
}
if onlineNode {
if node.Online {
online = append(online, node)
} else {
offline = append(offline, node)
@@ -458,7 +466,7 @@ func (cache *overlaycache) reliable(ctx context.Context, onlineWindow, asOfSyste
defer mon.Task()(&ctx)(&err)
err = withRows(cache.db.Query(ctx, `
SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $1 as online
SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $1 as online, exit_initiated_at IS NOT NULL as exiting
FROM nodes
`+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+`
WHERE disqualified IS NULL
@@ -468,12 +476,12 @@ func (cache *overlaycache) reliable(ctx context.Context, onlineWindow, asOfSyste
`, time.Now().Add(-onlineWindow),
))(func(rows tagsql.Rows) error {
for rows.Next() {
node, onlineNode, err := scanSelectedNode(rows)
node, err := scanSelectedNode(rows)
if err != nil {
return err
}
if onlineNode {
if node.Online {
online = append(online, node)
} else {
offline = append(offline, node)
@@ -485,20 +493,23 @@ func (cache *overlaycache) reliable(ctx context.Context, onlineWindow, asOfSyste
return online, offline, Error.Wrap(err)
}
func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, bool, error) {
var onlineNode bool
func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) {
var node nodeselection.SelectedNode
node.Address = &pb.NodeAddress{}
var lastIPPort sql.NullString
err := rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &node.CountryCode, &onlineNode)
err := rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &node.CountryCode, &node.Online, &node.Exiting)
if err != nil {
return nodeselection.SelectedNode{}, false, err
return nodeselection.SelectedNode{}, err
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
return node, onlineNode, nil
// node.Suspended is always false for now, but that will change in a coming
// commit; we need to include suspended nodes in return values from
// Reliable() and KnownReliable() (in case they are in excluded countries,
// are out of placement, are on clumped IP networks, etc).
return node, nil
}
// UpdateReputation updates the DB columns for any of the reputation fields in ReputationUpdate.
@@ -1407,13 +1418,16 @@ func (cache *overlaycache) TestNodeCountryCode(ctx context.Context, nodeID storj
}
// IterateAllContactedNodes will call cb on all known nodes (used in restore trash contexts).
//
// Note that this may include disqualified nodes!
func (cache *overlaycache) IterateAllContactedNodes(ctx context.Context, cb func(context.Context, *nodeselection.SelectedNode) error) (err error) {
defer mon.Task()(&ctx)(&err)
var rows tagsql.Rows
// 2018-04-06 is the date of the first storj v3 commit.
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key, debounce_limit, features
SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, country_code,
exit_initiated_at IS NOT NULL AS exiting, (unknown_audit_suspended IS NOT NULL OR offline_suspended IS NOT NULL) AS suspended
FROM nodes
WHERE last_contact_success >= timestamp '2018-04-06'
`))
@@ -1428,7 +1442,7 @@ func (cache *overlaycache) IterateAllContactedNodes(ctx context.Context, cb func
var lastIPPort sql.NullString
var noise noiseScanner
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort, &noise.Proto, &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features)
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort, &noise.Proto, &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode, &node.Exiting, &node.Suspended)
if err != nil {
return Error.Wrap(err)
}

View File

@@ -570,6 +570,13 @@ func TestOverlayCache_Reliable(t *testing.T) {
} {
online, offline, err := cache.Reliable(ctx, tc.OnlineWindow, 0)
require.NoError(t, err)
// make the .Online attribute match expectations for this OnlineWindow
for n := range tc.Online {
tc.Online[n].Online = true
}
for n := range tc.Offline {
tc.Offline[n].Online = false
}
require.ElementsMatch(t, tc.Online, online, "#%d", i)
require.ElementsMatch(t, tc.Offline, offline, "#%d", i)
}
@@ -580,13 +587,14 @@ func TestOverlayCache_Reliable(t *testing.T) {
})
}
func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, online, disqalified, auditSuspended, offlineSuspended, exited bool) nodeselection.SelectedNode {
func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, online, disqualified, auditSuspended, offlineSuspended, exited bool) nodeselection.SelectedNode {
selectedNode := nodeselection.SelectedNode{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: address},
LastNet: lastIPPort,
LastIPPort: lastIPPort,
CountryCode: location.Poland,
Online: online,
}
checkInInfo := overlay.NodeCheckInInfo{
@@ -607,17 +615,19 @@ func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastI
err := cache.UpdateCheckIn(ctx, checkInInfo, timestamp, overlay.NodeSelectionConfig{})
require.NoError(t, err)
if disqalified {
if disqualified {
_, err := cache.DisqualifyNode(ctx, selectedNode.ID, time.Now(), overlay.DisqualificationReasonAuditFailure)
require.NoError(t, err)
}
if auditSuspended {
require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, selectedNode.ID, time.Now()))
selectedNode.Suspended = true
}
if offlineSuspended {
require.NoError(t, cache.TestSuspendNodeOffline(ctx, selectedNode.ID, time.Now()))
selectedNode.Suspended = true
}
if exited {
@@ -629,6 +639,7 @@ func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastI
ExitFinishedAt: now,
ExitSuccess: true,
})
selectedNode.Exiting = true
require.NoError(t, err)
}