Remove node package and simplify DHT interface.
This commit is contained in:
parent
21a58d0ece
commit
03ec1ff92d
@ -14,7 +14,6 @@ import (
|
|||||||
|
|
||||||
"storj.io/storj/pkg/identity"
|
"storj.io/storj/pkg/identity"
|
||||||
"storj.io/storj/pkg/kademlia"
|
"storj.io/storj/pkg/kademlia"
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/server"
|
"storj.io/storj/pkg/server"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
@ -64,7 +63,7 @@ type Peer struct {
|
|||||||
Kademlia struct {
|
Kademlia struct {
|
||||||
RoutingTable *kademlia.RoutingTable
|
RoutingTable *kademlia.RoutingTable
|
||||||
Service *kademlia.Kademlia
|
Service *kademlia.Kademlia
|
||||||
Endpoint *node.Server
|
Endpoint *kademlia.Endpoint
|
||||||
Inspector *kademlia.Inspector
|
Inspector *kademlia.Inspector
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -129,7 +128,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
|||||||
return nil, errs.Combine(err, peer.Close())
|
return nil, errs.Combine(err, peer.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service)
|
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
|
||||||
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
||||||
|
|
||||||
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
||||||
|
@ -1,17 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package bootstrap
|
|
||||||
|
|
||||||
import (
|
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
)
|
|
||||||
|
|
||||||
// These methods are added to have same interface as in testplanet to make transition easier.
|
|
||||||
|
|
||||||
// NewNodeClient creates a node client for this node
|
|
||||||
// TODO: this is temporary and only intended for tests
|
|
||||||
func (peer *Peer) NewNodeClient() (node.Client, error) {
|
|
||||||
// TODO: handle disconnect verification
|
|
||||||
return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service)
|
|
||||||
}
|
|
@ -34,7 +34,6 @@ import (
|
|||||||
"storj.io/storj/pkg/discovery"
|
"storj.io/storj/pkg/discovery"
|
||||||
"storj.io/storj/pkg/identity"
|
"storj.io/storj/pkg/identity"
|
||||||
"storj.io/storj/pkg/kademlia"
|
"storj.io/storj/pkg/kademlia"
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
"storj.io/storj/pkg/overlay"
|
"storj.io/storj/pkg/overlay"
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/peertls"
|
"storj.io/storj/pkg/peertls"
|
||||||
@ -57,8 +56,6 @@ type Peer interface {
|
|||||||
|
|
||||||
Run(context.Context) error
|
Run(context.Context) error
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
NewNodeClient() (node.Client, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config describes planet configuration
|
// Config describes planet configuration
|
||||||
|
@ -42,20 +42,21 @@ func TestBasic(t *testing.T) {
|
|||||||
message := client.SignedMessage()
|
message := client.SignedMessage()
|
||||||
t.Log(message)
|
t.Log(message)
|
||||||
|
|
||||||
nodeClient, err := planet.StorageNodes[0].NewNodeClient()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// ping a satellite
|
// ping a satellite
|
||||||
_, err = nodeClient.Ping(context.Background(), planet.Satellites[0].Local())
|
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.Satellites[0].Local())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// ping a storage node
|
// ping a storage node
|
||||||
_, err = nodeClient.Ping(context.Background(), planet.StorageNodes[1].Local())
|
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = planet.StopPeer(planet.StorageNodes[0])
|
err = planet.StopPeer(planet.StorageNodes[1])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// ping a stopped storage node
|
||||||
|
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local())
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
// wait a bit to see whether some failures occur
|
// wait a bit to see whether some failures occur
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,7 @@ import (
|
|||||||
// DHT is the interface for the DHT in the Storj network
|
// DHT is the interface for the DHT in the Storj network
|
||||||
type DHT interface {
|
type DHT interface {
|
||||||
FindNear(ctx context.Context, start storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error)
|
FindNear(ctx context.Context, start storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error)
|
||||||
GetRoutingTable(ctx context.Context) (RoutingTable, error)
|
GetRoutingTable() RoutingTable
|
||||||
Bootstrap(ctx context.Context) error
|
Bootstrap(ctx context.Context) error
|
||||||
Ping(ctx context.Context, node pb.Node) (pb.Node, error)
|
Ping(ctx context.Context, node pb.Node) (pb.Node, error)
|
||||||
FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, error)
|
FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, error)
|
||||||
|
77
pkg/kademlia/endpoint.go
Normal file
77
pkg/kademlia/endpoint.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
// Copyright (C) 2019 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package kademlia
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/storj/pkg/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
var EndpointError = errs.Class("kademlia endpoint error")
|
||||||
|
|
||||||
|
// Endpoint implements the kademlia Endpoints
|
||||||
|
type Endpoint struct {
|
||||||
|
log *zap.Logger
|
||||||
|
service *Kademlia
|
||||||
|
routingTable *RoutingTable
|
||||||
|
connected int32
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEndpoint returns a new kademlia endpoint
|
||||||
|
func NewEndpoint(log *zap.Logger, service *Kademlia, routingTable *RoutingTable) *Endpoint {
|
||||||
|
return &Endpoint{
|
||||||
|
service: service,
|
||||||
|
routingTable: routingTable,
|
||||||
|
log: log,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query is a node to node communication query
|
||||||
|
func (endpoint *Endpoint) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
|
||||||
|
if req.GetPingback() {
|
||||||
|
endpoint.pingback(ctx, req.Sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes, err := endpoint.routingTable.FindNear(req.Target.Id, int(req.Limit))
|
||||||
|
if err != nil {
|
||||||
|
return &pb.QueryResponse{}, EndpointError.New("could not find near endpoint: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// pingback implements pingback for queries
|
||||||
|
func (endpoint *Endpoint) pingback(ctx context.Context, target *pb.Node) {
|
||||||
|
_, err := endpoint.service.Ping(ctx, *target)
|
||||||
|
if err != nil {
|
||||||
|
endpoint.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", target.Id.String()))
|
||||||
|
err = endpoint.routingTable.ConnectionFailed(target)
|
||||||
|
if err != nil {
|
||||||
|
endpoint.log.Error("could not respond to connection failed", zap.Error(err))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = endpoint.routingTable.ConnectionSuccess(target)
|
||||||
|
if err != nil {
|
||||||
|
endpoint.log.Error("could not respond to connection success", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
count := atomic.AddInt32(&endpoint.connected, 1)
|
||||||
|
if count == 1 {
|
||||||
|
endpoint.log.Sugar().Debugf("Successfully connected with %s", target.Address.Address)
|
||||||
|
} else if count%100 == 0 {
|
||||||
|
endpoint.log.Sugar().Debugf("Successfully connected with %s %dx times", target.Address.Address, count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping provides an easy way to verify a node is online and accepting requests
|
||||||
|
func (endpoint *Endpoint) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
|
||||||
|
//TODO
|
||||||
|
return &pb.PingResponse{}, nil
|
||||||
|
}
|
@ -6,21 +6,19 @@ package kademlia
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"storj.io/storj/pkg/dht"
|
|
||||||
"storj.io/storj/pkg/identity"
|
"storj.io/storj/pkg/identity"
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Inspector is a gRPC service for inspecting kademlia internals
|
// Inspector is a gRPC service for inspecting kademlia internals
|
||||||
type Inspector struct {
|
type Inspector struct {
|
||||||
dht dht.DHT
|
dht *Kademlia
|
||||||
identity *identity.FullIdentity
|
identity *identity.FullIdentity
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInspector creates an Inspector
|
// NewInspector creates an Inspector
|
||||||
func NewInspector(kad dht.DHT, identity *identity.FullIdentity) *Inspector {
|
func NewInspector(kad *Kademlia, identity *identity.FullIdentity) *Inspector {
|
||||||
return &Inspector{
|
return &Inspector{
|
||||||
dht: kad,
|
dht: kad,
|
||||||
identity: identity,
|
identity: identity,
|
||||||
@ -42,10 +40,7 @@ func (srv *Inspector) CountNodes(ctx context.Context, req *pb.CountNodesRequest)
|
|||||||
|
|
||||||
// GetBuckets returns all kademlia buckets for current kademlia instance
|
// GetBuckets returns all kademlia buckets for current kademlia instance
|
||||||
func (srv *Inspector) GetBuckets(ctx context.Context, req *pb.GetBucketsRequest) (*pb.GetBucketsResponse, error) {
|
func (srv *Inspector) GetBuckets(ctx context.Context, req *pb.GetBucketsRequest) (*pb.GetBucketsResponse, error) {
|
||||||
rt, err := srv.dht.GetRoutingTable(ctx)
|
rt := srv.dht.GetRoutingTable()
|
||||||
if err != nil {
|
|
||||||
return &pb.GetBucketsResponse{}, Error.Wrap(err)
|
|
||||||
}
|
|
||||||
b, err := rt.GetBucketIds()
|
b, err := rt.GetBucketIds()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -77,31 +72,22 @@ func (srv *Inspector) FindNear(ctx context.Context, req *pb.FindNearRequest) (*p
|
|||||||
|
|
||||||
// PingNode sends a PING RPC to the provided node ID in the Kad network.
|
// PingNode sends a PING RPC to the provided node ID in the Kad network.
|
||||||
func (srv *Inspector) PingNode(ctx context.Context, req *pb.PingNodeRequest) (*pb.PingNodeResponse, error) {
|
func (srv *Inspector) PingNode(ctx context.Context, req *pb.PingNodeRequest) (*pb.PingNodeResponse, error) {
|
||||||
rt, err := srv.dht.GetRoutingTable(ctx)
|
rt := srv.dht.GetRoutingTable()
|
||||||
if err != nil {
|
|
||||||
return &pb.PingNodeResponse{}, Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
self := rt.Local()
|
self := rt.Local()
|
||||||
|
|
||||||
nc, err := node.NewNodeClient(srv.identity, self, srv.dht)
|
_, err := srv.dht.Ping(ctx, pb.Node{
|
||||||
if err != nil {
|
|
||||||
return &pb.PingNodeResponse{}, Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
p, err := nc.Ping(ctx, pb.Node{
|
|
||||||
Id: req.Id,
|
Id: req.Id,
|
||||||
Type: self.Type,
|
Type: self.Type,
|
||||||
Address: &pb.NodeAddress{
|
Address: &pb.NodeAddress{
|
||||||
Address: req.Address,
|
Address: req.Address,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
res := &pb.PingNodeResponse{Ok: p}
|
|
||||||
|
res := &pb.PingNodeResponse{Ok: err == nil}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, Error.Wrap(err)
|
return res, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,10 +119,8 @@ func (k *Kademlia) FindNear(ctx context.Context, start storj.NodeID, limit int,
|
|||||||
return nodes, nil
|
return nodes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRoutingTable provides the routing table for the Kademlia DHT
|
// GetRoutingTable provides the assigned routing table
|
||||||
func (k *Kademlia) GetRoutingTable(ctx context.Context) (dht.RoutingTable, error) {
|
func (k *Kademlia) GetRoutingTable() dht.RoutingTable { return k.routingTable }
|
||||||
return k.routingTable, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetBootstrapNodes sets the bootstrap nodes.
|
// SetBootstrapNodes sets the bootstrap nodes.
|
||||||
// Must be called before anything starting to use kademlia.
|
// Must be called before anything starting to use kademlia.
|
||||||
|
@ -27,7 +27,6 @@ import (
|
|||||||
"storj.io/storj/internal/testidentity"
|
"storj.io/storj/internal/testidentity"
|
||||||
"storj.io/storj/internal/teststorj"
|
"storj.io/storj/internal/teststorj"
|
||||||
"storj.io/storj/pkg/identity"
|
"storj.io/storj/pkg/identity"
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
"storj.io/storj/storage/teststore"
|
"storj.io/storj/storage/teststore"
|
||||||
@ -104,8 +103,7 @@ func TestPeerDiscovery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha)
|
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
rt, err := k.GetRoutingTable(ctx)
|
rt := k.GetRoutingTable()
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, rt.Local().Metadata.Email, "foo@bar.com")
|
assert.Equal(t, rt.Local().Metadata.Email, "foo@bar.com")
|
||||||
assert.Equal(t, rt.Local().Metadata.Wallet, "OperatorWallet")
|
assert.Equal(t, rt.Local().Metadata.Wallet, "OperatorWallet")
|
||||||
|
|
||||||
@ -176,7 +174,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
|
|||||||
logger := zaptest.NewLogger(t)
|
logger := zaptest.NewLogger(t)
|
||||||
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
|
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
s := node.NewServer(logger, k)
|
s := NewEndpoint(logger, k, k.GetRoutingTable().(*RoutingTable))
|
||||||
// new ident opts
|
// new ident opts
|
||||||
identOpt, err := fid.ServerOption()
|
identOpt, err := fid.ServerOption()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
@ -1,38 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information
|
|
||||||
|
|
||||||
package node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/zeebo/errs"
|
|
||||||
|
|
||||||
"storj.io/storj/pkg/dht"
|
|
||||||
"storj.io/storj/pkg/identity"
|
|
||||||
"storj.io/storj/pkg/pb"
|
|
||||||
"storj.io/storj/pkg/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
//NodeClientErr is the class for all errors pertaining to node client operations
|
|
||||||
var NodeClientErr = errs.Class("node client error")
|
|
||||||
|
|
||||||
// NewNodeClient instantiates a node client
|
|
||||||
func NewNodeClient(identity *identity.FullIdentity, self pb.Node, dht dht.DHT, obs ...transport.Observer) (Client, error) {
|
|
||||||
node := &Node{
|
|
||||||
dht: dht,
|
|
||||||
self: self,
|
|
||||||
pool: NewConnectionPool(identity, obs...),
|
|
||||||
}
|
|
||||||
|
|
||||||
node.pool.Init()
|
|
||||||
|
|
||||||
return node, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client is the Node client communication interface
|
|
||||||
type Client interface {
|
|
||||||
Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error)
|
|
||||||
Ping(ctx context.Context, to pb.Node) (bool, error)
|
|
||||||
Disconnect() error
|
|
||||||
}
|
|
@ -1,147 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information
|
|
||||||
|
|
||||||
package node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"unsafe"
|
|
||||||
|
|
||||||
"github.com/zeebo/errs"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"storj.io/storj/pkg/identity"
|
|
||||||
"storj.io/storj/pkg/pb"
|
|
||||||
"storj.io/storj/pkg/storj"
|
|
||||||
"storj.io/storj/pkg/transport"
|
|
||||||
"storj.io/storj/pkg/utils"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Error defines a connection pool error
|
|
||||||
var Error = errs.Class("connection pool error")
|
|
||||||
|
|
||||||
// ConnectionPool is the in memory pool of node connections
|
|
||||||
type ConnectionPool struct {
|
|
||||||
tc transport.Client
|
|
||||||
mu sync.RWMutex
|
|
||||||
items map[storj.NodeID]*Conn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Conn is the connection that is stored in the connection pool
|
|
||||||
type Conn struct {
|
|
||||||
addr string
|
|
||||||
|
|
||||||
dial sync.Once
|
|
||||||
client pb.NodesClient
|
|
||||||
grpc unsafe.Pointer //*grpc.ClientConn
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewConn intitalizes a new Conn struct with the provided address, but does not iniate a connection
|
|
||||||
func NewConn(addr string) *Conn { return &Conn{addr: addr} }
|
|
||||||
|
|
||||||
// NewConnectionPool initializes a new in memory pool
|
|
||||||
func NewConnectionPool(identity *identity.FullIdentity, obs ...transport.Observer) *ConnectionPool {
|
|
||||||
return &ConnectionPool{
|
|
||||||
tc: transport.NewClient(identity, obs...),
|
|
||||||
items: make(map[storj.NodeID]*Conn),
|
|
||||||
mu: sync.RWMutex{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves a node connection with the provided nodeID
|
|
||||||
// nil is returned if the NodeID is not in the connection pool
|
|
||||||
func (pool *ConnectionPool) Get(id storj.NodeID) (interface{}, error) {
|
|
||||||
pool.mu.Lock()
|
|
||||||
defer pool.mu.Unlock()
|
|
||||||
|
|
||||||
i, ok := pool.items[id]
|
|
||||||
if !ok {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disconnect deletes a connection associated with the provided NodeID
|
|
||||||
func (pool *ConnectionPool) Disconnect(id storj.NodeID) error {
|
|
||||||
pool.mu.Lock()
|
|
||||||
defer pool.mu.Unlock()
|
|
||||||
|
|
||||||
return pool.disconnect(id)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pool *ConnectionPool) disconnect(id storj.NodeID) error {
|
|
||||||
conn, ok := pool.items[id]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ptr := atomic.LoadPointer(&conn.grpc)
|
|
||||||
if ptr == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(pool.items, id)
|
|
||||||
|
|
||||||
return (*grpc.ClientConn)(ptr).Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dial connects to the node with the given ID and Address returning a gRPC Node Client
|
|
||||||
func (pool *ConnectionPool) Dial(ctx context.Context, n *pb.Node) (pb.NodesClient, error) {
|
|
||||||
id := n.Id
|
|
||||||
pool.mu.Lock()
|
|
||||||
conn, ok := pool.items[id]
|
|
||||||
if !ok {
|
|
||||||
conn = NewConn(n.GetAddress().Address)
|
|
||||||
pool.items[id] = conn
|
|
||||||
}
|
|
||||||
pool.mu.Unlock()
|
|
||||||
|
|
||||||
if n != nil {
|
|
||||||
n.Type.DPanicOnInvalid("connection pool dial")
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.dial.Do(func() {
|
|
||||||
grpc, err := pool.tc.DialNode(ctx, n)
|
|
||||||
conn.err = err
|
|
||||||
if conn.err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.StorePointer(&conn.grpc, unsafe.Pointer(grpc))
|
|
||||||
|
|
||||||
conn.client = pb.NewNodesClient(grpc)
|
|
||||||
})
|
|
||||||
|
|
||||||
if conn.err != nil {
|
|
||||||
return nil, conn.err
|
|
||||||
}
|
|
||||||
|
|
||||||
return conn.client, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DisconnectAll closes all connections nodes and removes them from the connection pool
|
|
||||||
func (pool *ConnectionPool) DisconnectAll() error {
|
|
||||||
pool.mu.Lock()
|
|
||||||
defer pool.mu.Unlock()
|
|
||||||
|
|
||||||
errs := []error{}
|
|
||||||
for k := range pool.items {
|
|
||||||
if err := pool.disconnect(k); err != nil {
|
|
||||||
errs = append(errs, Error.Wrap(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return utils.CombineErrors(errs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init initializes the cache
|
|
||||||
func (pool *ConnectionPool) Init() {
|
|
||||||
pool.mu.Lock()
|
|
||||||
pool.items = make(map[storj.NodeID]*Conn)
|
|
||||||
pool.mu.Unlock()
|
|
||||||
}
|
|
@ -1,64 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information
|
|
||||||
|
|
||||||
package node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"storj.io/storj/pkg/dht"
|
|
||||||
"storj.io/storj/pkg/pb"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Node is the storj definition for a node in the network
|
|
||||||
type Node struct {
|
|
||||||
dht dht.DHT
|
|
||||||
self pb.Node
|
|
||||||
pool *ConnectionPool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lookup queries nodes looking for a particular node in the network
|
|
||||||
func (node *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) {
|
|
||||||
to.Type.DPanicOnInvalid("node Lookup")
|
|
||||||
conn, err := node.pool.Dial(ctx, &to)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, NodeClientErr.Wrap(err)
|
|
||||||
}
|
|
||||||
resp, err := conn.Query(ctx, &pb.QueryRequest{
|
|
||||||
Limit: 20,
|
|
||||||
Sender: &node.self,
|
|
||||||
Target: &find,
|
|
||||||
Pingback: true,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, NodeClientErr.Wrap(err)
|
|
||||||
}
|
|
||||||
rt, err := node.dht.GetRoutingTable(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, NodeClientErr.Wrap(err)
|
|
||||||
}
|
|
||||||
if err := rt.ConnectionSuccess(&to); err != nil {
|
|
||||||
return nil, NodeClientErr.Wrap(err)
|
|
||||||
}
|
|
||||||
return resp.Response, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ping attempts to establish a connection with a node to verify it is alive
|
|
||||||
func (node *Node) Ping(ctx context.Context, to pb.Node) (bool, error) {
|
|
||||||
to.Type.DPanicOnInvalid("node ping")
|
|
||||||
conn, err := node.pool.Dial(ctx, &to)
|
|
||||||
if err != nil {
|
|
||||||
return false, NodeClientErr.Wrap(err)
|
|
||||||
}
|
|
||||||
_, err = conn.Ping(ctx, &pb.PingRequest{})
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disconnect closes all connections within the pool
|
|
||||||
func (node *Node) Disconnect() error {
|
|
||||||
return node.pool.DisconnectAll()
|
|
||||||
}
|
|
@ -1,142 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package node_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
|
|
||||||
"storj.io/storj/internal/testcontext"
|
|
||||||
"storj.io/storj/internal/testplanet"
|
|
||||||
"storj.io/storj/pkg/pb"
|
|
||||||
"storj.io/storj/pkg/storj"
|
|
||||||
"storj.io/storj/pkg/utils"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestClient(t *testing.T) {
|
|
||||||
ctx := testcontext.New(t)
|
|
||||||
defer ctx.Cleanup()
|
|
||||||
|
|
||||||
planet, err := testplanet.New(t, 1, 4, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer ctx.Check(planet.Shutdown)
|
|
||||||
|
|
||||||
planet.Start(ctx)
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
|
|
||||||
// TODO: also use satellites
|
|
||||||
peers := planet.StorageNodes
|
|
||||||
|
|
||||||
{ // Ping
|
|
||||||
client, err := planet.StorageNodes[0].NewNodeClient()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
defer ctx.Check(client.Disconnect)
|
|
||||||
|
|
||||||
var group errgroup.Group
|
|
||||||
|
|
||||||
for i := range peers {
|
|
||||||
peer := peers[i]
|
|
||||||
group.Go(func() error {
|
|
||||||
pinged, err := client.Ping(ctx, peer.Local())
|
|
||||||
var pingErr error
|
|
||||||
if !pinged {
|
|
||||||
pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID())
|
|
||||||
}
|
|
||||||
return utils.CombineErrors(pingErr, err)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
defer ctx.Check(group.Wait)
|
|
||||||
}
|
|
||||||
|
|
||||||
{ // Lookup
|
|
||||||
client, err := planet.StorageNodes[1].NewNodeClient()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
defer ctx.Check(client.Disconnect)
|
|
||||||
|
|
||||||
var group errgroup.Group
|
|
||||||
|
|
||||||
for i := range peers {
|
|
||||||
peer := peers[i]
|
|
||||||
group.Go(func() error {
|
|
||||||
for _, target := range peers {
|
|
||||||
errTag := fmt.Errorf("lookup peer:%s target:%s", peer.ID(), target.ID())
|
|
||||||
peer.Local().Type.DPanicOnInvalid("test client peer")
|
|
||||||
target.Local().Type.DPanicOnInvalid("test client target")
|
|
||||||
results, err := client.Lookup(ctx, peer.Local(), target.Local())
|
|
||||||
if err != nil {
|
|
||||||
return utils.CombineErrors(errTag, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if containsResult(results, target.ID()) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// with small network we expect to return everything
|
|
||||||
if len(results) != planet.Size() {
|
|
||||||
return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results)))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
defer ctx.Check(group.Wait)
|
|
||||||
}
|
|
||||||
|
|
||||||
{ // Lookup
|
|
||||||
client, err := planet.StorageNodes[2].NewNodeClient()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
defer ctx.Check(client.Disconnect)
|
|
||||||
|
|
||||||
targets := []storj.NodeID{
|
|
||||||
{}, // empty target
|
|
||||||
{255}, // non-empty
|
|
||||||
}
|
|
||||||
|
|
||||||
var group errgroup.Group
|
|
||||||
|
|
||||||
for i := range targets {
|
|
||||||
target := targets[i]
|
|
||||||
for i := range peers {
|
|
||||||
peer := peers[i]
|
|
||||||
group.Go(func() error {
|
|
||||||
errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target)
|
|
||||||
peer.Local().Type.DPanicOnInvalid("peer info")
|
|
||||||
results, err := client.Lookup(ctx, peer.Local(), pb.Node{Id: target, Type: pb.NodeType_STORAGE})
|
|
||||||
if err != nil {
|
|
||||||
return utils.CombineErrors(errTag, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// with small network we expect to return everything
|
|
||||||
if len(results) != planet.Size() {
|
|
||||||
return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results)))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
defer ctx.Check(group.Wait)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func containsResult(nodes []*pb.Node, target storj.NodeID) bool {
|
|
||||||
for _, node := range nodes {
|
|
||||||
if node.Id == target {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
@ -1,74 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"storj.io/storj/pkg/dht"
|
|
||||||
"storj.io/storj/pkg/pb"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Server implements the grpc Node Server
|
|
||||||
type Server struct {
|
|
||||||
dht dht.DHT
|
|
||||||
log *zap.Logger
|
|
||||||
|
|
||||||
connected int32
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewServer returns a newly instantiated Node Server
|
|
||||||
func NewServer(log *zap.Logger, dht dht.DHT) *Server {
|
|
||||||
return &Server{
|
|
||||||
dht: dht,
|
|
||||||
log: log,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query is a node to node communication query
|
|
||||||
func (server *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
|
|
||||||
rt, err := server.dht.GetRoutingTable(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return &pb.QueryResponse{}, NodeClientErr.New("could not get routing table %server", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if req.GetPingback() {
|
|
||||||
_, err = server.dht.Ping(ctx, *req.Sender)
|
|
||||||
if err != nil {
|
|
||||||
server.log.Debug("connection to node failed", zap.Error(err), zap.String("nodeID", req.Sender.Id.String()))
|
|
||||||
err = rt.ConnectionFailed(req.Sender)
|
|
||||||
if err != nil {
|
|
||||||
server.log.Error("could not respond to connection failed", zap.Error(err))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
err = rt.ConnectionSuccess(req.Sender)
|
|
||||||
if err != nil {
|
|
||||||
server.log.Error("could not respond to connection success", zap.Error(err))
|
|
||||||
} else {
|
|
||||||
count := atomic.AddInt32(&server.connected, 1)
|
|
||||||
if count == 1 {
|
|
||||||
server.log.Sugar().Debugf("Successfully connected with %s", req.Sender.Address.Address)
|
|
||||||
} else if count%100 == 0 {
|
|
||||||
server.log.Sugar().Debugf("Successfully connected with %s %dx times", req.Sender.Address.Address, count)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes, err := rt.FindNear(req.Target.Id, int(req.Limit))
|
|
||||||
if err != nil {
|
|
||||||
return &pb.QueryResponse{}, NodeClientErr.New("could not find near %server", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ping provides an easy way to verify a node is online and accepting requests
|
|
||||||
func (server *Server) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
|
|
||||||
//TODO
|
|
||||||
return &pb.PingResponse{}, nil
|
|
||||||
}
|
|
@ -370,10 +370,7 @@ func (s *Server) getDashboardData(ctx context.Context) (*pb.DashboardStats, erro
|
|||||||
return &pb.DashboardStats{}, ServerError.Wrap(err)
|
return &pb.DashboardStats{}, ServerError.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rt, err := s.kad.GetRoutingTable(ctx)
|
rt := s.kad.GetRoutingTable()
|
||||||
if err != nil {
|
|
||||||
return &pb.DashboardStats{}, ServerError.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes, err := s.kad.FindNear(ctx, storj.NodeID{}, 10000000)
|
nodes, err := s.kad.FindNear(ctx, storj.NodeID{}, 10000000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -29,7 +29,6 @@ import (
|
|||||||
"storj.io/storj/pkg/discovery"
|
"storj.io/storj/pkg/discovery"
|
||||||
"storj.io/storj/pkg/identity"
|
"storj.io/storj/pkg/identity"
|
||||||
"storj.io/storj/pkg/kademlia"
|
"storj.io/storj/pkg/kademlia"
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
"storj.io/storj/pkg/overlay"
|
"storj.io/storj/pkg/overlay"
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/pointerdb"
|
"storj.io/storj/pkg/pointerdb"
|
||||||
@ -116,7 +115,7 @@ type Peer struct {
|
|||||||
|
|
||||||
RoutingTable *kademlia.RoutingTable
|
RoutingTable *kademlia.RoutingTable
|
||||||
Service *kademlia.Kademlia
|
Service *kademlia.Kademlia
|
||||||
Endpoint *node.Server
|
Endpoint *kademlia.Endpoint
|
||||||
Inspector *kademlia.Inspector
|
Inspector *kademlia.Inspector
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -239,7 +238,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
|
|||||||
return nil, errs.Combine(err, peer.Close())
|
return nil, errs.Combine(err, peer.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service)
|
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
|
||||||
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
||||||
|
|
||||||
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
||||||
|
@ -1,17 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package satellite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
)
|
|
||||||
|
|
||||||
// These methods are added to have same interface as in testplanet to make transition easier.
|
|
||||||
|
|
||||||
// NewNodeClient creates a node client for this node
|
|
||||||
// TODO: this is temporary and only intended for tests
|
|
||||||
func (peer *Peer) NewNodeClient() (node.Client, error) {
|
|
||||||
// TODO: handle disconnect verification
|
|
||||||
return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service)
|
|
||||||
}
|
|
@ -14,7 +14,6 @@ import (
|
|||||||
|
|
||||||
"storj.io/storj/pkg/identity"
|
"storj.io/storj/pkg/identity"
|
||||||
"storj.io/storj/pkg/kademlia"
|
"storj.io/storj/pkg/kademlia"
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
pstore "storj.io/storj/pkg/piecestore"
|
pstore "storj.io/storj/pkg/piecestore"
|
||||||
"storj.io/storj/pkg/piecestore/psserver"
|
"storj.io/storj/pkg/piecestore/psserver"
|
||||||
@ -72,7 +71,7 @@ type Peer struct {
|
|||||||
Kademlia struct {
|
Kademlia struct {
|
||||||
RoutingTable *kademlia.RoutingTable
|
RoutingTable *kademlia.RoutingTable
|
||||||
Service *kademlia.Kademlia
|
Service *kademlia.Kademlia
|
||||||
Endpoint *node.Server
|
Endpoint *kademlia.Endpoint
|
||||||
Inspector *kademlia.Inspector
|
Inspector *kademlia.Inspector
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,7 +146,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
|||||||
return nil, errs.Combine(err, peer.Close())
|
return nil, errs.Combine(err, peer.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.Kademlia.Endpoint = node.NewServer(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service)
|
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
|
||||||
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
pb.RegisterNodesServer(peer.Public.Server.GRPC(), peer.Kademlia.Endpoint)
|
||||||
|
|
||||||
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)
|
||||||
|
@ -1,17 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package storagenode
|
|
||||||
|
|
||||||
import (
|
|
||||||
"storj.io/storj/pkg/node"
|
|
||||||
)
|
|
||||||
|
|
||||||
// These methods are added to have same interface as in testplanet to make transition easier.
|
|
||||||
|
|
||||||
// NewNodeClient creates a node client for this node
|
|
||||||
// TODO: this is temporary and only intended for tests
|
|
||||||
func (peer *Peer) NewNodeClient() (node.Client, error) {
|
|
||||||
// TODO: handle disconnect verification
|
|
||||||
return node.NewNodeClient(peer.Identity, peer.Local(), peer.Kademlia.Service)
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user