Revert "Remove node package and simplify DHT interface."
This reverts commit 03ec1ff92d
.
This commit is contained in:
parent
03ec1ff92d
commit
87d6410b50
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/node"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -63,7 +64,7 @@ type Peer struct {
|
||||
Kademlia struct {
|
||||
RoutingTable *kademlia.RoutingTable
|
||||
Service *kademlia.Kademlia
|
||||
Endpoint *kademlia.Endpoint
|
||||
Endpoint *node.Server
|
||||
Inspector *kademlia.Inspector
|
||||
}
|
||||
}
|
||||
@ -128,7 +129,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
|
||||
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
|
||||
peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service)
|
||||
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
||||
|
||||
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
||||
|
17
bootstrap/testing.go
Normal file
17
bootstrap/testing.go
Normal file
@ -0,0 +1,17 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package bootstrap
|
||||
|
||||
import (
|
||||
"storj.io/storj/pkg/node"
|
||||
)
|
||||
|
||||
// These methods are added to have same interface as in testplanet to make transition easier.
|
||||
|
||||
// NewNodeClient creates a node client for this node
|
||||
// TODO: this is temporary and only intended for tests
|
||||
func (peer *Peer) NewNodeClient() (node.Client, error) {
|
||||
// TODO: handle disconnect verification
|
||||
return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service)
|
||||
}
|
@ -34,6 +34,7 @@ import (
|
||||
"storj.io/storj/pkg/discovery"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/node"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
@ -56,6 +57,8 @@ type Peer interface {
|
||||
|
||||
Run(context.Context) error
|
||||
Close() error
|
||||
|
||||
NewNodeClient() (node.Client, error)
|
||||
}
|
||||
|
||||
// Config describes planet configuration
|
||||
|
@ -42,21 +42,20 @@ func TestBasic(t *testing.T) {
|
||||
message := client.SignedMessage()
|
||||
t.Log(message)
|
||||
|
||||
nodeClient, err := planet.StorageNodes[0].NewNodeClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
// ping a satellite
|
||||
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.Satellites[0].Local())
|
||||
_, err = nodeClient.Ping(context.Background(), planet.Satellites[0].Local())
|
||||
require.NoError(t, err)
|
||||
|
||||
// ping a storage node
|
||||
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local())
|
||||
_, err = nodeClient.Ping(context.Background(), planet.StorageNodes[1].Local())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = planet.StopPeer(planet.StorageNodes[1])
|
||||
err = planet.StopPeer(planet.StorageNodes[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
// ping a stopped storage node
|
||||
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local())
|
||||
require.Error(t, err)
|
||||
|
||||
// wait a bit to see whether some failures occur
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
// DHT is the interface for the DHT in the Storj network
|
||||
type DHT interface {
|
||||
FindNear(ctx context.Context, start storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error)
|
||||
GetRoutingTable() RoutingTable
|
||||
GetRoutingTable(ctx context.Context) (RoutingTable, error)
|
||||
Bootstrap(ctx context.Context) error
|
||||
Ping(ctx context.Context, node pb.Node) (pb.Node, error)
|
||||
FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, error)
|
||||
|
@ -1,77 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package kademlia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
)
|
||||
|
||||
var EndpointError = errs.Class("kademlia endpoint error")
|
||||
|
||||
// Endpoint implements the kademlia Endpoints
|
||||
type Endpoint struct {
|
||||
log *zap.Logger
|
||||
service *Kademlia
|
||||
routingTable *RoutingTable
|
||||
connected int32
|
||||
}
|
||||
|
||||
// NewEndpoint returns a new kademlia endpoint
|
||||
func NewEndpoint(log *zap.Logger, service *Kademlia, routingTable *RoutingTable) *Endpoint {
|
||||
return &Endpoint{
|
||||
service: service,
|
||||
routingTable: routingTable,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// Query is a node to node communication query
|
||||
func (endpoint *Endpoint) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
|
||||
if req.GetPingback() {
|
||||
endpoint.pingback(ctx, req.Sender)
|
||||
}
|
||||
|
||||
nodes, err := endpoint.routingTable.FindNear(req.Target.Id, int(req.Limit))
|
||||
if err != nil {
|
||||
return &pb.QueryResponse{}, EndpointError.New("could not find near endpoint: %v", err)
|
||||
}
|
||||
|
||||
return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil
|
||||
}
|
||||
|
||||
// pingback implements pingback for queries
|
||||
func (endpoint *Endpoint) pingback(ctx context.Context, target *pb.Node) {
|
||||
_, err := endpoint.service.Ping(ctx, *target)
|
||||
if err != nil {
|
||||
endpoint.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", target.Id.String()))
|
||||
err = endpoint.routingTable.ConnectionFailed(target)
|
||||
if err != nil {
|
||||
endpoint.log.Error("could not respond to connection failed", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
err = endpoint.routingTable.ConnectionSuccess(target)
|
||||
if err != nil {
|
||||
endpoint.log.Error("could not respond to connection success", zap.Error(err))
|
||||
} else {
|
||||
count := atomic.AddInt32(&endpoint.connected, 1)
|
||||
if count == 1 {
|
||||
endpoint.log.Sugar().Debugf("Successfully connected with %s", target.Address.Address)
|
||||
} else if count%100 == 0 {
|
||||
endpoint.log.Sugar().Debugf("Successfully connected with %s %dx times", target.Address.Address, count)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ping provides an easy way to verify a node is online and accepting requests
|
||||
func (endpoint *Endpoint) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
|
||||
//TODO
|
||||
return &pb.PingResponse{}, nil
|
||||
}
|
@ -6,19 +6,21 @@ package kademlia
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/node"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// Inspector is a gRPC service for inspecting kademlia internals
|
||||
type Inspector struct {
|
||||
dht *Kademlia
|
||||
dht dht.DHT
|
||||
identity *identity.FullIdentity
|
||||
}
|
||||
|
||||
// NewInspector creates an Inspector
|
||||
func NewInspector(kad *Kademlia, identity *identity.FullIdentity) *Inspector {
|
||||
func NewInspector(kad dht.DHT, identity *identity.FullIdentity) *Inspector {
|
||||
return &Inspector{
|
||||
dht: kad,
|
||||
identity: identity,
|
||||
@ -40,7 +42,10 @@ func (srv *Inspector) CountNodes(ctx context.Context, req *pb.CountNodesRequest)
|
||||
|
||||
// GetBuckets returns all kademlia buckets for current kademlia instance
|
||||
func (srv *Inspector) GetBuckets(ctx context.Context, req *pb.GetBucketsRequest) (*pb.GetBucketsResponse, error) {
|
||||
rt := srv.dht.GetRoutingTable()
|
||||
rt, err := srv.dht.GetRoutingTable(ctx)
|
||||
if err != nil {
|
||||
return &pb.GetBucketsResponse{}, Error.Wrap(err)
|
||||
}
|
||||
b, err := rt.GetBucketIds()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -72,22 +77,31 @@ func (srv *Inspector) FindNear(ctx context.Context, req *pb.FindNearRequest) (*p
|
||||
|
||||
// PingNode sends a PING RPC to the provided node ID in the Kad network.
|
||||
func (srv *Inspector) PingNode(ctx context.Context, req *pb.PingNodeRequest) (*pb.PingNodeResponse, error) {
|
||||
rt := srv.dht.GetRoutingTable()
|
||||
rt, err := srv.dht.GetRoutingTable(ctx)
|
||||
if err != nil {
|
||||
return &pb.PingNodeResponse{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
self := rt.Local()
|
||||
|
||||
_, err := srv.dht.Ping(ctx, pb.Node{
|
||||
nc, err := node.NewNodeClient(srv.identity, self, srv.dht)
|
||||
if err != nil {
|
||||
return &pb.PingNodeResponse{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
p, err := nc.Ping(ctx, pb.Node{
|
||||
Id: req.Id,
|
||||
Type: self.Type,
|
||||
Address: &pb.NodeAddress{
|
||||
Address: req.Address,
|
||||
},
|
||||
})
|
||||
|
||||
res := &pb.PingNodeResponse{Ok: err == nil}
|
||||
res := &pb.PingNodeResponse{Ok: p}
|
||||
|
||||
if err != nil {
|
||||
return res, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
|
@ -119,8 +119,10 @@ func (k *Kademlia) FindNear(ctx context.Context, start storj.NodeID, limit int,
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
// GetRoutingTable provides the assigned routing table
|
||||
func (k *Kademlia) GetRoutingTable() dht.RoutingTable { return k.routingTable }
|
||||
// GetRoutingTable provides the routing table for the Kademlia DHT
|
||||
func (k *Kademlia) GetRoutingTable(ctx context.Context) (dht.RoutingTable, error) {
|
||||
return k.routingTable, nil
|
||||
}
|
||||
|
||||
// SetBootstrapNodes sets the bootstrap nodes.
|
||||
// Must be called before anything starting to use kademlia.
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"storj.io/storj/internal/testidentity"
|
||||
"storj.io/storj/internal/teststorj"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/node"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/storage/teststore"
|
||||
@ -103,7 +104,8 @@ func TestPeerDiscovery(t *testing.T) {
|
||||
}
|
||||
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha)
|
||||
assert.NoError(t, err)
|
||||
rt := k.GetRoutingTable()
|
||||
rt, err := k.GetRoutingTable(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, rt.Local().Metadata.Email, "foo@bar.com")
|
||||
assert.Equal(t, rt.Local().Metadata.Wallet, "OperatorWallet")
|
||||
|
||||
@ -174,7 +176,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
|
||||
logger := zaptest.NewLogger(t)
|
||||
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
|
||||
assert.NoError(t, err)
|
||||
s := NewEndpoint(logger, k, k.GetRoutingTable().(*RoutingTable))
|
||||
s := node.NewServer(logger, k)
|
||||
// new ident opts
|
||||
identOpt, err := fid.ServerOption()
|
||||
assert.NoError(t, err)
|
||||
|
38
pkg/node/client.go
Normal file
38
pkg/node/client.go
Normal file
@ -0,0 +1,38 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
//NodeClientErr is the class for all errors pertaining to node client operations
|
||||
var NodeClientErr = errs.Class("node client error")
|
||||
|
||||
// NewNodeClient instantiates a node client
|
||||
func NewNodeClient(identity *identity.FullIdentity, self pb.Node, dht dht.DHT, obs ...transport.Observer) (Client, error) {
|
||||
node := &Node{
|
||||
dht: dht,
|
||||
self: self,
|
||||
pool: NewConnectionPool(identity, obs...),
|
||||
}
|
||||
|
||||
node.pool.Init()
|
||||
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// Client is the Node client communication interface
|
||||
type Client interface {
|
||||
Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error)
|
||||
Ping(ctx context.Context, to pb.Node) (bool, error)
|
||||
Disconnect() error
|
||||
}
|
147
pkg/node/connection_pool.go
Normal file
147
pkg/node/connection_pool.go
Normal file
@ -0,0 +1,147 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// Error defines a connection pool error
|
||||
var Error = errs.Class("connection pool error")
|
||||
|
||||
// ConnectionPool is the in memory pool of node connections
|
||||
type ConnectionPool struct {
|
||||
tc transport.Client
|
||||
mu sync.RWMutex
|
||||
items map[storj.NodeID]*Conn
|
||||
}
|
||||
|
||||
// Conn is the connection that is stored in the connection pool
|
||||
type Conn struct {
|
||||
addr string
|
||||
|
||||
dial sync.Once
|
||||
client pb.NodesClient
|
||||
grpc unsafe.Pointer //*grpc.ClientConn
|
||||
err error
|
||||
}
|
||||
|
||||
// NewConn intitalizes a new Conn struct with the provided address, but does not iniate a connection
|
||||
func NewConn(addr string) *Conn { return &Conn{addr: addr} }
|
||||
|
||||
// NewConnectionPool initializes a new in memory pool
|
||||
func NewConnectionPool(identity *identity.FullIdentity, obs ...transport.Observer) *ConnectionPool {
|
||||
return &ConnectionPool{
|
||||
tc: transport.NewClient(identity, obs...),
|
||||
items: make(map[storj.NodeID]*Conn),
|
||||
mu: sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves a node connection with the provided nodeID
|
||||
// nil is returned if the NodeID is not in the connection pool
|
||||
func (pool *ConnectionPool) Get(id storj.NodeID) (interface{}, error) {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
i, ok := pool.items[id]
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return i, nil
|
||||
}
|
||||
|
||||
// Disconnect deletes a connection associated with the provided NodeID
|
||||
func (pool *ConnectionPool) Disconnect(id storj.NodeID) error {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
return pool.disconnect(id)
|
||||
|
||||
}
|
||||
|
||||
func (pool *ConnectionPool) disconnect(id storj.NodeID) error {
|
||||
conn, ok := pool.items[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ptr := atomic.LoadPointer(&conn.grpc)
|
||||
if ptr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
delete(pool.items, id)
|
||||
|
||||
return (*grpc.ClientConn)(ptr).Close()
|
||||
}
|
||||
|
||||
// Dial connects to the node with the given ID and Address returning a gRPC Node Client
|
||||
func (pool *ConnectionPool) Dial(ctx context.Context, n *pb.Node) (pb.NodesClient, error) {
|
||||
id := n.Id
|
||||
pool.mu.Lock()
|
||||
conn, ok := pool.items[id]
|
||||
if !ok {
|
||||
conn = NewConn(n.GetAddress().Address)
|
||||
pool.items[id] = conn
|
||||
}
|
||||
pool.mu.Unlock()
|
||||
|
||||
if n != nil {
|
||||
n.Type.DPanicOnInvalid("connection pool dial")
|
||||
}
|
||||
|
||||
conn.dial.Do(func() {
|
||||
grpc, err := pool.tc.DialNode(ctx, n)
|
||||
conn.err = err
|
||||
if conn.err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
atomic.StorePointer(&conn.grpc, unsafe.Pointer(grpc))
|
||||
|
||||
conn.client = pb.NewNodesClient(grpc)
|
||||
})
|
||||
|
||||
if conn.err != nil {
|
||||
return nil, conn.err
|
||||
}
|
||||
|
||||
return conn.client, nil
|
||||
}
|
||||
|
||||
// DisconnectAll closes all connections nodes and removes them from the connection pool
|
||||
func (pool *ConnectionPool) DisconnectAll() error {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
errs := []error{}
|
||||
for k := range pool.items {
|
||||
if err := pool.disconnect(k); err != nil {
|
||||
errs = append(errs, Error.Wrap(err))
|
||||
}
|
||||
}
|
||||
|
||||
return utils.CombineErrors(errs...)
|
||||
}
|
||||
|
||||
// Init initializes the cache
|
||||
func (pool *ConnectionPool) Init() {
|
||||
pool.mu.Lock()
|
||||
pool.items = make(map[storj.NodeID]*Conn)
|
||||
pool.mu.Unlock()
|
||||
}
|
64
pkg/node/node.go
Normal file
64
pkg/node/node.go
Normal file
@ -0,0 +1,64 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/pb"
|
||||
)
|
||||
|
||||
// Node is the storj definition for a node in the network
|
||||
type Node struct {
|
||||
dht dht.DHT
|
||||
self pb.Node
|
||||
pool *ConnectionPool
|
||||
}
|
||||
|
||||
// Lookup queries nodes looking for a particular node in the network
|
||||
func (node *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) {
|
||||
to.Type.DPanicOnInvalid("node Lookup")
|
||||
conn, err := node.pool.Dial(ctx, &to)
|
||||
|
||||
if err != nil {
|
||||
return nil, NodeClientErr.Wrap(err)
|
||||
}
|
||||
resp, err := conn.Query(ctx, &pb.QueryRequest{
|
||||
Limit: 20,
|
||||
Sender: &node.self,
|
||||
Target: &find,
|
||||
Pingback: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, NodeClientErr.Wrap(err)
|
||||
}
|
||||
rt, err := node.dht.GetRoutingTable(ctx)
|
||||
if err != nil {
|
||||
return nil, NodeClientErr.Wrap(err)
|
||||
}
|
||||
if err := rt.ConnectionSuccess(&to); err != nil {
|
||||
return nil, NodeClientErr.Wrap(err)
|
||||
}
|
||||
return resp.Response, nil
|
||||
}
|
||||
|
||||
// Ping attempts to establish a connection with a node to verify it is alive
|
||||
func (node *Node) Ping(ctx context.Context, to pb.Node) (bool, error) {
|
||||
to.Type.DPanicOnInvalid("node ping")
|
||||
conn, err := node.pool.Dial(ctx, &to)
|
||||
if err != nil {
|
||||
return false, NodeClientErr.Wrap(err)
|
||||
}
|
||||
_, err = conn.Ping(ctx, &pb.PingRequest{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Disconnect closes all connections within the pool
|
||||
func (node *Node) Disconnect() error {
|
||||
return node.pool.DisconnectAll()
|
||||
}
|
142
pkg/node/node_test.go
Normal file
142
pkg/node/node_test.go
Normal file
@ -0,0 +1,142 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package node_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
planet, err := testplanet.New(t, 1, 4, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ctx.Check(planet.Shutdown)
|
||||
|
||||
planet.Start(ctx)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// TODO: also use satellites
|
||||
peers := planet.StorageNodes
|
||||
|
||||
{ // Ping
|
||||
client, err := planet.StorageNodes[0].NewNodeClient()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(client.Disconnect)
|
||||
|
||||
var group errgroup.Group
|
||||
|
||||
for i := range peers {
|
||||
peer := peers[i]
|
||||
group.Go(func() error {
|
||||
pinged, err := client.Ping(ctx, peer.Local())
|
||||
var pingErr error
|
||||
if !pinged {
|
||||
pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID())
|
||||
}
|
||||
return utils.CombineErrors(pingErr, err)
|
||||
})
|
||||
}
|
||||
|
||||
defer ctx.Check(group.Wait)
|
||||
}
|
||||
|
||||
{ // Lookup
|
||||
client, err := planet.StorageNodes[1].NewNodeClient()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(client.Disconnect)
|
||||
|
||||
var group errgroup.Group
|
||||
|
||||
for i := range peers {
|
||||
peer := peers[i]
|
||||
group.Go(func() error {
|
||||
for _, target := range peers {
|
||||
errTag := fmt.Errorf("lookup peer:%s target:%s", peer.ID(), target.ID())
|
||||
peer.Local().Type.DPanicOnInvalid("test client peer")
|
||||
target.Local().Type.DPanicOnInvalid("test client target")
|
||||
results, err := client.Lookup(ctx, peer.Local(), target.Local())
|
||||
if err != nil {
|
||||
return utils.CombineErrors(errTag, err)
|
||||
}
|
||||
|
||||
if containsResult(results, target.ID()) {
|
||||
continue
|
||||
}
|
||||
|
||||
// with small network we expect to return everything
|
||||
if len(results) != planet.Size() {
|
||||
return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
defer ctx.Check(group.Wait)
|
||||
}
|
||||
|
||||
{ // Lookup
|
||||
client, err := planet.StorageNodes[2].NewNodeClient()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(client.Disconnect)
|
||||
|
||||
targets := []storj.NodeID{
|
||||
{}, // empty target
|
||||
{255}, // non-empty
|
||||
}
|
||||
|
||||
var group errgroup.Group
|
||||
|
||||
for i := range targets {
|
||||
target := targets[i]
|
||||
for i := range peers {
|
||||
peer := peers[i]
|
||||
group.Go(func() error {
|
||||
errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target)
|
||||
peer.Local().Type.DPanicOnInvalid("peer info")
|
||||
results, err := client.Lookup(ctx, peer.Local(), pb.Node{Id: target, Type: pb.NodeType_STORAGE})
|
||||
if err != nil {
|
||||
return utils.CombineErrors(errTag, err)
|
||||
}
|
||||
|
||||
// with small network we expect to return everything
|
||||
if len(results) != planet.Size() {
|
||||
return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results)))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
defer ctx.Check(group.Wait)
|
||||
}
|
||||
}
|
||||
|
||||
func containsResult(nodes []*pb.Node, target storj.NodeID) bool {
|
||||
for _, node := range nodes {
|
||||
if node.Id == target {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
74
pkg/node/server.go
Normal file
74
pkg/node/server.go
Normal file
@ -0,0 +1,74 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/pb"
|
||||
)
|
||||
|
||||
// Server implements the grpc Node Server
|
||||
type Server struct {
|
||||
dht dht.DHT
|
||||
log *zap.Logger
|
||||
|
||||
connected int32
|
||||
}
|
||||
|
||||
// NewServer returns a newly instantiated Node Server
|
||||
func NewServer(log *zap.Logger, dht dht.DHT) *Server {
|
||||
return &Server{
|
||||
dht: dht,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// Query is a node to node communication query
|
||||
func (server *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
|
||||
rt, err := server.dht.GetRoutingTable(ctx)
|
||||
if err != nil {
|
||||
return &pb.QueryResponse{}, NodeClientErr.New("could not get routing table %server", err)
|
||||
}
|
||||
|
||||
if req.GetPingback() {
|
||||
_, err = server.dht.Ping(ctx, *req.Sender)
|
||||
if err != nil {
|
||||
server.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", req.Sender.Id.String()))
|
||||
err = rt.ConnectionFailed(req.Sender)
|
||||
if err != nil {
|
||||
server.log.Error("could not respond to connection failed", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
err = rt.ConnectionSuccess(req.Sender)
|
||||
if err != nil {
|
||||
server.log.Error("could not respond to connection success", zap.Error(err))
|
||||
} else {
|
||||
count := atomic.AddInt32(&server.connected, 1)
|
||||
if count == 1 {
|
||||
server.log.Sugar().Debugf("Successfully connected with %s", req.Sender.Address.Address)
|
||||
} else if count%100 == 0 {
|
||||
server.log.Sugar().Debugf("Successfully connected with %s %dx times", req.Sender.Address.Address, count)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nodes, err := rt.FindNear(req.Target.Id, int(req.Limit))
|
||||
if err != nil {
|
||||
return &pb.QueryResponse{}, NodeClientErr.New("could not find near %server", err)
|
||||
}
|
||||
|
||||
return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil
|
||||
}
|
||||
|
||||
// Ping provides an easy way to verify a node is online and accepting requests
|
||||
func (server *Server) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
|
||||
//TODO
|
||||
return &pb.PingResponse{}, nil
|
||||
}
|
@ -370,7 +370,10 @@ func (s *Server) getDashboardData(ctx context.Context) (*pb.DashboardStats, erro
|
||||
return &pb.DashboardStats{}, ServerError.Wrap(err)
|
||||
}
|
||||
|
||||
rt := s.kad.GetRoutingTable()
|
||||
rt, err := s.kad.GetRoutingTable(ctx)
|
||||
if err != nil {
|
||||
return &pb.DashboardStats{}, ServerError.Wrap(err)
|
||||
}
|
||||
|
||||
nodes, err := s.kad.FindNear(ctx, storj.NodeID{}, 10000000)
|
||||
if err != nil {
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"storj.io/storj/pkg/discovery"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/node"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
@ -115,7 +116,7 @@ type Peer struct {
|
||||
|
||||
RoutingTable *kademlia.RoutingTable
|
||||
Service *kademlia.Kademlia
|
||||
Endpoint *kademlia.Endpoint
|
||||
Endpoint *node.Server
|
||||
Inspector *kademlia.Inspector
|
||||
}
|
||||
|
||||
@ -238,7 +239,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
|
||||
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
|
||||
peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service)
|
||||
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
||||
|
||||
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
||||
|
17
satellite/testing.go
Normal file
17
satellite/testing.go
Normal file
@ -0,0 +1,17 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package satellite
|
||||
|
||||
import (
|
||||
"storj.io/storj/pkg/node"
|
||||
)
|
||||
|
||||
// These methods are added to have same interface as in testplanet to make transition easier.
|
||||
|
||||
// NewNodeClient creates a node client for this node
|
||||
// TODO: this is temporary and only intended for tests
|
||||
func (peer *Peer) NewNodeClient() (node.Client, error) {
|
||||
// TODO: handle disconnect verification
|
||||
return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service)
|
||||
}
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/node"
|
||||
"storj.io/storj/pkg/pb"
|
||||
pstore "storj.io/storj/pkg/piecestore"
|
||||
"storj.io/storj/pkg/piecestore/psserver"
|
||||
@ -71,7 +72,7 @@ type Peer struct {
|
||||
Kademlia struct {
|
||||
RoutingTable *kademlia.RoutingTable
|
||||
Service *kademlia.Kademlia
|
||||
Endpoint *kademlia.Endpoint
|
||||
Endpoint *node.Server
|
||||
Inspector *kademlia.Inspector
|
||||
}
|
||||
|
||||
@ -146,7 +147,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
|
||||
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
|
||||
peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service)
|
||||
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
||||
|
||||
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
||||
|
17
storagenode/testing.go
Normal file
17
storagenode/testing.go
Normal file
@ -0,0 +1,17 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package storagenode
|
||||
|
||||
import (
|
||||
"storj.io/storj/pkg/node"
|
||||
)
|
||||
|
||||
// These methods are added to have same interface as in testplanet to make transition easier.
|
||||
|
||||
// NewNodeClient creates a node client for this node
|
||||
// TODO: this is temporary and only intended for tests
|
||||
func (peer *Peer) NewNodeClient() (node.Client, error) {
|
||||
// TODO: handle disconnect verification
|
||||
return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service)
|
||||
}
|
Loading…
Reference in New Issue
Block a user