From 87d6410b50fc75e142d262762a795a15e2730b25 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 5 Feb 2019 10:38:48 +0200 Subject: [PATCH] Revert "Remove node package and simplify DHT interface." This reverts commit 03ec1ff92d89f7e39454883484eaa6796b015f3b. --- bootstrap/peer.go | 5 +- bootstrap/testing.go | 17 ++++ internal/testplanet/planet.go | 3 + internal/testplanet/planet_test.go | 13 ++- pkg/dht/dht.go | 2 +- pkg/kademlia/endpoint.go | 77 --------------- pkg/kademlia/inspector.go | 28 ++++-- pkg/kademlia/kademlia.go | 6 +- pkg/kademlia/kademlia_test.go | 6 +- pkg/node/client.go | 38 ++++++++ pkg/node/connection_pool.go | 147 +++++++++++++++++++++++++++++ pkg/node/node.go | 64 +++++++++++++ pkg/node/node_test.go | 142 ++++++++++++++++++++++++++++ pkg/node/server.go | 74 +++++++++++++++ pkg/piecestore/psserver/server.go | 5 +- satellite/peer.go | 5 +- satellite/testing.go | 17 ++++ storagenode/peer.go | 5 +- storagenode/testing.go | 17 ++++ 19 files changed, 568 insertions(+), 103 deletions(-) create mode 100644 bootstrap/testing.go delete mode 100644 pkg/kademlia/endpoint.go create mode 100644 pkg/node/client.go create mode 100644 pkg/node/connection_pool.go create mode 100644 pkg/node/node.go create mode 100644 pkg/node/node_test.go create mode 100644 pkg/node/server.go create mode 100644 satellite/testing.go create mode 100644 storagenode/testing.go diff --git a/bootstrap/peer.go b/bootstrap/peer.go index 74a73b6fd..919ed1eca 100644 --- a/bootstrap/peer.go +++ b/bootstrap/peer.go @@ -14,6 +14,7 @@ import ( "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/server" "storj.io/storj/pkg/storj" @@ -63,7 +64,7 @@ type Peer struct { Kademlia struct { RoutingTable *kademlia.RoutingTable Service *kademlia.Kademlia - Endpoint *kademlia.Endpoint + Endpoint *node.Server Inspector *kademlia.Inspector } } @@ -128,7 +129,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P return nil, errs.Combine(err, peer.Close()) } - peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable) + peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service) pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint) peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity) diff --git a/bootstrap/testing.go b/bootstrap/testing.go new file mode 100644 index 000000000..aa3b97637 --- /dev/null +++ b/bootstrap/testing.go @@ -0,0 +1,17 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package bootstrap + +import ( + "storj.io/storj/pkg/node" +) + +// These methods are added to have same interface as in testplanet to make transition easier. + +// NewNodeClient creates a node client for this node +// TODO: this is temporary and only intended for tests +func (peer *Peer) NewNodeClient() (node.Client, error) { + // TODO: handle disconnect verification + return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service) +} diff --git a/internal/testplanet/planet.go b/internal/testplanet/planet.go index c0dcf9f69..4e98cb948 100644 --- a/internal/testplanet/planet.go +++ b/internal/testplanet/planet.go @@ -34,6 +34,7 @@ import ( "storj.io/storj/pkg/discovery" "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/node" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls" @@ -56,6 +57,8 @@ type Peer interface { Run(context.Context) error Close() error + + NewNodeClient() (node.Client, error) } // Config describes planet configuration diff --git a/internal/testplanet/planet_test.go b/internal/testplanet/planet_test.go index 16c6e9d01..4564cec60 100644 --- a/internal/testplanet/planet_test.go +++ b/internal/testplanet/planet_test.go @@ -42,21 +42,20 @@ func TestBasic(t *testing.T) { message := client.SignedMessage() t.Log(message) + nodeClient, err := planet.StorageNodes[0].NewNodeClient() + require.NoError(t, err) + // ping a satellite - _, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.Satellites[0].Local()) + _, err = nodeClient.Ping(context.Background(), planet.Satellites[0].Local()) require.NoError(t, err) // ping a storage node - _, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local()) + _, err = nodeClient.Ping(context.Background(), planet.StorageNodes[1].Local()) require.NoError(t, err) - err = planet.StopPeer(planet.StorageNodes[1]) + err = planet.StopPeer(planet.StorageNodes[0]) require.NoError(t, err) - // ping a stopped storage node - _, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local()) - require.Error(t, err) - // wait a bit to see whether some failures occur time.Sleep(time.Second) } diff --git a/pkg/dht/dht.go b/pkg/dht/dht.go index b3ff99c21..b428d6093 100644 --- a/pkg/dht/dht.go +++ b/pkg/dht/dht.go @@ -15,7 +15,7 @@ import ( // DHT is the interface for the DHT in the Storj network type DHT interface { FindNear(ctx context.Context, start storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) - GetRoutingTable() RoutingTable + GetRoutingTable(ctx context.Context) (RoutingTable, error) Bootstrap(ctx context.Context) error Ping(ctx context.Context, node pb.Node) (pb.Node, error) FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, error) diff --git a/pkg/kademlia/endpoint.go b/pkg/kademlia/endpoint.go deleted file mode 100644 index 014b8f8bd..000000000 --- a/pkg/kademlia/endpoint.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package kademlia - -import ( - "context" - "sync/atomic" - - "github.com/zeebo/errs" - "go.uber.org/zap" - - "storj.io/storj/pkg/pb" -) - -var EndpointError = errs.Class("kademlia endpoint error") - -// Endpoint implements the kademlia Endpoints -type Endpoint struct { - log *zap.Logger - service *Kademlia - routingTable *RoutingTable - connected int32 -} - -// NewEndpoint returns a new kademlia endpoint -func NewEndpoint(log *zap.Logger, service *Kademlia, routingTable *RoutingTable) *Endpoint { - return &Endpoint{ - service: service, - routingTable: routingTable, - log: log, - } -} - -// Query is a node to node communication query -func (endpoint *Endpoint) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) { - if req.GetPingback() { - endpoint.pingback(ctx, req.Sender) - } - - nodes, err := endpoint.routingTable.FindNear(req.Target.Id, int(req.Limit)) - if err != nil { - return &pb.QueryResponse{}, EndpointError.New("could not find near endpoint: %v", err) - } - - return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil -} - -// pingback implements pingback for queries -func (endpoint *Endpoint) pingback(ctx context.Context, target *pb.Node) { - _, err := endpoint.service.Ping(ctx, *target) - if err != nil { - endpoint.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", target.Id.String())) - err = endpoint.routingTable.ConnectionFailed(target) - if err != nil { - endpoint.log.Error("could not respond to connection failed", zap.Error(err)) - } - } else { - err = endpoint.routingTable.ConnectionSuccess(target) - if err != nil { - endpoint.log.Error("could not respond to connection success", zap.Error(err)) - } else { - count := atomic.AddInt32(&endpoint.connected, 1) - if count == 1 { - endpoint.log.Sugar().Debugf("Successfully connected with %s", target.Address.Address) - } else if count%100 == 0 { - endpoint.log.Sugar().Debugf("Successfully connected with %s %dx times", target.Address.Address, count) - } - } - } -} - -// Ping provides an easy way to verify a node is online and accepting requests -func (endpoint *Endpoint) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { - //TODO - return &pb.PingResponse{}, nil -} diff --git a/pkg/kademlia/inspector.go b/pkg/kademlia/inspector.go index f5170dded..4768ecfe4 100644 --- a/pkg/kademlia/inspector.go +++ b/pkg/kademlia/inspector.go @@ -6,19 +6,21 @@ package kademlia import ( "context" + "storj.io/storj/pkg/dht" "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" ) // Inspector is a gRPC service for inspecting kademlia internals type Inspector struct { - dht *Kademlia + dht dht.DHT identity *identity.FullIdentity } // NewInspector creates an Inspector -func NewInspector(kad *Kademlia, identity *identity.FullIdentity) *Inspector { +func NewInspector(kad dht.DHT, identity *identity.FullIdentity) *Inspector { return &Inspector{ dht: kad, identity: identity, @@ -40,7 +42,10 @@ func (srv *Inspector) CountNodes(ctx context.Context, req *pb.CountNodesRequest) // GetBuckets returns all kademlia buckets for current kademlia instance func (srv *Inspector) GetBuckets(ctx context.Context, req *pb.GetBucketsRequest) (*pb.GetBucketsResponse, error) { - rt := srv.dht.GetRoutingTable() + rt, err := srv.dht.GetRoutingTable(ctx) + if err != nil { + return &pb.GetBucketsResponse{}, Error.Wrap(err) + } b, err := rt.GetBucketIds() if err != nil { return nil, err @@ -72,22 +77,31 @@ func (srv *Inspector) FindNear(ctx context.Context, req *pb.FindNearRequest) (*p // PingNode sends a PING RPC to the provided node ID in the Kad network. func (srv *Inspector) PingNode(ctx context.Context, req *pb.PingNodeRequest) (*pb.PingNodeResponse, error) { - rt := srv.dht.GetRoutingTable() + rt, err := srv.dht.GetRoutingTable(ctx) + if err != nil { + return &pb.PingNodeResponse{}, Error.Wrap(err) + } + self := rt.Local() - _, err := srv.dht.Ping(ctx, pb.Node{ + nc, err := node.NewNodeClient(srv.identity, self, srv.dht) + if err != nil { + return &pb.PingNodeResponse{}, Error.Wrap(err) + } + + p, err := nc.Ping(ctx, pb.Node{ Id: req.Id, Type: self.Type, Address: &pb.NodeAddress{ Address: req.Address, }, }) - - res := &pb.PingNodeResponse{Ok: err == nil} + res := &pb.PingNodeResponse{Ok: p} if err != nil { return res, Error.Wrap(err) } + return res, nil } diff --git a/pkg/kademlia/kademlia.go b/pkg/kademlia/kademlia.go index 478269f76..4407df05e 100644 --- a/pkg/kademlia/kademlia.go +++ b/pkg/kademlia/kademlia.go @@ -119,8 +119,10 @@ func (k *Kademlia) FindNear(ctx context.Context, start storj.NodeID, limit int, return nodes, nil } -// GetRoutingTable provides the assigned routing table -func (k *Kademlia) GetRoutingTable() dht.RoutingTable { return k.routingTable } +// GetRoutingTable provides the routing table for the Kademlia DHT +func (k *Kademlia) GetRoutingTable(ctx context.Context) (dht.RoutingTable, error) { + return k.routingTable, nil +} // SetBootstrapNodes sets the bootstrap nodes. // Must be called before anything starting to use kademlia. diff --git a/pkg/kademlia/kademlia_test.go b/pkg/kademlia/kademlia_test.go index 2b5485ab3..a98ab8024 100644 --- a/pkg/kademlia/kademlia_test.go +++ b/pkg/kademlia/kademlia_test.go @@ -27,6 +27,7 @@ import ( "storj.io/storj/internal/testidentity" "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/storage/teststore" @@ -103,7 +104,8 @@ func TestPeerDiscovery(t *testing.T) { } k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha) assert.NoError(t, err) - rt := k.GetRoutingTable() + rt, err := k.GetRoutingTable(ctx) + assert.NoError(t, err) assert.Equal(t, rt.Local().Metadata.Email, "foo@bar.com") assert.Equal(t, rt.Local().Metadata.Wallet, "OperatorWallet") @@ -174,7 +176,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) { logger := zaptest.NewLogger(t) k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha) assert.NoError(t, err) - s := NewEndpoint(logger, k, k.GetRoutingTable().(*RoutingTable)) + s := node.NewServer(logger, k) // new ident opts identOpt, err := fid.ServerOption() assert.NoError(t, err) diff --git a/pkg/node/client.go b/pkg/node/client.go new file mode 100644 index 000000000..38eec5f0b --- /dev/null +++ b/pkg/node/client.go @@ -0,0 +1,38 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information + +package node + +import ( + "context" + + "github.com/zeebo/errs" + + "storj.io/storj/pkg/dht" + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/transport" +) + +//NodeClientErr is the class for all errors pertaining to node client operations +var NodeClientErr = errs.Class("node client error") + +// NewNodeClient instantiates a node client +func NewNodeClient(identity *identity.FullIdentity, self pb.Node, dht dht.DHT, obs ...transport.Observer) (Client, error) { + node := &Node{ + dht: dht, + self: self, + pool: NewConnectionPool(identity, obs...), + } + + node.pool.Init() + + return node, nil +} + +// Client is the Node client communication interface +type Client interface { + Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) + Ping(ctx context.Context, to pb.Node) (bool, error) + Disconnect() error +} diff --git a/pkg/node/connection_pool.go b/pkg/node/connection_pool.go new file mode 100644 index 000000000..ce5333079 --- /dev/null +++ b/pkg/node/connection_pool.go @@ -0,0 +1,147 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information + +package node + +import ( + "context" + "sync" + "sync/atomic" + "unsafe" + + "github.com/zeebo/errs" + "google.golang.org/grpc" + + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" + "storj.io/storj/pkg/transport" + "storj.io/storj/pkg/utils" +) + +// Error defines a connection pool error +var Error = errs.Class("connection pool error") + +// ConnectionPool is the in memory pool of node connections +type ConnectionPool struct { + tc transport.Client + mu sync.RWMutex + items map[storj.NodeID]*Conn +} + +// Conn is the connection that is stored in the connection pool +type Conn struct { + addr string + + dial sync.Once + client pb.NodesClient + grpc unsafe.Pointer //*grpc.ClientConn + err error +} + +// NewConn intitalizes a new Conn struct with the provided address, but does not iniate a connection +func NewConn(addr string) *Conn { return &Conn{addr: addr} } + +// NewConnectionPool initializes a new in memory pool +func NewConnectionPool(identity *identity.FullIdentity, obs ...transport.Observer) *ConnectionPool { + return &ConnectionPool{ + tc: transport.NewClient(identity, obs...), + items: make(map[storj.NodeID]*Conn), + mu: sync.RWMutex{}, + } +} + +// Get retrieves a node connection with the provided nodeID +// nil is returned if the NodeID is not in the connection pool +func (pool *ConnectionPool) Get(id storj.NodeID) (interface{}, error) { + pool.mu.Lock() + defer pool.mu.Unlock() + + i, ok := pool.items[id] + if !ok { + return nil, nil + } + + return i, nil +} + +// Disconnect deletes a connection associated with the provided NodeID +func (pool *ConnectionPool) Disconnect(id storj.NodeID) error { + pool.mu.Lock() + defer pool.mu.Unlock() + + return pool.disconnect(id) + +} + +func (pool *ConnectionPool) disconnect(id storj.NodeID) error { + conn, ok := pool.items[id] + if !ok { + return nil + } + + ptr := atomic.LoadPointer(&conn.grpc) + if ptr == nil { + return nil + } + + delete(pool.items, id) + + return (*grpc.ClientConn)(ptr).Close() +} + +// Dial connects to the node with the given ID and Address returning a gRPC Node Client +func (pool *ConnectionPool) Dial(ctx context.Context, n *pb.Node) (pb.NodesClient, error) { + id := n.Id + pool.mu.Lock() + conn, ok := pool.items[id] + if !ok { + conn = NewConn(n.GetAddress().Address) + pool.items[id] = conn + } + pool.mu.Unlock() + + if n != nil { + n.Type.DPanicOnInvalid("connection pool dial") + } + + conn.dial.Do(func() { + grpc, err := pool.tc.DialNode(ctx, n) + conn.err = err + if conn.err != nil { + return + } + + atomic.StorePointer(&conn.grpc, unsafe.Pointer(grpc)) + + conn.client = pb.NewNodesClient(grpc) + }) + + if conn.err != nil { + return nil, conn.err + } + + return conn.client, nil +} + +// DisconnectAll closes all connections nodes and removes them from the connection pool +func (pool *ConnectionPool) DisconnectAll() error { + pool.mu.Lock() + defer pool.mu.Unlock() + + errs := []error{} + for k := range pool.items { + if err := pool.disconnect(k); err != nil { + errs = append(errs, Error.Wrap(err)) + } + } + + return utils.CombineErrors(errs...) +} + +// Init initializes the cache +func (pool *ConnectionPool) Init() { + pool.mu.Lock() + pool.items = make(map[storj.NodeID]*Conn) + pool.mu.Unlock() +} diff --git a/pkg/node/node.go b/pkg/node/node.go new file mode 100644 index 000000000..835f8a3b0 --- /dev/null +++ b/pkg/node/node.go @@ -0,0 +1,64 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information + +package node + +import ( + "context" + + "storj.io/storj/pkg/dht" + "storj.io/storj/pkg/pb" +) + +// Node is the storj definition for a node in the network +type Node struct { + dht dht.DHT + self pb.Node + pool *ConnectionPool +} + +// Lookup queries nodes looking for a particular node in the network +func (node *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) { + to.Type.DPanicOnInvalid("node Lookup") + conn, err := node.pool.Dial(ctx, &to) + + if err != nil { + return nil, NodeClientErr.Wrap(err) + } + resp, err := conn.Query(ctx, &pb.QueryRequest{ + Limit: 20, + Sender: &node.self, + Target: &find, + Pingback: true, + }) + if err != nil { + return nil, NodeClientErr.Wrap(err) + } + rt, err := node.dht.GetRoutingTable(ctx) + if err != nil { + return nil, NodeClientErr.Wrap(err) + } + if err := rt.ConnectionSuccess(&to); err != nil { + return nil, NodeClientErr.Wrap(err) + } + return resp.Response, nil +} + +// Ping attempts to establish a connection with a node to verify it is alive +func (node *Node) Ping(ctx context.Context, to pb.Node) (bool, error) { + to.Type.DPanicOnInvalid("node ping") + conn, err := node.pool.Dial(ctx, &to) + if err != nil { + return false, NodeClientErr.Wrap(err) + } + _, err = conn.Ping(ctx, &pb.PingRequest{}) + if err != nil { + return false, err + } + return true, nil +} + +// Disconnect closes all connections within the pool +func (node *Node) Disconnect() error { + return node.pool.DisconnectAll() +} diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go new file mode 100644 index 000000000..36cf98562 --- /dev/null +++ b/pkg/node/node_test.go @@ -0,0 +1,142 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package node_test + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testplanet" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" + "storj.io/storj/pkg/utils" +) + +func TestClient(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 0) + if err != nil { + t.Fatal(err) + } + defer ctx.Check(planet.Shutdown) + + planet.Start(ctx) + + time.Sleep(2 * time.Second) + + // TODO: also use satellites + peers := planet.StorageNodes + + { // Ping + client, err := planet.StorageNodes[0].NewNodeClient() + assert.NoError(t, err) + defer ctx.Check(client.Disconnect) + + var group errgroup.Group + + for i := range peers { + peer := peers[i] + group.Go(func() error { + pinged, err := client.Ping(ctx, peer.Local()) + var pingErr error + if !pinged { + pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID()) + } + return utils.CombineErrors(pingErr, err) + }) + } + + defer ctx.Check(group.Wait) + } + + { // Lookup + client, err := planet.StorageNodes[1].NewNodeClient() + assert.NoError(t, err) + defer ctx.Check(client.Disconnect) + + var group errgroup.Group + + for i := range peers { + peer := peers[i] + group.Go(func() error { + for _, target := range peers { + errTag := fmt.Errorf("lookup peer:%s target:%s", peer.ID(), target.ID()) + peer.Local().Type.DPanicOnInvalid("test client peer") + target.Local().Type.DPanicOnInvalid("test client target") + results, err := client.Lookup(ctx, peer.Local(), target.Local()) + if err != nil { + return utils.CombineErrors(errTag, err) + } + + if containsResult(results, target.ID()) { + continue + } + + // with small network we expect to return everything + if len(results) != planet.Size() { + return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results))) + } + + return nil + } + return nil + }) + } + + defer ctx.Check(group.Wait) + } + + { // Lookup + client, err := planet.StorageNodes[2].NewNodeClient() + assert.NoError(t, err) + defer ctx.Check(client.Disconnect) + + targets := []storj.NodeID{ + {}, // empty target + {255}, // non-empty + } + + var group errgroup.Group + + for i := range targets { + target := targets[i] + for i := range peers { + peer := peers[i] + group.Go(func() error { + errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target) + peer.Local().Type.DPanicOnInvalid("peer info") + results, err := client.Lookup(ctx, peer.Local(), pb.Node{Id: target, Type: pb.NodeType_STORAGE}) + if err != nil { + return utils.CombineErrors(errTag, err) + } + + // with small network we expect to return everything + if len(results) != planet.Size() { + return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results))) + } + + return nil + }) + } + } + + defer ctx.Check(group.Wait) + } +} + +func containsResult(nodes []*pb.Node, target storj.NodeID) bool { + for _, node := range nodes { + if node.Id == target { + return true + } + } + return false +} diff --git a/pkg/node/server.go b/pkg/node/server.go new file mode 100644 index 000000000..219f87d74 --- /dev/null +++ b/pkg/node/server.go @@ -0,0 +1,74 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package node + +import ( + "context" + "sync/atomic" + + "go.uber.org/zap" + + "storj.io/storj/pkg/dht" + "storj.io/storj/pkg/pb" +) + +// Server implements the grpc Node Server +type Server struct { + dht dht.DHT + log *zap.Logger + + connected int32 +} + +// NewServer returns a newly instantiated Node Server +func NewServer(log *zap.Logger, dht dht.DHT) *Server { + return &Server{ + dht: dht, + log: log, + } +} + +// Query is a node to node communication query +func (server *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) { + rt, err := server.dht.GetRoutingTable(ctx) + if err != nil { + return &pb.QueryResponse{}, NodeClientErr.New("could not get routing table %server", err) + } + + if req.GetPingback() { + _, err = server.dht.Ping(ctx, *req.Sender) + if err != nil { + server.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", req.Sender.Id.String())) + err = rt.ConnectionFailed(req.Sender) + if err != nil { + server.log.Error("could not respond to connection failed", zap.Error(err)) + } + } else { + err = rt.ConnectionSuccess(req.Sender) + if err != nil { + server.log.Error("could not respond to connection success", zap.Error(err)) + } else { + count := atomic.AddInt32(&server.connected, 1) + if count == 1 { + server.log.Sugar().Debugf("Successfully connected with %s", req.Sender.Address.Address) + } else if count%100 == 0 { + server.log.Sugar().Debugf("Successfully connected with %s %dx times", req.Sender.Address.Address, count) + } + } + } + } + + nodes, err := rt.FindNear(req.Target.Id, int(req.Limit)) + if err != nil { + return &pb.QueryResponse{}, NodeClientErr.New("could not find near %server", err) + } + + return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil +} + +// Ping provides an easy way to verify a node is online and accepting requests +func (server *Server) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { + //TODO + return &pb.PingResponse{}, nil +} diff --git a/pkg/piecestore/psserver/server.go b/pkg/piecestore/psserver/server.go index 0ce27b3db..ed926b7d4 100644 --- a/pkg/piecestore/psserver/server.go +++ b/pkg/piecestore/psserver/server.go @@ -370,7 +370,10 @@ func (s *Server) getDashboardData(ctx context.Context) (*pb.DashboardStats, erro return &pb.DashboardStats{}, ServerError.Wrap(err) } - rt := s.kad.GetRoutingTable() + rt, err := s.kad.GetRoutingTable(ctx) + if err != nil { + return &pb.DashboardStats{}, ServerError.Wrap(err) + } nodes, err := s.kad.FindNear(ctx, storj.NodeID{}, 10000000) if err != nil { diff --git a/satellite/peer.go b/satellite/peer.go index 2537413e8..8aa4128f2 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -29,6 +29,7 @@ import ( "storj.io/storj/pkg/discovery" "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/node" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/pointerdb" @@ -115,7 +116,7 @@ type Peer struct { RoutingTable *kademlia.RoutingTable Service *kademlia.Kademlia - Endpoint *kademlia.Endpoint + Endpoint *node.Server Inspector *kademlia.Inspector } @@ -238,7 +239,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (* return nil, errs.Combine(err, peer.Close()) } - peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable) + peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service) pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint) peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity) diff --git a/satellite/testing.go b/satellite/testing.go new file mode 100644 index 000000000..48ae4fd94 --- /dev/null +++ b/satellite/testing.go @@ -0,0 +1,17 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package satellite + +import ( + "storj.io/storj/pkg/node" +) + +// These methods are added to have same interface as in testplanet to make transition easier. + +// NewNodeClient creates a node client for this node +// TODO: this is temporary and only intended for tests +func (peer *Peer) NewNodeClient() (node.Client, error) { + // TODO: handle disconnect verification + return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service) +} diff --git a/storagenode/peer.go b/storagenode/peer.go index 115daa558..278394d02 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -14,6 +14,7 @@ import ( "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/piecestore/psserver" @@ -71,7 +72,7 @@ type Peer struct { Kademlia struct { RoutingTable *kademlia.RoutingTable Service *kademlia.Kademlia - Endpoint *kademlia.Endpoint + Endpoint *node.Server Inspector *kademlia.Inspector } @@ -146,7 +147,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P return nil, errs.Combine(err, peer.Close()) } - peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable) + peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service) pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint) peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity) diff --git a/storagenode/testing.go b/storagenode/testing.go new file mode 100644 index 000000000..3c825fb57 --- /dev/null +++ b/storagenode/testing.go @@ -0,0 +1,17 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package storagenode + +import ( + "storj.io/storj/pkg/node" +) + +// These methods are added to have same interface as in testplanet to make transition easier. + +// NewNodeClient creates a node client for this node +// TODO: this is temporary and only intended for tests +func (peer *Peer) NewNodeClient() (node.Client, error) { + // TODO: handle disconnect verification + return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service) +}