From 686faeedbd4181404921efeb76521c58da100f40 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Mon, 2 Jan 2023 11:10:47 -0500 Subject: [PATCH] satellite/overlay: return noise info with selected nodes we have two more fields in the database (noise_proto and noise_public_key) that now need to go into pb.NodeAddress when returning AddressedOrderLimits. the only real complication is making sure type conversions between database types and NodeURLs and so on don't lose this new pb.NodeAddress field (NoiseInfo). otherwise this is a relatively straightforward commit Change-Id: I45b59d7b2d3ae21c2e6eb95497f07cd388d454b3 --- satellite/orders/service.go | 69 +++++++++++--------------- satellite/orders/signer.go | 12 ++--- satellite/orders/signer_test.go | 13 +++-- satellite/overlay/service.go | 7 ++- satellite/overlay/service_test.go | 2 - satellite/overlay/uploadselection.go | 27 +++++++--- satellite/satellitedb/nodeselection.go | 12 +++-- satellite/satellitedb/overlaycache.go | 49 +++++++++++++----- 8 files changed, 111 insertions(+), 80 deletions(-) diff --git a/satellite/orders/service.go b/satellite/orders/service.go index 3ef583d3f..1342089c1 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -153,15 +153,7 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas continue } - address := node.Address.Address - if node.LastIPPort != "" { - address = node.LastIPPort - } - - _, err := signer.Sign(ctx, storj.NodeURL{ - ID: piece.StorageNode, - Address: address, - }, int32(piece.Number)) + _, err := signer.Sign(ctx, resolveStorageNode_Selected(node, true), int32(piece.Number)) if err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -235,8 +227,7 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas } for pieceNum, node := range nodes { - address := storageNodeAddress(node) - _, err := signer.Sign(ctx, storj.NodeURL{ID: node.ID, Address: address}, int32(pieceNum)) + _, err := signer.Sign(ctx, resolveStorageNode_Selected(node, true), int32(pieceNum)) if err != nil { return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -269,7 +260,7 @@ func (service *Service) ReplacePutOrderLimits(ctx context.Context, rootPieceID s if err != nil { return nil, ErrSigner.Wrap(err) } - newAddressedLimits[pieceNumber].StorageNodeAddress.Address = storageNodeAddress(nodes[i]) + newAddressedLimits[pieceNumber].StorageNodeAddress = resolveStorageNode_Selected(nodes[i], true).Address } return newAddressedLimits, nil @@ -310,13 +301,9 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment meta continue } - address := node.Address.Address cachedNodesInfo[piece.StorageNode] = *node - limit, err := signer.Sign(ctx, storj.NodeURL{ - ID: piece.StorageNode, - Address: address, - }, int32(piece.Number)) + limit, err := signer.Sign(ctx, resolveStorageNode_Reputation(node), int32(piece.Number)) if err != nil { return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err) } @@ -387,10 +374,7 @@ func (service *Service) createAuditOrderLimitWithSigner(ctx context.Context, nod return nil, storj.PiecePrivateKey{}, nodeInfo, overlay.ErrNodeOffline.New("%v", nodeID) } - orderLimit, err := signer.Sign(ctx, storj.NodeURL{ - ID: nodeID, - Address: node.Address.Address, - }, int32(pieceNum)) + orderLimit, err := signer.Sign(ctx, resolveStorageNode(&node.Node, node.LastIPPort, false), int32(pieceNum)) if err != nil { return nil, storj.PiecePrivateKey{}, nodeInfo, Error.Wrap(err) } @@ -443,10 +427,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m cachedNodesInfo[piece.StorageNode] = *node - limit, err := signer.Sign(ctx, storj.NodeURL{ - ID: piece.StorageNode, - Address: node.Address.Address, - }, int32(piece.Number)) + limit, err := signer.Sign(ctx, resolveStorageNode_Reputation(node), int32(piece.Number)) if err != nil { return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err) } @@ -512,10 +493,7 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m return nil, storj.PiecePrivateKey{}, Error.New("piece num greater than total pieces: %d >= %d", pieceNum, totalPieces) } - limit, err := signer.Sign(ctx, storj.NodeURL{ - ID: node.ID, - Address: node.Address.Address, - }, pieceNum) + limit, err := signer.Sign(ctx, resolveStorageNode_Selected(node, false), pieceNum) if err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -553,12 +531,7 @@ func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, buc return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } - address := node.Address.Address - if node.LastIPPort != "" { - address = node.LastIPPort - } - nodeURL := storj.NodeURL{ID: nodeID, Address: address} - limit, err = signer.Sign(ctx, nodeURL, pieceNum) + limit, err = signer.Sign(ctx, resolveStorageNode(&node.Node, node.LastIPPort, true), pieceNum) if err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -605,10 +578,24 @@ func (service *Service) DecryptOrderMetadata(ctx context.Context, order *pb.Orde return key.DecryptMetadata(order.SerialNumber, order.EncryptedMetadata) } -func storageNodeAddress(node *overlay.SelectedNode) string { - address := node.Address.Address - if node.LastIPPort != "" { - address = node.LastIPPort - } - return address +func resolveStorageNode_Selected(node *overlay.SelectedNode, resolveDNS bool) *pb.Node { + return resolveStorageNode(&pb.Node{ + Id: node.ID, + Address: node.Address, + }, node.LastIPPort, resolveDNS) +} + +func resolveStorageNode_Reputation(node *overlay.NodeReputation) *pb.Node { + return resolveStorageNode(&pb.Node{ + Id: node.ID, + Address: node.Address, + }, node.LastIPPort, false) +} + +func resolveStorageNode(node *pb.Node, lastIPPort string, resolveDNS bool) *pb.Node { + if resolveDNS && lastIPPort != "" { + node = pb.CopyNode(node) // we mutate + node.Address.Address = lastIPPort + } + return node } diff --git a/satellite/orders/signer.go b/satellite/orders/signer.go index 75aa1b768..25629889f 100644 --- a/satellite/orders/signer.go +++ b/satellite/orders/signer.go @@ -140,7 +140,7 @@ func NewSignerGracefulExit(service *Service, rootPieceID storj.PieceID, orderCre } // Sign signs an order limit for the specified node. -func (signer *Signer) Sign(ctx context.Context, node storj.NodeURL, pieceNum int32) (_ *pb.AddressedOrderLimit, err error) { +func (signer *Signer) Sign(ctx context.Context, node *pb.Node, pieceNum int32) (_ *pb.AddressedOrderLimit, err error) { defer mon.Task()(&ctx)(&err) if len(signer.EncryptedMetadata) == 0 { @@ -166,9 +166,9 @@ func (signer *Signer) Sign(ctx context.Context, node storj.NodeURL, pieceNum int SerialNumber: signer.Serial, SatelliteId: signer.Service.satellite.ID(), UplinkPublicKey: signer.PublicKey, - StorageNodeId: node.ID, + StorageNodeId: node.Id, - PieceId: signer.rootPieceIDDeriver.Derive(node.ID, pieceNum), + PieceId: signer.rootPieceIDDeriver.Derive(node.Id, pieceNum), Limit: signer.Limit, Action: signer.Action, @@ -186,10 +186,8 @@ func (signer *Signer) Sign(ctx context.Context, node storj.NodeURL, pieceNum int } addressedLimit := &pb.AddressedOrderLimit{ - Limit: signedLimit, - StorageNodeAddress: &pb.NodeAddress{ - Address: node.Address, - }, + Limit: signedLimit, + StorageNodeAddress: node.Address, } signer.AddressedLimits = append(signer.AddressedLimits, addressedLimit) diff --git a/satellite/orders/signer_test.go b/satellite/orders/signer_test.go index 92dc62f25..09c45e1ed 100644 --- a/satellite/orders/signer_test.go +++ b/satellite/orders/signer_test.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "storj.io/common/memory" + "storj.io/common/pb" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" @@ -56,9 +57,14 @@ func TestSigner_EncryptedMetadata(t *testing.T) { signer, err := orders.NewSignerGet(satellite.Orders.Service, root, orderCreation, 1e6, bucketLocation) require.NoError(t, err) - addressedLimit, err := signer.Sign(ctx, storj.NodeURL{ - ID: storagenode.ID(), - Address: storagenode.Addr(), + addressedLimit, err := signer.Sign(ctx, &pb.Node{ + Id: storagenode.ID(), + Address: &pb.NodeAddress{ + Address: storagenode.Addr(), + NoiseInfo: &pb.NoiseInfo{ + PublicKey: []byte("testpublickey"), + }, + }, }, 1) require.NoError(t, err) @@ -66,6 +72,7 @@ func TestSigner_EncryptedMetadata(t *testing.T) { require.NotEmpty(t, addressedLimit.Limit.EncryptedMetadataKeyId) require.Equal(t, ekeys.Default.ID[:], addressedLimit.Limit.EncryptedMetadataKeyId) + require.Equal(t, addressedLimit.StorageNodeAddress.NoiseInfo.PublicKey, []byte("testpublickey")) metadata, err := ekeys.Default.DecryptMetadata(addressedLimit.Limit.SerialNumber, addressedLimit.Limit.EncryptedMetadata) require.NoError(t, err) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 5fbdd84b8..32a68c681 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -290,11 +290,10 @@ type NodeReputation struct { // Clone returns a deep clone of the selected node. func (node *SelectedNode) Clone() *SelectedNode { + copy := pb.CopyNode(&pb.Node{Id: node.ID, Address: node.Address}) return &SelectedNode{ - ID: node.ID, - Address: &pb.NodeAddress{ - Address: node.Address.Address, - }, + ID: copy.Id, + Address: copy.Address, LastNet: node.LastNet, LastIPPort: node.LastIPPort, } diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 557c0c6a5..4ad69a2ec 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -420,8 +420,6 @@ func TestGetOnlineNodesForGetDelete(t *testing.T) { LastNet: dossier.LastNet, LastIPPort: dossier.LastIPPort, } - // TODO(jt): bring this back in the next patchset - expectedNodes[dossier.Id].Address.NoiseInfo = nil } // add a fake node ID to make sure GetOnlineNodesForGetDelete doesn't error and still returns the expected nodes. nodeIDs[len(planet.StorageNodes)] = testrand.NodeID() diff --git a/satellite/overlay/uploadselection.go b/satellite/overlay/uploadselection.go index 087f4723a..c6462f423 100644 --- a/satellite/overlay/uploadselection.go +++ b/satellite/overlay/uploadselection.go @@ -122,9 +122,18 @@ func (cache *UploadSelectionCache) Size(ctx context.Context) (reputableNodeCount func convNodesToSelectedNodes(nodes []*uploadselection.Node) (xs []*SelectedNode) { for _, n := range nodes { + var noiseInfo *pb.NoiseInfo + if n.NoiseInfo.PublicKey != "" && n.NoiseInfo.Proto != storj.NoiseProto_Unset { + // TODO(jt): storj/common's NoiseInfoConvert or NodeFromNodeURL should + // handle this empty case. + noiseInfo = pb.NoiseInfoConvert(n.NoiseInfo) + } xs = append(xs, &SelectedNode{ - ID: n.ID, - Address: &pb.NodeAddress{Address: n.Address}, + ID: n.ID, + Address: &pb.NodeAddress{ + Address: n.Address, + NoiseInfo: noiseInfo, + }, LastNet: n.LastNet, LastIPPort: n.LastIPPort, CountryCode: n.CountryCode, @@ -135,11 +144,17 @@ func convNodesToSelectedNodes(nodes []*uploadselection.Node) (xs []*SelectedNode func convSelectedNodesToNodes(nodes []*SelectedNode) (xs []*uploadselection.Node) { for _, n := range nodes { + nodeurl := storj.NodeURL{ + ID: n.ID, + Address: n.Address.Address, + } + if n.Address.NoiseInfo != nil { + // TODO(jt): storj/common's (*pb.Node).NodeURL() should + // handle this if statement. + nodeurl.NoiseInfo = n.Address.NoiseInfo.Convert() + } xs = append(xs, &uploadselection.Node{ - NodeURL: storj.NodeURL{ - ID: n.ID, - Address: n.Address.Address, - }, + NodeURL: nodeurl, LastNet: n.LastNet, LastIPPort: n.LastIPPort, CountryCode: n.CountryCode, diff --git a/satellite/satellitedb/nodeselection.go b/satellite/satellitedb/nodeselection.go index 374041a31..35e05c889 100644 --- a/satellite/satellitedb/nodeselection.go +++ b/satellite/satellitedb/nodeselection.go @@ -108,25 +108,25 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable // Later, the flag allows us to distinguish if a node is new when scanning the db rows. if !criteria.DistinctIP { reputableNodeQuery = partialQuery{ - selection: `SELECT last_net, id, address, last_ip_port, false FROM nodes`, + selection: `SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key, false FROM nodes`, condition: reputableNodesCondition, limit: reputableNodeCount, } newNodeQuery = partialQuery{ - selection: `SELECT last_net, id, address, last_ip_port, true FROM nodes`, + selection: `SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key, true FROM nodes`, condition: newNodesCondition, limit: newNodeCount, } } else { reputableNodeQuery = partialQuery{ - selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, false FROM nodes`, + selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, false FROM nodes`, condition: reputableNodesCondition, distinct: true, limit: reputableNodeCount, orderBy: "last_net", } newNodeQuery = partialQuery{ - selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, true FROM nodes`, + selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, noise_proto, noise_public_key, true FROM nodes`, condition: newNodesCondition, distinct: true, limit: newNodeCount, @@ -148,8 +148,9 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable node.Address = &pb.NodeAddress{} var lastIPPort sql.NullString var isNew bool + var noise noiseScanner - err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort, &isNew) + err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort, &noise.Proto, &noise.PublicKey, &isNew) if err != nil { return nil, nil, err } @@ -157,6 +158,7 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable if lastIPPort.Valid { node.LastIPPort = lastIPPort.String } + node.Address.NoiseInfo = noise.Convert() if isNew { newNodes = append(newNodes, &node) diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 4a52939bb..b07bc0762 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -57,7 +57,7 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele defer mon.Task()(&ctx)(&err) query := ` - SELECT id, address, last_net, last_ip_port, vetted_at, country_code + SELECT id, address, last_net, last_ip_port, vetted_at, country_code, noise_proto, noise_public_key FROM nodes ` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.Interval()) + ` WHERE disqualified IS NULL @@ -101,13 +101,15 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele node.Address = &pb.NodeAddress{} var lastIPPort sql.NullString var vettedAt *time.Time - err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &vettedAt, &node.CountryCode) + var noise noiseScanner + err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &vettedAt, &node.CountryCode, &noise.Proto, &noise.PublicKey) if err != nil { return nil, nil, err } if lastIPPort.Valid { node.LastIPPort = lastIPPort.String } + node.Address.NoiseInfo = noise.Convert() if vettedAt == nil { newNodes = append(newNodes, &node) @@ -139,7 +141,7 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on defer mon.Task()(&ctx)(&err) query := ` - SELECT id, address, last_net, last_ip_port + SELECT id, address, last_net, last_ip_port, noise_proto, noise_public_key FROM nodes ` + cache.db.impl.AsOfSystemInterval(asOfConfig.Interval()) + ` WHERE disqualified IS NULL @@ -162,13 +164,15 @@ func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on var node overlay.SelectedNode node.Address = &pb.NodeAddress{} var lastIPPort sql.NullString - err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort) + var noise noiseScanner + err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &noise.Proto, &noise.PublicKey) if err != nil { return nil, err } if lastIPPort.Valid { node.LastIPPort = lastIPPort.String } + node.Address.NoiseInfo = noise.Convert() nodes = append(nodes, &node) } return nodes, Error.Wrap(rows.Err()) @@ -255,7 +259,7 @@ func (cache *overlaycache) getOnlineNodesForGetDelete(ctx context.Context, nodeI var rows tagsql.Rows rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT last_net, id, address, last_ip_port + SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key FROM nodes `+cache.db.impl.AsOfSystemInterval(asOf.Interval())+` WHERE id = any($1::bytea[]) @@ -274,13 +278,15 @@ func (cache *overlaycache) getOnlineNodesForGetDelete(ctx context.Context, nodeI node.Address = &pb.NodeAddress{} var lastIPPort sql.NullString - err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort) + var noise noiseScanner + err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort, &noise.Proto, &noise.PublicKey) if err != nil { return nil, err } if lastIPPort.Valid { node.LastIPPort = lastIPPort.String } + node.Address.NoiseInfo = noise.Convert() nodes[node.ID] = &node } @@ -309,7 +315,7 @@ func (cache *overlaycache) getOnlineNodesForAuditRepair(ctx context.Context, nod var rows tagsql.Rows rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT last_net, id, address, email, last_ip_port, vetted_at, + SELECT last_net, id, address, email, last_ip_port, noise_proto, noise_public_key, vetted_at, unknown_audit_suspended, offline_suspended FROM nodes WHERE id = any($1::bytea[]) @@ -328,13 +334,15 @@ func (cache *overlaycache) getOnlineNodesForAuditRepair(ctx context.Context, nod node.Address = &pb.NodeAddress{} var lastIPPort sql.NullString - err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.Reputation.Email, &lastIPPort, &node.Reputation.VettedAt, &node.Reputation.UnknownAuditSuspended, &node.Reputation.OfflineSuspended) + var noise noiseScanner + err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.Reputation.Email, &lastIPPort, &noise.Proto, &noise.PublicKey, &node.Reputation.VettedAt, &node.Reputation.UnknownAuditSuspended, &node.Reputation.OfflineSuspended) if err != nil { return nil, err } if lastIPPort.Valid { node.LastIPPort = lastIPPort.String } + node.Address.NoiseInfo = noise.Convert() nodes[node.ID] = &node } @@ -599,7 +607,7 @@ func (cache *overlaycache) knownReliable(ctx context.Context, onlineWindow time. // get online nodes rows, err := cache.db.Query(ctx, cache.db.Rebind(` - SELECT id, last_net, last_ip_port, address, protocol + SELECT id, last_net, last_ip_port, address, protocol, noise_proto, noise_public_key FROM nodes WHERE id = any($1::bytea[]) AND disqualified IS NULL @@ -616,7 +624,7 @@ func (cache *overlaycache) knownReliable(ctx context.Context, onlineWindow time. for rows.Next() { row := &dbx.Node{} - err = rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol) + err = rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol, &row.NoiseProto, &row.NoisePublicKey) if err != nil { return nil, err } @@ -1565,7 +1573,7 @@ func (cache *overlaycache) IterateAllContactedNodes(ctx context.Context, cb func var rows tagsql.Rows // 2018-04-06 is the date of the first storj v3 commit. rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT last_net, id, address, last_ip_port + SELECT last_net, id, address, last_ip_port, noise_proto, noise_public_key FROM nodes WHERE last_contact_success >= timestamp '2018-04-06' `)) @@ -1579,13 +1587,15 @@ func (cache *overlaycache) IterateAllContactedNodes(ctx context.Context, cb func node.Address = &pb.NodeAddress{} var lastIPPort sql.NullString - err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort) + var noise noiseScanner + err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort, &noise.Proto, &noise.PublicKey) if err != nil { return Error.Wrap(err) } if lastIPPort.Valid { node.LastIPPort = lastIPPort.String } + node.Address.NoiseInfo = noise.Convert() err = cb(ctx, &node) if err != nil { @@ -1629,3 +1639,18 @@ func (cache *overlaycache) IterateAllNodeDossiers(ctx context.Context, cb func(c func (cache *overlaycache) TestUpdateCheckInDirectUpdate(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, semVer version.SemVer, walletFeatures string) (updated bool, err error) { return cache.updateCheckInDirectUpdate(ctx, node, timestamp, semVer, walletFeatures) } + +type noiseScanner struct { + Proto sql.NullInt32 + PublicKey []byte +} + +func (n *noiseScanner) Convert() *pb.NoiseInfo { + if !n.Proto.Valid || len(n.PublicKey) == 0 { + return nil + } + return &pb.NoiseInfo{ + Proto: pb.NoiseProtocol(n.Proto.Int32), + PublicKey: n.PublicKey, + } +}