1ec17653d4
Adds a new `Info` method to the Kademlia endpoint that returns the following local node info: * ID * Type * Metadata (email and wallet) * Restrictions (free storage and bandwidth) The new endpoint is exposed as `inspector kad node-info` command too.
159 lines
3.9 KiB
Go
159 lines
3.9 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package kademlia
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/peer"
|
|
|
|
"storj.io/storj/internal/sync2"
|
|
"storj.io/storj/pkg/identity"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/transport"
|
|
)
|
|
|
|
// Dialer is a kademlia dialer
|
|
type Dialer struct {
|
|
log *zap.Logger
|
|
transport transport.Client
|
|
limit sync2.Semaphore
|
|
}
|
|
|
|
// Conn represents a kademlia connection
|
|
type Conn struct {
|
|
conn *grpc.ClientConn
|
|
client pb.NodesClient
|
|
}
|
|
|
|
// NewDialer creates a dialer for kademlia.
|
|
func NewDialer(log *zap.Logger, transport transport.Client) *Dialer {
|
|
dialer := &Dialer{
|
|
log: log,
|
|
transport: transport,
|
|
}
|
|
dialer.limit.Init(32) // TODO: limit should not be hardcoded
|
|
return dialer
|
|
}
|
|
|
|
// Close closes the pool resources and prevents new connections to be made.
|
|
func (dialer *Dialer) Close() error {
|
|
dialer.limit.Close()
|
|
return nil
|
|
}
|
|
|
|
// Lookup queries ask about find, and also sends information about self.
|
|
func (dialer *Dialer) Lookup(ctx context.Context, self pb.Node, ask pb.Node, find pb.Node) ([]*pb.Node, error) {
|
|
if !dialer.limit.Lock() {
|
|
return nil, context.Canceled
|
|
}
|
|
defer dialer.limit.Unlock()
|
|
|
|
conn, err := dialer.dial(ctx, ask)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := conn.client.Query(ctx, &pb.QueryRequest{
|
|
Limit: 20, // TODO: should not be hardcoded, but instead kademlia k value, routing table depth, etc
|
|
Sender: &self,
|
|
Target: &find,
|
|
Pingback: true, // should only be true during bucket refreshing
|
|
})
|
|
if err != nil {
|
|
return nil, errs.Combine(err, conn.disconnect())
|
|
}
|
|
|
|
return resp.Response, conn.disconnect()
|
|
}
|
|
|
|
// Ping pings target.
|
|
func (dialer *Dialer) Ping(ctx context.Context, target pb.Node) (bool, error) {
|
|
if !dialer.limit.Lock() {
|
|
return false, context.Canceled
|
|
}
|
|
defer dialer.limit.Unlock()
|
|
|
|
conn, err := dialer.dial(ctx, target)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
_, err = conn.client.Ping(ctx, &pb.PingRequest{})
|
|
|
|
return err == nil, errs.Combine(err, conn.disconnect())
|
|
}
|
|
|
|
// FetchPeerIdentity connects to a node and returns its peer identity
|
|
func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (pID *identity.PeerIdentity, err error) {
|
|
if !dialer.limit.Lock() {
|
|
return nil, context.Canceled
|
|
}
|
|
defer dialer.limit.Unlock()
|
|
|
|
conn, err := dialer.dial(ctx, target)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, conn.disconnect())
|
|
}()
|
|
|
|
p := &peer.Peer{}
|
|
pCall := grpc.Peer(p)
|
|
_, err = conn.client.Ping(ctx, &pb.PingRequest{}, pCall)
|
|
return identity.PeerIdentityFromPeer(p)
|
|
}
|
|
|
|
// FetchInfo connects to a node address and returns its node info.
|
|
func (dialer *Dialer) FetchInfo(ctx context.Context, address *pb.NodeAddress) (*identity.PeerIdentity, *pb.InfoResponse, error) {
|
|
if !dialer.limit.Lock() {
|
|
return nil, nil, context.Canceled
|
|
}
|
|
defer dialer.limit.Unlock()
|
|
|
|
conn, err := dialer.dialAddress(ctx, address)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
p := &peer.Peer{}
|
|
pCall := grpc.Peer(p)
|
|
|
|
resp, err := conn.client.RequestInfo(ctx, &pb.InfoRequest{}, pCall)
|
|
if err != nil {
|
|
return nil, nil, errs.Combine(err, conn.disconnect())
|
|
}
|
|
|
|
id, err := identity.PeerIdentityFromPeer(p)
|
|
|
|
return id, resp, errs.Combine(err, conn.disconnect())
|
|
}
|
|
|
|
// dial dials the specified node.
|
|
func (dialer *Dialer) dial(ctx context.Context, target pb.Node) (*Conn, error) {
|
|
grpcconn, err := dialer.transport.DialNode(ctx, &target)
|
|
return &Conn{
|
|
conn: grpcconn,
|
|
client: pb.NewNodesClient(grpcconn),
|
|
}, err
|
|
}
|
|
|
|
// dialAddress dials the specified address.
|
|
func (dialer *Dialer) dialAddress(ctx context.Context, address *pb.NodeAddress) (*Conn, error) {
|
|
grpcconn, err := dialer.transport.DialAddress(ctx, address.GetAddress())
|
|
return &Conn{
|
|
conn: grpcconn,
|
|
client: pb.NewNodesClient(grpcconn),
|
|
}, err
|
|
}
|
|
|
|
// disconnect disconnects this connection.
|
|
func (conn *Conn) disconnect() error {
|
|
return conn.conn.Close()
|
|
}
|