From 6304046e80cb71f72053a4ea200ab12e984e7708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Tue, 3 Oct 2023 11:07:51 +0200 Subject: [PATCH] satellite/nodeselection: read email + wallet from db to SelectedNode NodeSelection struct is used to make decisions (and assertions) related to node selection. Usually we don't use email and wallet for placement decision, as they are not reliable. But there are cases, when we know that the email address is confirmed. Also, it can be used for upper-bound estimations (if same wallet is used for too many pieces in a segment, it's a sign of a risk, even if not all the risks can be detected with this approach, as one owner can use different wallets). Long story short: let's put wallet and email to the SelectedNode. Change-Id: I922185e3769d43eb7762b8d60d88ecd3d50991bb --- satellite/nodeselection/node.go | 2 ++ satellite/satellitedb/overlaycache.go | 32 ++++++++++++++-------- satellite/satellitedb/overlaycache_test.go | 21 ++++++++++++-- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/satellite/nodeselection/node.go b/satellite/nodeselection/node.go index ab3a3a4cb..7d53897d1 100644 --- a/satellite/nodeselection/node.go +++ b/satellite/nodeselection/node.go @@ -40,6 +40,8 @@ func (n NodeTags) FindBySignerAndName(signer storj.NodeID, name string) (NodeTag type SelectedNode struct { ID storj.NodeID Address *pb.NodeAddress + Email string + Wallet string LastNet string LastIPPort string CountryCode location.CountryCode diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index e4a0fcfea..8e489c7f3 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -64,7 +64,7 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele defer mon.Task()(&ctx)(&err) query := ` - SELECT id, address, last_net, last_ip_port, vetted_at, country_code, noise_proto, noise_public_key, debounce_limit, features, country_code + SELECT id, address, email, wallet, last_net, last_ip_port, vetted_at, country_code, noise_proto, noise_public_key, debounce_limit, features, country_code FROM nodes ` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.Interval()) + ` WHERE disqualified IS NULL @@ -106,10 +106,10 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele for rows.Next() { var node nodeselection.SelectedNode node.Address = &pb.NodeAddress{} - var lastIPPort sql.NullString + var lastIPPort, email, wallet sql.NullString var vettedAt *time.Time var noise noiseScanner - err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &vettedAt, &node.CountryCode, &noise.Proto, + err = rows.Scan(&node.ID, &node.Address.Address, &email, &wallet, &node.LastNet, &lastIPPort, &vettedAt, &node.CountryCode, &noise.Proto, &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode) if err != nil { return nil, nil, err @@ -118,6 +118,8 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele node.LastIPPort = lastIPPort.String } node.Address.NoiseInfo = noise.Convert() + node.Email = email.String + node.Wallet = wallet.String // node.Exiting and node.Suspended are always false here, as we filter them out unconditionally above. // By similar logic, all nodes selected here are "online" in terms of the specified selectionCfg // (specifically, OnlineWindow). @@ -159,7 +161,7 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on defer mon.Task()(&ctx)(&err) query := ` - SELECT id, address, last_net, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, country_code, + SELECT id, address, email, wallet, last_net, last_ip_port, noise_proto, noise_public_key, debounce_limit, features, country_code, exit_initiated_at IS NOT NULL AS exiting, (unknown_audit_suspended IS NOT NULL OR offline_suspended IS NOT NULL) AS suspended FROM nodes ` + cache.db.impl.AsOfSystemInterval(asOfConfig.Interval()) + ` @@ -182,9 +184,9 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on for rows.Next() { var node nodeselection.SelectedNode node.Address = &pb.NodeAddress{} - var lastIPPort sql.NullString + var lastIPPort, email, wallet sql.NullString var noise noiseScanner - err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &noise.Proto, + err = rows.Scan(&node.ID, &node.Address.Address, &node.Email, &node.Wallet, &node.LastNet, &lastIPPort, &noise.Proto, &noise.PublicKey, &node.Address.DebounceLimit, &node.Address.Features, &node.CountryCode, &node.Exiting, &node.Suspended) if err != nil { @@ -194,6 +196,8 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on node.LastIPPort = lastIPPort.String } node.Address.NoiseInfo = noise.Convert() + node.Email = email.String + node.Wallet = wallet.String // we consider all nodes in the download selection cache to be online. node.Online = true nodes = append(nodes, &node) @@ -407,7 +411,7 @@ func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs storj.NodeIDLis } err = withRows(cache.db.Query(ctx, ` - SELECT n.id, n.address, n.last_net, n.last_ip_port, n.country_code, + SELECT n.id, n.address, n.email, n.wallet, n.last_net, n.last_ip_port, n.country_code, n.last_contact_success > $2 AS online, (n.offline_suspended IS NOT NULL OR n.unknown_audit_suspended IS NOT NULL) AS suspended, n.disqualified IS NOT NULL AS disqualified, @@ -456,7 +460,7 @@ func (cache *overlaycache) GetParticipatingNodes(ctx context.Context, onlineWind var nodes []*nodeselection.SelectedNode err = withRows(cache.db.Query(ctx, ` - SELECT id, address, last_net, last_ip_port, country_code, + SELECT id, address, email, wallet, last_net, last_ip_port, country_code, last_contact_success > $1 AS online, (offline_suspended IS NOT NULL OR unknown_audit_suspended IS NOT NULL) AS suspended, false AS disqualified, @@ -519,9 +523,9 @@ func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) { var node nodeselection.SelectedNode node.Address = &pb.NodeAddress{} var nodeID nullNodeID - var address, lastNet, lastIPPort, countryCode sql.NullString + var address, email, wallet, lastNet, lastIPPort, countryCode sql.NullString var online, suspended, disqualified, exiting, exited sql.NullBool - err := rows.Scan(&nodeID, &address, &lastNet, &lastIPPort, &countryCode, + err := rows.Scan(&nodeID, &address, &email, &wallet, &lastNet, &lastIPPort, &countryCode, &online, &suspended, &disqualified, &exiting, &exited) if err != nil { return nodeselection.SelectedNode{}, err @@ -539,6 +543,8 @@ func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) { } node.ID = nodeID.NodeID node.Address.Address = address.String + node.Email = email.String + node.Wallet = wallet.String node.LastNet = lastNet.String if lastIPPort.Valid { node.LastIPPort = lastIPPort.String @@ -556,7 +562,7 @@ func scanSelectedNodeWithTag(rows tagsql.Rows) (nodeselection.SelectedNode, node var node nodeselection.SelectedNode node.Address = &pb.NodeAddress{} var nodeID nullNodeID - var address, lastNet, lastIPPort, countryCode sql.NullString + var address, wallet, email, lastNet, lastIPPort, countryCode sql.NullString var online, suspended, disqualified, exiting, exited sql.NullBool var tag nodeselection.NodeTag @@ -564,7 +570,7 @@ func scanSelectedNodeWithTag(rows tagsql.Rows) (nodeselection.SelectedNode, node signedAt := &time.Time{} signer := nullNodeID{} - err := rows.Scan(&nodeID, &address, &lastNet, &lastIPPort, &countryCode, + err := rows.Scan(&nodeID, &address, &email, &wallet, &lastNet, &lastIPPort, &countryCode, &online, &suspended, &disqualified, &exiting, &exited, &name, &tag.Value, &signedAt, &signer) if err != nil { return nodeselection.SelectedNode{}, nodeselection.NodeTag{}, err @@ -582,6 +588,8 @@ func scanSelectedNodeWithTag(rows tagsql.Rows) (nodeselection.SelectedNode, node } node.ID = nodeID.NodeID node.Address.Address = address.String + node.Email = email.String + node.Wallet = wallet.String node.LastNet = lastNet.String if lastIPPort.Valid { node.LastIPPort = lastIPPort.String diff --git a/satellite/satellitedb/overlaycache_test.go b/satellite/satellitedb/overlaycache_test.go index 7ae99dc26..a8fcbe328 100644 --- a/satellite/satellitedb/overlaycache_test.go +++ b/satellite/satellitedb/overlaycache_test.go @@ -453,6 +453,8 @@ func TestOverlayCache_SelectAllStorageNodesDownloadUpload(t *testing.T) { type nodeDisposition struct { id storj.NodeID address string + email string + wallet string lastIPPort string offlineInterval time.Duration countryCode location.CountryCode @@ -568,8 +570,12 @@ func TestOverlayCache_GetNodes(t *testing.T) { for i := range allNodes { allIDs[i] = allNodes[i].id } - _, err = cache.GetNodes(ctx, allIDs, 1*time.Hour, -1*time.Microsecond) + + selection, err := cache.GetNodes(ctx, allIDs, 1*time.Hour, -1*time.Microsecond) require.NoError(t, err) + + require.Equal(t, "0x9b7488BF8b6A4FF21D610e3dd202723f705cD1C0", selection[0].Wallet) + require.Equal(t, "test@storj.io", selection[0].Email) }) } @@ -625,8 +631,11 @@ func TestOverlayCache_GetParticipatingNodes(t *testing.T) { } // test as of system time - _, err := cache.GetParticipatingNodes(ctx, 1*time.Hour, -1*time.Microsecond) + selection, err := cache.GetParticipatingNodes(ctx, 1*time.Hour, -1*time.Microsecond) require.NoError(t, err) + + require.Equal(t, "0x9b7488BF8b6A4FF21D610e3dd202723f705cD1C0", selection[0].Wallet) + require.Equal(t, "test@storj.io", selection[0].Email) }) } @@ -637,6 +646,8 @@ func nodeDispositionToSelectedNode(disp nodeDisposition, onlineWindow time.Durat return nodeselection.SelectedNode{ ID: disp.id, Address: &pb.NodeAddress{Address: disp.address}, + Email: disp.email, + Wallet: disp.wallet, LastNet: disp.lastIPPort, LastIPPort: disp.lastIPPort, CountryCode: disp.countryCode, @@ -658,6 +669,8 @@ func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastI offlineSuspended: offlineSuspended, exiting: exiting, exited: exited, + email: "test@storj.io", + wallet: "0x9b7488BF8b6A4FF21D610e3dd202723f705cD1C0", } checkInInfo := overlay.NodeCheckInInfo{ @@ -668,6 +681,10 @@ func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastI LastNet: disp.lastIPPort, CountryCode: disp.countryCode, Version: &pb.NodeVersion{Version: "v0.0.0"}, + Operator: &pb.NodeOperator{ + Email: disp.email, + Wallet: disp.wallet, + }, } timestamp := time.Now().UTC().Add(-disp.offlineInterval)