diff --git a/bootstrap/peer.go b/bootstrap/peer.go index 919ed1eca..74a73b6fd 100644 --- a/bootstrap/peer.go +++ b/bootstrap/peer.go @@ -14,7 +14,6 @@ import ( "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" - "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/server" "storj.io/storj/pkg/storj" @@ -64,7 +63,7 @@ type Peer struct { Kademlia struct { RoutingTable *kademlia.RoutingTable Service *kademlia.Kademlia - Endpoint *node.Server + Endpoint *kademlia.Endpoint Inspector *kademlia.Inspector } } @@ -129,7 +128,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P return nil, errs.Combine(err, peer.Close()) } - peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service) + peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable) pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint) peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity) diff --git a/bootstrap/testing.go b/bootstrap/testing.go deleted file mode 100644 index aa3b97637..000000000 --- a/bootstrap/testing.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package bootstrap - -import ( - "storj.io/storj/pkg/node" -) - -// These methods are added to have same interface as in testplanet to make transition easier. - -// NewNodeClient creates a node client for this node -// TODO: this is temporary and only intended for tests -func (peer *Peer) NewNodeClient() (node.Client, error) { - // TODO: handle disconnect verification - return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service) -} diff --git a/internal/testplanet/planet.go b/internal/testplanet/planet.go index 4e98cb948..c0dcf9f69 100644 --- a/internal/testplanet/planet.go +++ b/internal/testplanet/planet.go @@ -34,7 +34,6 @@ import ( "storj.io/storj/pkg/discovery" "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" - "storj.io/storj/pkg/node" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls" @@ -57,8 +56,6 @@ type Peer interface { Run(context.Context) error Close() error - - NewNodeClient() (node.Client, error) } // Config describes planet configuration diff --git a/internal/testplanet/planet_test.go b/internal/testplanet/planet_test.go index 4564cec60..16c6e9d01 100644 --- a/internal/testplanet/planet_test.go +++ b/internal/testplanet/planet_test.go @@ -42,20 +42,21 @@ func TestBasic(t *testing.T) { message := client.SignedMessage() t.Log(message) - nodeClient, err := planet.StorageNodes[0].NewNodeClient() - require.NoError(t, err) - // ping a satellite - _, err = nodeClient.Ping(context.Background(), planet.Satellites[0].Local()) + _, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.Satellites[0].Local()) require.NoError(t, err) // ping a storage node - _, err = nodeClient.Ping(context.Background(), planet.StorageNodes[1].Local()) + _, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local()) require.NoError(t, err) - err = planet.StopPeer(planet.StorageNodes[0]) + err = planet.StopPeer(planet.StorageNodes[1]) require.NoError(t, err) + // ping a stopped storage node + _, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local()) + require.Error(t, err) + // wait a bit to see whether some failures occur time.Sleep(time.Second) } diff --git a/pkg/dht/dht.go b/pkg/dht/dht.go index b428d6093..b3ff99c21 100644 --- a/pkg/dht/dht.go +++ b/pkg/dht/dht.go @@ -15,7 +15,7 @@ import ( // DHT is the interface for the DHT in the Storj network type DHT interface { FindNear(ctx context.Context, start storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) - GetRoutingTable(ctx context.Context) (RoutingTable, error) + GetRoutingTable() RoutingTable Bootstrap(ctx context.Context) error Ping(ctx context.Context, node pb.Node) (pb.Node, error) FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, error) diff --git a/pkg/kademlia/endpoint.go b/pkg/kademlia/endpoint.go new file mode 100644 index 000000000..014b8f8bd --- /dev/null +++ b/pkg/kademlia/endpoint.go @@ -0,0 +1,77 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package kademlia + +import ( + "context" + "sync/atomic" + + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/storj/pkg/pb" +) + +var EndpointError = errs.Class("kademlia endpoint error") + +// Endpoint implements the kademlia Endpoints +type Endpoint struct { + log *zap.Logger + service *Kademlia + routingTable *RoutingTable + connected int32 +} + +// NewEndpoint returns a new kademlia endpoint +func NewEndpoint(log *zap.Logger, service *Kademlia, routingTable *RoutingTable) *Endpoint { + return &Endpoint{ + service: service, + routingTable: routingTable, + log: log, + } +} + +// Query is a node to node communication query +func (endpoint *Endpoint) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) { + if req.GetPingback() { + endpoint.pingback(ctx, req.Sender) + } + + nodes, err := endpoint.routingTable.FindNear(req.Target.Id, int(req.Limit)) + if err != nil { + return &pb.QueryResponse{}, EndpointError.New("could not find near endpoint: %v", err) + } + + return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil +} + +// pingback implements pingback for queries +func (endpoint *Endpoint) pingback(ctx context.Context, target *pb.Node) { + _, err := endpoint.service.Ping(ctx, *target) + if err != nil { + endpoint.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", target.Id.String())) + err = endpoint.routingTable.ConnectionFailed(target) + if err != nil { + endpoint.log.Error("could not respond to connection failed", zap.Error(err)) + } + } else { + err = endpoint.routingTable.ConnectionSuccess(target) + if err != nil { + endpoint.log.Error("could not respond to connection success", zap.Error(err)) + } else { + count := atomic.AddInt32(&endpoint.connected, 1) + if count == 1 { + endpoint.log.Sugar().Debugf("Successfully connected with %s", target.Address.Address) + } else if count%100 == 0 { + endpoint.log.Sugar().Debugf("Successfully connected with %s %dx times", target.Address.Address, count) + } + } + } +} + +// Ping provides an easy way to verify a node is online and accepting requests +func (endpoint *Endpoint) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { + //TODO + return &pb.PingResponse{}, nil +} diff --git a/pkg/kademlia/inspector.go b/pkg/kademlia/inspector.go index 4768ecfe4..f5170dded 100644 --- a/pkg/kademlia/inspector.go +++ b/pkg/kademlia/inspector.go @@ -6,21 +6,19 @@ package kademlia import ( "context" - "storj.io/storj/pkg/dht" "storj.io/storj/pkg/identity" - "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" ) // Inspector is a gRPC service for inspecting kademlia internals type Inspector struct { - dht dht.DHT + dht *Kademlia identity *identity.FullIdentity } // NewInspector creates an Inspector -func NewInspector(kad dht.DHT, identity *identity.FullIdentity) *Inspector { +func NewInspector(kad *Kademlia, identity *identity.FullIdentity) *Inspector { return &Inspector{ dht: kad, identity: identity, @@ -42,10 +40,7 @@ func (srv *Inspector) CountNodes(ctx context.Context, req *pb.CountNodesRequest) // GetBuckets returns all kademlia buckets for current kademlia instance func (srv *Inspector) GetBuckets(ctx context.Context, req *pb.GetBucketsRequest) (*pb.GetBucketsResponse, error) { - rt, err := srv.dht.GetRoutingTable(ctx) - if err != nil { - return &pb.GetBucketsResponse{}, Error.Wrap(err) - } + rt := srv.dht.GetRoutingTable() b, err := rt.GetBucketIds() if err != nil { return nil, err @@ -77,31 +72,22 @@ func (srv *Inspector) FindNear(ctx context.Context, req *pb.FindNearRequest) (*p // PingNode sends a PING RPC to the provided node ID in the Kad network. func (srv *Inspector) PingNode(ctx context.Context, req *pb.PingNodeRequest) (*pb.PingNodeResponse, error) { - rt, err := srv.dht.GetRoutingTable(ctx) - if err != nil { - return &pb.PingNodeResponse{}, Error.Wrap(err) - } - + rt := srv.dht.GetRoutingTable() self := rt.Local() - nc, err := node.NewNodeClient(srv.identity, self, srv.dht) - if err != nil { - return &pb.PingNodeResponse{}, Error.Wrap(err) - } - - p, err := nc.Ping(ctx, pb.Node{ + _, err := srv.dht.Ping(ctx, pb.Node{ Id: req.Id, Type: self.Type, Address: &pb.NodeAddress{ Address: req.Address, }, }) - res := &pb.PingNodeResponse{Ok: p} + + res := &pb.PingNodeResponse{Ok: err == nil} if err != nil { return res, Error.Wrap(err) } - return res, nil } diff --git a/pkg/kademlia/kademlia.go b/pkg/kademlia/kademlia.go index 4407df05e..478269f76 100644 --- a/pkg/kademlia/kademlia.go +++ b/pkg/kademlia/kademlia.go @@ -119,10 +119,8 @@ func (k *Kademlia) FindNear(ctx context.Context, start storj.NodeID, limit int, return nodes, nil } -// GetRoutingTable provides the routing table for the Kademlia DHT -func (k *Kademlia) GetRoutingTable(ctx context.Context) (dht.RoutingTable, error) { - return k.routingTable, nil -} +// GetRoutingTable provides the assigned routing table +func (k *Kademlia) GetRoutingTable() dht.RoutingTable { return k.routingTable } // SetBootstrapNodes sets the bootstrap nodes. // Must be called before anything starting to use kademlia. diff --git a/pkg/kademlia/kademlia_test.go b/pkg/kademlia/kademlia_test.go index a98ab8024..2b5485ab3 100644 --- a/pkg/kademlia/kademlia_test.go +++ b/pkg/kademlia/kademlia_test.go @@ -27,7 +27,6 @@ import ( "storj.io/storj/internal/testidentity" "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/identity" - "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/storage/teststore" @@ -104,8 +103,7 @@ func TestPeerDiscovery(t *testing.T) { } k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha) assert.NoError(t, err) - rt, err := k.GetRoutingTable(ctx) - assert.NoError(t, err) + rt := k.GetRoutingTable() assert.Equal(t, rt.Local().Metadata.Email, "foo@bar.com") assert.Equal(t, rt.Local().Metadata.Wallet, "OperatorWallet") @@ -176,7 +174,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) { logger := zaptest.NewLogger(t) k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha) assert.NoError(t, err) - s := node.NewServer(logger, k) + s := NewEndpoint(logger, k, k.GetRoutingTable().(*RoutingTable)) // new ident opts identOpt, err := fid.ServerOption() assert.NoError(t, err) diff --git a/pkg/node/client.go b/pkg/node/client.go deleted file mode 100644 index 38eec5f0b..000000000 --- a/pkg/node/client.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information - -package node - -import ( - "context" - - "github.com/zeebo/errs" - - "storj.io/storj/pkg/dht" - "storj.io/storj/pkg/identity" - "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/transport" -) - -//NodeClientErr is the class for all errors pertaining to node client operations -var NodeClientErr = errs.Class("node client error") - -// NewNodeClient instantiates a node client -func NewNodeClient(identity *identity.FullIdentity, self pb.Node, dht dht.DHT, obs ...transport.Observer) (Client, error) { - node := &Node{ - dht: dht, - self: self, - pool: NewConnectionPool(identity, obs...), - } - - node.pool.Init() - - return node, nil -} - -// Client is the Node client communication interface -type Client interface { - Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) - Ping(ctx context.Context, to pb.Node) (bool, error) - Disconnect() error -} diff --git a/pkg/node/connection_pool.go b/pkg/node/connection_pool.go deleted file mode 100644 index ce5333079..000000000 --- a/pkg/node/connection_pool.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information - -package node - -import ( - "context" - "sync" - "sync/atomic" - "unsafe" - - "github.com/zeebo/errs" - "google.golang.org/grpc" - - "storj.io/storj/pkg/identity" - "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/storj" - "storj.io/storj/pkg/transport" - "storj.io/storj/pkg/utils" -) - -// Error defines a connection pool error -var Error = errs.Class("connection pool error") - -// ConnectionPool is the in memory pool of node connections -type ConnectionPool struct { - tc transport.Client - mu sync.RWMutex - items map[storj.NodeID]*Conn -} - -// Conn is the connection that is stored in the connection pool -type Conn struct { - addr string - - dial sync.Once - client pb.NodesClient - grpc unsafe.Pointer //*grpc.ClientConn - err error -} - -// NewConn intitalizes a new Conn struct with the provided address, but does not iniate a connection -func NewConn(addr string) *Conn { return &Conn{addr: addr} } - -// NewConnectionPool initializes a new in memory pool -func NewConnectionPool(identity *identity.FullIdentity, obs ...transport.Observer) *ConnectionPool { - return &ConnectionPool{ - tc: transport.NewClient(identity, obs...), - items: make(map[storj.NodeID]*Conn), - mu: sync.RWMutex{}, - } -} - -// Get retrieves a node connection with the provided nodeID -// nil is returned if the NodeID is not in the connection pool -func (pool *ConnectionPool) Get(id storj.NodeID) (interface{}, error) { - pool.mu.Lock() - defer pool.mu.Unlock() - - i, ok := pool.items[id] - if !ok { - return nil, nil - } - - return i, nil -} - -// Disconnect deletes a connection associated with the provided NodeID -func (pool *ConnectionPool) Disconnect(id storj.NodeID) error { - pool.mu.Lock() - defer pool.mu.Unlock() - - return pool.disconnect(id) - -} - -func (pool *ConnectionPool) disconnect(id storj.NodeID) error { - conn, ok := pool.items[id] - if !ok { - return nil - } - - ptr := atomic.LoadPointer(&conn.grpc) - if ptr == nil { - return nil - } - - delete(pool.items, id) - - return (*grpc.ClientConn)(ptr).Close() -} - -// Dial connects to the node with the given ID and Address returning a gRPC Node Client -func (pool *ConnectionPool) Dial(ctx context.Context, n *pb.Node) (pb.NodesClient, error) { - id := n.Id - pool.mu.Lock() - conn, ok := pool.items[id] - if !ok { - conn = NewConn(n.GetAddress().Address) - pool.items[id] = conn - } - pool.mu.Unlock() - - if n != nil { - n.Type.DPanicOnInvalid("connection pool dial") - } - - conn.dial.Do(func() { - grpc, err := pool.tc.DialNode(ctx, n) - conn.err = err - if conn.err != nil { - return - } - - atomic.StorePointer(&conn.grpc, unsafe.Pointer(grpc)) - - conn.client = pb.NewNodesClient(grpc) - }) - - if conn.err != nil { - return nil, conn.err - } - - return conn.client, nil -} - -// DisconnectAll closes all connections nodes and removes them from the connection pool -func (pool *ConnectionPool) DisconnectAll() error { - pool.mu.Lock() - defer pool.mu.Unlock() - - errs := []error{} - for k := range pool.items { - if err := pool.disconnect(k); err != nil { - errs = append(errs, Error.Wrap(err)) - } - } - - return utils.CombineErrors(errs...) -} - -// Init initializes the cache -func (pool *ConnectionPool) Init() { - pool.mu.Lock() - pool.items = make(map[storj.NodeID]*Conn) - pool.mu.Unlock() -} diff --git a/pkg/node/node.go b/pkg/node/node.go deleted file mode 100644 index 835f8a3b0..000000000 --- a/pkg/node/node.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information - -package node - -import ( - "context" - - "storj.io/storj/pkg/dht" - "storj.io/storj/pkg/pb" -) - -// Node is the storj definition for a node in the network -type Node struct { - dht dht.DHT - self pb.Node - pool *ConnectionPool -} - -// Lookup queries nodes looking for a particular node in the network -func (node *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) { - to.Type.DPanicOnInvalid("node Lookup") - conn, err := node.pool.Dial(ctx, &to) - - if err != nil { - return nil, NodeClientErr.Wrap(err) - } - resp, err := conn.Query(ctx, &pb.QueryRequest{ - Limit: 20, - Sender: &node.self, - Target: &find, - Pingback: true, - }) - if err != nil { - return nil, NodeClientErr.Wrap(err) - } - rt, err := node.dht.GetRoutingTable(ctx) - if err != nil { - return nil, NodeClientErr.Wrap(err) - } - if err := rt.ConnectionSuccess(&to); err != nil { - return nil, NodeClientErr.Wrap(err) - } - return resp.Response, nil -} - -// Ping attempts to establish a connection with a node to verify it is alive -func (node *Node) Ping(ctx context.Context, to pb.Node) (bool, error) { - to.Type.DPanicOnInvalid("node ping") - conn, err := node.pool.Dial(ctx, &to) - if err != nil { - return false, NodeClientErr.Wrap(err) - } - _, err = conn.Ping(ctx, &pb.PingRequest{}) - if err != nil { - return false, err - } - return true, nil -} - -// Disconnect closes all connections within the pool -func (node *Node) Disconnect() error { - return node.pool.DisconnectAll() -} diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go deleted file mode 100644 index 36cf98562..000000000 --- a/pkg/node/node_test.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package node_test - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "golang.org/x/sync/errgroup" - - "storj.io/storj/internal/testcontext" - "storj.io/storj/internal/testplanet" - "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/storj" - "storj.io/storj/pkg/utils" -) - -func TestClient(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - planet, err := testplanet.New(t, 1, 4, 0) - if err != nil { - t.Fatal(err) - } - defer ctx.Check(planet.Shutdown) - - planet.Start(ctx) - - time.Sleep(2 * time.Second) - - // TODO: also use satellites - peers := planet.StorageNodes - - { // Ping - client, err := planet.StorageNodes[0].NewNodeClient() - assert.NoError(t, err) - defer ctx.Check(client.Disconnect) - - var group errgroup.Group - - for i := range peers { - peer := peers[i] - group.Go(func() error { - pinged, err := client.Ping(ctx, peer.Local()) - var pingErr error - if !pinged { - pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID()) - } - return utils.CombineErrors(pingErr, err) - }) - } - - defer ctx.Check(group.Wait) - } - - { // Lookup - client, err := planet.StorageNodes[1].NewNodeClient() - assert.NoError(t, err) - defer ctx.Check(client.Disconnect) - - var group errgroup.Group - - for i := range peers { - peer := peers[i] - group.Go(func() error { - for _, target := range peers { - errTag := fmt.Errorf("lookup peer:%s target:%s", peer.ID(), target.ID()) - peer.Local().Type.DPanicOnInvalid("test client peer") - target.Local().Type.DPanicOnInvalid("test client target") - results, err := client.Lookup(ctx, peer.Local(), target.Local()) - if err != nil { - return utils.CombineErrors(errTag, err) - } - - if containsResult(results, target.ID()) { - continue - } - - // with small network we expect to return everything - if len(results) != planet.Size() { - return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results))) - } - - return nil - } - return nil - }) - } - - defer ctx.Check(group.Wait) - } - - { // Lookup - client, err := planet.StorageNodes[2].NewNodeClient() - assert.NoError(t, err) - defer ctx.Check(client.Disconnect) - - targets := []storj.NodeID{ - {}, // empty target - {255}, // non-empty - } - - var group errgroup.Group - - for i := range targets { - target := targets[i] - for i := range peers { - peer := peers[i] - group.Go(func() error { - errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target) - peer.Local().Type.DPanicOnInvalid("peer info") - results, err := client.Lookup(ctx, peer.Local(), pb.Node{Id: target, Type: pb.NodeType_STORAGE}) - if err != nil { - return utils.CombineErrors(errTag, err) - } - - // with small network we expect to return everything - if len(results) != planet.Size() { - return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results))) - } - - return nil - }) - } - } - - defer ctx.Check(group.Wait) - } -} - -func containsResult(nodes []*pb.Node, target storj.NodeID) bool { - for _, node := range nodes { - if node.Id == target { - return true - } - } - return false -} diff --git a/pkg/node/server.go b/pkg/node/server.go deleted file mode 100644 index 219f87d74..000000000 --- a/pkg/node/server.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package node - -import ( - "context" - "sync/atomic" - - "go.uber.org/zap" - - "storj.io/storj/pkg/dht" - "storj.io/storj/pkg/pb" -) - -// Server implements the grpc Node Server -type Server struct { - dht dht.DHT - log *zap.Logger - - connected int32 -} - -// NewServer returns a newly instantiated Node Server -func NewServer(log *zap.Logger, dht dht.DHT) *Server { - return &Server{ - dht: dht, - log: log, - } -} - -// Query is a node to node communication query -func (server *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) { - rt, err := server.dht.GetRoutingTable(ctx) - if err != nil { - return &pb.QueryResponse{}, NodeClientErr.New("could not get routing table %server", err) - } - - if req.GetPingback() { - _, err = server.dht.Ping(ctx, *req.Sender) - if err != nil { - server.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", req.Sender.Id.String())) - err = rt.ConnectionFailed(req.Sender) - if err != nil { - server.log.Error("could not respond to connection failed", zap.Error(err)) - } - } else { - err = rt.ConnectionSuccess(req.Sender) - if err != nil { - server.log.Error("could not respond to connection success", zap.Error(err)) - } else { - count := atomic.AddInt32(&server.connected, 1) - if count == 1 { - server.log.Sugar().Debugf("Successfully connected with %s", req.Sender.Address.Address) - } else if count%100 == 0 { - server.log.Sugar().Debugf("Successfully connected with %s %dx times", req.Sender.Address.Address, count) - } - } - } - } - - nodes, err := rt.FindNear(req.Target.Id, int(req.Limit)) - if err != nil { - return &pb.QueryResponse{}, NodeClientErr.New("could not find near %server", err) - } - - return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil -} - -// Ping provides an easy way to verify a node is online and accepting requests -func (server *Server) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { - //TODO - return &pb.PingResponse{}, nil -} diff --git a/pkg/piecestore/psserver/server.go b/pkg/piecestore/psserver/server.go index ed926b7d4..0ce27b3db 100644 --- a/pkg/piecestore/psserver/server.go +++ b/pkg/piecestore/psserver/server.go @@ -370,10 +370,7 @@ func (s *Server) getDashboardData(ctx context.Context) (*pb.DashboardStats, erro return &pb.DashboardStats{}, ServerError.Wrap(err) } - rt, err := s.kad.GetRoutingTable(ctx) - if err != nil { - return &pb.DashboardStats{}, ServerError.Wrap(err) - } + rt := s.kad.GetRoutingTable() nodes, err := s.kad.FindNear(ctx, storj.NodeID{}, 10000000) if err != nil { diff --git a/satellite/peer.go b/satellite/peer.go index 8aa4128f2..2537413e8 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -29,7 +29,6 @@ import ( "storj.io/storj/pkg/discovery" "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" - "storj.io/storj/pkg/node" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/pointerdb" @@ -116,7 +115,7 @@ type Peer struct { RoutingTable *kademlia.RoutingTable Service *kademlia.Kademlia - Endpoint *node.Server + Endpoint *kademlia.Endpoint Inspector *kademlia.Inspector } @@ -239,7 +238,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (* return nil, errs.Combine(err, peer.Close()) } - peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service) + peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable) pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint) peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity) diff --git a/satellite/testing.go b/satellite/testing.go deleted file mode 100644 index 48ae4fd94..000000000 --- a/satellite/testing.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package satellite - -import ( - "storj.io/storj/pkg/node" -) - -// These methods are added to have same interface as in testplanet to make transition easier. - -// NewNodeClient creates a node client for this node -// TODO: this is temporary and only intended for tests -func (peer *Peer) NewNodeClient() (node.Client, error) { - // TODO: handle disconnect verification - return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service) -} diff --git a/storagenode/peer.go b/storagenode/peer.go index 278394d02..115daa558 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -14,7 +14,6 @@ import ( "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" - "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/piecestore/psserver" @@ -72,7 +71,7 @@ type Peer struct { Kademlia struct { RoutingTable *kademlia.RoutingTable Service *kademlia.Kademlia - Endpoint *node.Server + Endpoint *kademlia.Endpoint Inspector *kademlia.Inspector } @@ -147,7 +146,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P return nil, errs.Combine(err, peer.Close()) } - peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service) + peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable) pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint) peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity) diff --git a/storagenode/testing.go b/storagenode/testing.go deleted file mode 100644 index 3c825fb57..000000000 --- a/storagenode/testing.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package storagenode - -import ( - "storj.io/storj/pkg/node" -) - -// These methods are added to have same interface as in testplanet to make transition easier. - -// NewNodeClient creates a node client for this node -// TODO: this is temporary and only intended for tests -func (peer *Peer) NewNodeClient() (node.Client, error) { - // TODO: handle disconnect verification - return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service) -}