diff --git a/satellite/audit/disqualification_test.go b/satellite/audit/disqualification_test.go index 69be6b486..3c4b901d2 100644 --- a/satellite/audit/disqualification_test.go +++ b/satellite/audit/disqualification_test.go @@ -183,8 +183,8 @@ func TestDisqualifiedNodesGetNoUpload(t *testing.T) { assert.Len(t, nodes, 3) for _, node := range nodes { - assert.False(t, isDisqualified(t, ctx, satellitePeer, node.Id)) - assert.NotEqual(t, node.Id, disqualifiedNode) + assert.False(t, isDisqualified(t, ctx, satellitePeer, node.ID)) + assert.NotEqual(t, node.ID, disqualifiedNode) } }) diff --git a/satellite/gracefulexit/endpoint.go b/satellite/gracefulexit/endpoint.go index fa83f1526..389c526e3 100644 --- a/satellite/gracefulexit/endpoint.go +++ b/satellite/gracefulexit/endpoint.go @@ -409,7 +409,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS } newNode := newNodes[0] - endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.Id), + endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.ID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum)) pieceID := remote.RootPieceId.Derive(nodeID, incomplete.PieceNum) @@ -420,7 +420,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS } bucketID := []byte(storj.JoinPaths(parts[0], parts[2])) - limit, privateKey, err := endpoint.orders.CreateGracefulExitPutOrderLimit(ctx, bucketID, newNode.Id, incomplete.PieceNum, remote.RootPieceId, int32(pieceSize)) + limit, privateKey, err := endpoint.orders.CreateGracefulExitPutOrderLimit(ctx, bucketID, newNode.ID, incomplete.PieceNum, remote.RootPieceId, int32(pieceSize)) if err != nil { return Error.Wrap(err) } diff --git a/satellite/orders/service.go b/satellite/orders/service.go index e38a49f93..c6fc16ebd 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -384,7 +384,7 @@ func (service *Service) RandomSampleOfOrderLimits(limits []*pb.AddressedOrderLim } // CreatePutOrderLimits creates the order limits for uploading pieces to nodes. -func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*overlay.NodeDossier, expiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) { +func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*overlay.SelectedNode, expiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) { defer mon.Task()(&ctx)(&err) orderExpiration := time.Now().Add(service.orderExpiration) @@ -408,8 +408,8 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byt SatelliteId: service.satellite.ID(), SatelliteAddress: service.satelliteAddress, UplinkPublicKey: piecePublicKey, - StorageNodeId: node.Id, - PieceId: rootPieceID.Derive(node.Id, pieceNum), + StorageNodeId: node.ID, + PieceId: rootPieceID.Derive(node.ID, pieceNum), Action: pb.PieceAction_PUT, Limit: maxPieceSize, PieceExpiration: expiration, @@ -835,7 +835,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID } // CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes. -func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.NodeDossier) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) { +func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) { defer mon.Task()(&ctx)(&err) orderExpiration := time.Now().Add(service.orderExpiration) @@ -895,8 +895,8 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID SatelliteId: service.satellite.ID(), SatelliteAddress: service.satelliteAddress, UplinkPublicKey: piecePublicKey, - StorageNodeId: node.Id, - PieceId: rootPieceID.Derive(node.Id, pieceNum), + StorageNodeId: node.ID, + PieceId: rootPieceID.Derive(node.ID, pieceNum), Action: pb.PieceAction_PUT_REPAIR, Limit: pieceSize, PieceExpiration: pointer.ExpirationDate, diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go index 3d941c9c2..805ee7b2e 100644 --- a/satellite/overlay/selection_test.go +++ b/satellite/overlay/selection_test.go @@ -38,6 +38,7 @@ func TestMinimumDiskSpace(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { node0 := planet.StorageNodes[0] node0.Contact.Chore.Pause(ctx) + nodeDossier := node0.Local() ident := node0.Identity peer := rpcpeer.Peer{ @@ -360,7 +361,7 @@ func TestNodeSelectionGracefulExit(t *testing.T) { // expect no exiting nodes in selection for _, node := range response { - assert.False(t, exitingNodes[node.Id]) + assert.False(t, exitingNodes[node.ID]) } } }) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 7590ad8d0..cd500e9ea 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -37,9 +37,9 @@ var ErrNotEnoughNodes = errs.Class("not enough nodes") // architecture: Database type DB interface { // SelectStorageNodes looks up nodes based on criteria - SelectStorageNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*NodeDossier, error) + SelectStorageNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*SelectedNode, error) // SelectNewStorageNodes looks up nodes based on new node criteria - SelectNewStorageNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*NodeDossier, error) + SelectNewStorageNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*SelectedNode, error) // Get looks up the node by nodeID Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error) @@ -218,6 +218,14 @@ type NodeLastContact struct { LastContactFailure time.Time } +// SelectedNode is used as a result for creating orders limits. +type SelectedNode struct { + ID storj.NodeID + Address *pb.NodeAddress + LastNet string + LastIPPort string +} + // Service is used to store and handle node information // // architecture: Service @@ -268,13 +276,13 @@ func (service *Service) IsOnline(node *NodeDossier) bool { } // FindStorageNodes searches the overlay network for nodes that meet the provided requirements -func (service *Service) FindStorageNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*NodeDossier, err error) { +func (service *Service) FindStorageNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) { defer mon.Task()(&ctx)(&err) return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node) } // FindStorageNodesWithPreferences searches the overlay network for nodes that meet the provided criteria -func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req FindStorageNodesRequest, preferences *NodeSelectionConfig) (nodes []*NodeDossier, err error) { +func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req FindStorageNodesRequest, preferences *NodeSelectionConfig) (nodes []*SelectedNode, err error) { defer mon.Task()(&ctx)(&err) // TODO: add sanity limits to requested node count @@ -300,7 +308,7 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req newNodeCount = int(float64(reputableNodeCount) * preferences.NewNodeFraction) } - var newNodes []*NodeDossier + var newNodes []*SelectedNode if newNodeCount > 0 { newNodes, err = service.db.SelectNewStorageNodes(ctx, newNodeCount, &NodeCriteria{ FreeDisk: preferences.MinimumDiskSpace.Int64(), @@ -318,7 +326,7 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req // add selected new nodes ID and network to the excluded lists for reputable node selection for _, newNode := range newNodes { - excludedIDs = append(excludedIDs, newNode.Id) + excludedIDs = append(excludedIDs, newNode.ID) if preferences.DistinctIP { excludedNetworks = append(excludedNetworks, newNode.LastNet) } diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 93aaef31f..28741d08d 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -186,7 +186,7 @@ func TestRandomizedSelection(t *testing.T) { // select numNodesToSelect nodes selectIterations times for i := 0; i < selectIterations; i++ { - var nodes []*overlay.NodeDossier + var nodes []*overlay.SelectedNode var err error if i%2 == 0 { @@ -205,7 +205,7 @@ func TestRandomizedSelection(t *testing.T) { require.Len(t, nodes, numNodesToSelect) for _, node := range nodes { - nodeCounts[node.Id]++ + nodeCounts[node.ID]++ } } @@ -629,7 +629,7 @@ func TestSuspendedSelection(t *testing.T) { } } - var nodes []*overlay.NodeDossier + var nodes []*overlay.SelectedNode var err error numNodesToSelect := 10 @@ -642,7 +642,7 @@ func TestSuspendedSelection(t *testing.T) { require.NoError(t, err) require.Len(t, nodes, 3) for _, node := range nodes { - require.False(t, suspendedIDs[node.Id]) + require.False(t, suspendedIDs[node.ID]) } // select 10 new nodes - 5 new, 2 suspended, so expect 3 @@ -653,7 +653,7 @@ func TestSuspendedSelection(t *testing.T) { require.NoError(t, err) require.Len(t, nodes, 3) for _, node := range nodes { - require.False(t, suspendedIDs[node.Id]) + require.False(t, suspendedIDs[node.ID]) } }) } diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index b16df61fb..e45a21225 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -33,7 +33,7 @@ type overlaycache struct { db *satelliteDB } -func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) (nodes []*overlay.NodeDossier, err error) { +func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) (nodes []*overlay.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) nodeType := int(pb.NodeType_STORAGE) @@ -77,7 +77,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, cr } for _, n := range moreNodes { nodes = append(nodes, n) - criteria.ExcludedIDs = append(criteria.ExcludedIDs, n.Id) + criteria.ExcludedIDs = append(criteria.ExcludedIDs, n.ID) criteria.ExcludedNetworks = append(criteria.ExcludedNetworks, n.LastNet) } if len(nodes) == count { @@ -88,7 +88,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, cr return nodes, nil } -func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) (nodes []*overlay.NodeDossier, err error) { +func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) (nodes []*overlay.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) nodeType := int(pb.NodeType_STORAGE) @@ -130,7 +130,7 @@ func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int, } for _, n := range moreNodes { nodes = append(nodes, n) - criteria.ExcludedIDs = append(criteria.ExcludedIDs, n.Id) + criteria.ExcludedIDs = append(criteria.ExcludedIDs, n.ID) criteria.ExcludedNetworks = append(criteria.ExcludedNetworks, n.LastNet) } if len(nodes) == count { @@ -167,7 +167,7 @@ func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj. return nodeNets, Error.Wrap(rows.Err()) } -func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj.NodeID, count int, safeQuery string, args ...interface{}) (_ []*overlay.NodeDossier, err error) { +func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj.NodeID, count int, safeQuery string, args ...interface{}) (_ []*overlay.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) if count == 0 { @@ -185,10 +185,8 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj args = append(args, count) var rows *sql.Rows - rows, err = cache.db.Query(ctx, cache.db.Rebind(`SELECT id, type, address, last_net, last_ip_port, - free_disk, total_audit_count, audit_success_count, - total_uptime_count, uptime_success_count, disqualified, suspended, - audit_reputation_alpha, audit_reputation_beta + rows, err = cache.db.Query(ctx, cache.db.Rebind(` + SELECT last_net, id, address, last_ip_port FROM nodes `+safeQuery+safeExcludeNodes+` ORDER BY RANDOM() @@ -199,29 +197,23 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj } defer func() { err = errs.Combine(err, rows.Close()) }() - var nodes []*overlay.NodeDossier + var nodes []*overlay.SelectedNode for rows.Next() { - dbNode := &dbx.Node{} - err = rows.Scan(&dbNode.Id, &dbNode.Type, &dbNode.Address, &dbNode.LastNet, &dbNode.LastIpPort, - &dbNode.FreeDisk, &dbNode.TotalAuditCount, &dbNode.AuditSuccessCount, - &dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount, &dbNode.Disqualified, &dbNode.Suspended, - &dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta, - ) + var node overlay.SelectedNode + node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC} + + err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort) if err != nil { return nil, err } - dossier, err := convertDBNode(ctx, dbNode) - if err != nil { - return nil, err - } - nodes = append(nodes, dossier) + nodes = append(nodes, &node) } return nodes, Error.Wrap(rows.Err()) } -func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedIDs []storj.NodeID, excludedNodeNetworks []string, count int, safeQuery string, distinctIP bool, args ...interface{}) (_ []*overlay.NodeDossier, err error) { +func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedIDs []storj.NodeID, excludedNodeNetworks []string, count int, safeQuery string, distinctIP bool, args ...interface{}) (_ []*overlay.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) if count == 0 { @@ -249,9 +241,7 @@ func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedIDs [ SELECT * FROM ( SELECT DISTINCT ON (last_net) last_net, -- choose at most 1 node from this network - id, type, address, last_ip_port, free_disk, total_audit_count, - audit_success_count, total_uptime_count, uptime_success_count, - audit_reputation_alpha, audit_reputation_beta + id, address, last_ip_port FROM nodes `+safeQuery+safeExcludeNodes+safeExcludeNetworks+` AND last_net <> '' -- select nodes with a network set @@ -264,22 +254,17 @@ func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedIDs [ } defer func() { err = errs.Combine(err, rows.Close()) }() - var nodes []*overlay.NodeDossier + var nodes []*overlay.SelectedNode for rows.Next() { - dbNode := &dbx.Node{} - err = rows.Scan(&dbNode.LastNet, - &dbNode.Id, &dbNode.Type, &dbNode.Address, &dbNode.LastIpPort, &dbNode.FreeDisk, &dbNode.TotalAuditCount, - &dbNode.AuditSuccessCount, &dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount, - &dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta, - ) + var node overlay.SelectedNode + node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC} + + err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &node.LastIPPort) if err != nil { return nil, err } - dossier, err := convertDBNode(ctx, dbNode) - if err != nil { - return nil, err - } - nodes = append(nodes, dossier) + + nodes = append(nodes, &node) } return nodes, Error.Wrap(rows.Err())