satellite/nodeselection: read email + wallet from db to SelectedNode

NodeSelection struct is used to make decisions (and assertions) related to node selection.

Usually we don't use email and wallet for placement decision, as they are not reliable.

But there are cases, when we know that the email address is confirmed. Also, it can be used for upper-bound estimations (if same wallet is used for too many pieces in a segment, it's a sign of a risk, even if not all the risks can be detected with this approach, as one owner can use different wallets).

Long story short: let's put wallet and email to the SelectedNode.

Change-Id: I922185e3769d43eb7762b8d60d88ecd3d50991bb
This commit is contained in:
Márton Elek 2023-10-03 11:07:51 +02:00 committed by Storj Robot
parent a06735c1b6
commit 6304046e80
3 changed files with 41 additions and 14 deletions

View File

@ -40,6 +40,8 @@ func (n NodeTags) FindBySignerAndName(signer storj.NodeID, name string) (NodeTag
type SelectedNode struct { type SelectedNode struct {
ID storj.NodeID ID storj.NodeID
Address *pb.NodeAddress Address *pb.NodeAddress
Email string
Wallet string
LastNet string LastNet string
LastIPPort string LastIPPort string
CountryCode location.CountryCode CountryCode location.CountryCode

View File

@ -64,7 +64,7 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
query := ` query := `
SELECT id, address, last_net, last_ip_port, vetted_at, country_code, noise_proto, noise_public_key, debounce_limit, features, country_code SELECT id, address, email, wallet, last_net, last_ip_port, vetted_at, country_code, noise_proto, noise_public_key, debounce_limit, features, country_code
FROM nodes FROM nodes
` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.Interval()) + ` ` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.Interval()) + `
WHERE disqualified IS NULL WHERE disqualified IS NULL
@ -106,10 +106,10 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele
for rows.Next() { for rows.Next() {
var node nodeselection.SelectedNode var node nodeselection.SelectedNode
node.Address = &pb.NodeAddress{} node.Address = &pb.NodeAddress{}
var lastIPPort sql.NullString var lastIPPort, email, wallet sql.NullString
var vettedAt *time.Time var vettedAt *time.Time
var noise noiseScanner var noise noiseScanner
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &vettedAt, &node.CountryCode, &noise.Proto, err = rows.Scan(&node.ID, &node.Address.Address, &email, &wallet, &node.LastNet, &lastIPPort, &vettedAt, &node.CountryCode, &noise.Proto,
&noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode) &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -118,6 +118,8 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele
node.LastIPPort = lastIPPort.String node.LastIPPort = lastIPPort.String
} }
node.Address.NoiseInfo = noise.Convert() node.Address.NoiseInfo = noise.Convert()
node.Email = email.String
node.Wallet = wallet.String
// node.Exiting and node.Suspended are always false here, as we filter them out unconditionally above. // node.Exiting and node.Suspended are always false here, as we filter them out unconditionally above.
// By similar logic, all nodes selected here are "online" in terms of the specified selectionCfg // By similar logic, all nodes selected here are "online" in terms of the specified selectionCfg
// (specifically, OnlineWindow). // (specifically, OnlineWindow).
@ -159,7 +161,7 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
query := ` query := `
SELECT id, address, last_net, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, country_code, SELECT id, address, email, wallet, last_net, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, country_code,
exit_initiated_at IS NOT NULL AS exiting, (unknown_audit_suspended IS NOT NULL OR offline_suspended IS NOT NULL) AS suspended exit_initiated_at IS NOT NULL AS exiting, (unknown_audit_suspended IS NOT NULL OR offline_suspended IS NOT NULL) AS suspended
FROM nodes FROM nodes
` + cache.db.impl.AsOfSystemInterval(asOfConfig.Interval()) + ` ` + cache.db.impl.AsOfSystemInterval(asOfConfig.Interval()) + `
@ -182,9 +184,9 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on
for rows.Next() { for rows.Next() {
var node nodeselection.SelectedNode var node nodeselection.SelectedNode
node.Address = &pb.NodeAddress{} node.Address = &pb.NodeAddress{}
var lastIPPort sql.NullString var lastIPPort, email, wallet sql.NullString
var noise noiseScanner var noise noiseScanner
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &noise.Proto, err = rows.Scan(&node.ID, &node.Address.Address, &node.Email, &node.Wallet, &node.LastNet, &lastIPPort, &noise.Proto,
&noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode, &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode,
&node.Exiting, &node.Suspended) &node.Exiting, &node.Suspended)
if err != nil { if err != nil {
@ -194,6 +196,8 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on
node.LastIPPort = lastIPPort.String node.LastIPPort = lastIPPort.String
} }
node.Address.NoiseInfo = noise.Convert() node.Address.NoiseInfo = noise.Convert()
node.Email = email.String
node.Wallet = wallet.String
// we consider all nodes in the download selection cache to be online. // we consider all nodes in the download selection cache to be online.
node.Online = true node.Online = true
nodes = append(nodes, &node) nodes = append(nodes, &node)
@ -407,7 +411,7 @@ func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs storj.NodeIDLis
} }
err = withRows(cache.db.Query(ctx, ` err = withRows(cache.db.Query(ctx, `
SELECT n.id, n.address, n.last_net, n.last_ip_port, n.country_code, SELECT n.id, n.address, n.email, n.wallet, n.last_net, n.last_ip_port, n.country_code,
n.last_contact_success > $2 AS online, n.last_contact_success > $2 AS online,
(n.offline_suspended IS NOT NULL OR n.unknown_audit_suspended IS NOT NULL) AS suspended, (n.offline_suspended IS NOT NULL OR n.unknown_audit_suspended IS NOT NULL) AS suspended,
n.disqualified IS NOT NULL AS disqualified, n.disqualified IS NOT NULL AS disqualified,
@ -456,7 +460,7 @@ func (cache *overlaycache) GetParticipatingNodes(ctx context.Context, onlineWind
var nodes []*nodeselection.SelectedNode var nodes []*nodeselection.SelectedNode
err = withRows(cache.db.Query(ctx, ` err = withRows(cache.db.Query(ctx, `
SELECT id, address, last_net, last_ip_port, country_code, SELECT id, address, email, wallet, last_net, last_ip_port, country_code,
last_contact_success > $1 AS online, last_contact_success > $1 AS online,
(offline_suspended IS NOT NULL OR unknown_audit_suspended IS NOT NULL) AS suspended, (offline_suspended IS NOT NULL OR unknown_audit_suspended IS NOT NULL) AS suspended,
false AS disqualified, false AS disqualified,
@ -519,9 +523,9 @@ func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) {
var node nodeselection.SelectedNode var node nodeselection.SelectedNode
node.Address = &pb.NodeAddress{} node.Address = &pb.NodeAddress{}
var nodeID nullNodeID var nodeID nullNodeID
var address, lastNet, lastIPPort, countryCode sql.NullString var address, email, wallet, lastNet, lastIPPort, countryCode sql.NullString
var online, suspended, disqualified, exiting, exited sql.NullBool var online, suspended, disqualified, exiting, exited sql.NullBool
err := rows.Scan(&nodeID, &address, &lastNet, &lastIPPort, &countryCode, err := rows.Scan(&nodeID, &address, &email, &wallet, &lastNet, &lastIPPort, &countryCode,
&online, &suspended, &disqualified, &exiting, &exited) &online, &suspended, &disqualified, &exiting, &exited)
if err != nil { if err != nil {
return nodeselection.SelectedNode{}, err return nodeselection.SelectedNode{}, err
@ -539,6 +543,8 @@ func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) {
} }
node.ID = nodeID.NodeID node.ID = nodeID.NodeID
node.Address.Address = address.String node.Address.Address = address.String
node.Email = email.String
node.Wallet = wallet.String
node.LastNet = lastNet.String node.LastNet = lastNet.String
if lastIPPort.Valid { if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String node.LastIPPort = lastIPPort.String
@ -556,7 +562,7 @@ func scanSelectedNodeWithTag(rows tagsql.Rows) (nodeselection.SelectedNode, node
var node nodeselection.SelectedNode var node nodeselection.SelectedNode
node.Address = &pb.NodeAddress{} node.Address = &pb.NodeAddress{}
var nodeID nullNodeID var nodeID nullNodeID
var address, lastNet, lastIPPort, countryCode sql.NullString var address, wallet, email, lastNet, lastIPPort, countryCode sql.NullString
var online, suspended, disqualified, exiting, exited sql.NullBool var online, suspended, disqualified, exiting, exited sql.NullBool
var tag nodeselection.NodeTag var tag nodeselection.NodeTag
@ -564,7 +570,7 @@ func scanSelectedNodeWithTag(rows tagsql.Rows) (nodeselection.SelectedNode, node
signedAt := &time.Time{} signedAt := &time.Time{}
signer := nullNodeID{} signer := nullNodeID{}
err := rows.Scan(&nodeID, &address, &lastNet, &lastIPPort, &countryCode, err := rows.Scan(&nodeID, &address, &email, &wallet, &lastNet, &lastIPPort, &countryCode,
&online, &suspended, &disqualified, &exiting, &exited, &name, &tag.Value, &signedAt, &signer) &online, &suspended, &disqualified, &exiting, &exited, &name, &tag.Value, &signedAt, &signer)
if err != nil { if err != nil {
return nodeselection.SelectedNode{}, nodeselection.NodeTag{}, err return nodeselection.SelectedNode{}, nodeselection.NodeTag{}, err
@ -582,6 +588,8 @@ func scanSelectedNodeWithTag(rows tagsql.Rows) (nodeselection.SelectedNode, node
} }
node.ID = nodeID.NodeID node.ID = nodeID.NodeID
node.Address.Address = address.String node.Address.Address = address.String
node.Email = email.String
node.Wallet = wallet.String
node.LastNet = lastNet.String node.LastNet = lastNet.String
if lastIPPort.Valid { if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String node.LastIPPort = lastIPPort.String

View File

@ -453,6 +453,8 @@ func TestOverlayCache_SelectAllStorageNodesDownloadUpload(t *testing.T) {
type nodeDisposition struct { type nodeDisposition struct {
id storj.NodeID id storj.NodeID
address string address string
email string
wallet string
lastIPPort string lastIPPort string
offlineInterval time.Duration offlineInterval time.Duration
countryCode location.CountryCode countryCode location.CountryCode
@ -568,8 +570,12 @@ func TestOverlayCache_GetNodes(t *testing.T) {
for i := range allNodes { for i := range allNodes {
allIDs[i] = allNodes[i].id allIDs[i] = allNodes[i].id
} }
_, err = cache.GetNodes(ctx, allIDs, 1*time.Hour, -1*time.Microsecond)
selection, err := cache.GetNodes(ctx, allIDs, 1*time.Hour, -1*time.Microsecond)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x9b7488BF8b6A4FF21D610e3dd202723f705cD1C0", selection[0].Wallet)
require.Equal(t, "test@storj.io", selection[0].Email)
}) })
} }
@ -625,8 +631,11 @@ func TestOverlayCache_GetParticipatingNodes(t *testing.T) {
} }
// test as of system time // test as of system time
_, err := cache.GetParticipatingNodes(ctx, 1*time.Hour, -1*time.Microsecond) selection, err := cache.GetParticipatingNodes(ctx, 1*time.Hour, -1*time.Microsecond)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x9b7488BF8b6A4FF21D610e3dd202723f705cD1C0", selection[0].Wallet)
require.Equal(t, "test@storj.io", selection[0].Email)
}) })
} }
@ -637,6 +646,8 @@ func nodeDispositionToSelectedNode(disp nodeDisposition, onlineWindow time.Durat
return nodeselection.SelectedNode{ return nodeselection.SelectedNode{
ID: disp.id, ID: disp.id,
Address: &pb.NodeAddress{Address: disp.address}, Address: &pb.NodeAddress{Address: disp.address},
Email: disp.email,
Wallet: disp.wallet,
LastNet: disp.lastIPPort, LastNet: disp.lastIPPort,
LastIPPort: disp.lastIPPort, LastIPPort: disp.lastIPPort,
CountryCode: disp.countryCode, CountryCode: disp.countryCode,
@ -658,6 +669,8 @@ func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastI
offlineSuspended: offlineSuspended, offlineSuspended: offlineSuspended,
exiting: exiting, exiting: exiting,
exited: exited, exited: exited,
email: "test@storj.io",
wallet: "0x9b7488BF8b6A4FF21D610e3dd202723f705cD1C0",
} }
checkInInfo := overlay.NodeCheckInInfo{ checkInInfo := overlay.NodeCheckInInfo{
@ -668,6 +681,10 @@ func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastI
LastNet: disp.lastIPPort, LastNet: disp.lastIPPort,
CountryCode: disp.countryCode, CountryCode: disp.countryCode,
Version: &pb.NodeVersion{Version: "v0.0.0"}, Version: &pb.NodeVersion{Version: "v0.0.0"},
Operator: &pb.NodeOperator{
Email: disp.email,
Wallet: disp.wallet,
},
} }
timestamp := time.Now().UTC().Add(-disp.offlineInterval) timestamp := time.Now().UTC().Add(-disp.offlineInterval)