2019-02-06 12:37:17 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package kademlia
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"sync/atomic"
|
2019-09-11 21:41:43 +01:00
|
|
|
"time"
|
2019-02-06 12:37:17 +00:00
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"go.uber.org/zap"
|
2019-08-09 10:21:41 +01:00
|
|
|
"google.golang.org/grpc/codes"
|
2019-09-11 21:41:43 +01:00
|
|
|
"google.golang.org/grpc/peer"
|
2019-08-09 10:21:41 +01:00
|
|
|
"google.golang.org/grpc/status"
|
2019-02-06 12:37:17 +00:00
|
|
|
|
2019-08-09 10:21:41 +01:00
|
|
|
"storj.io/storj/pkg/identity"
|
2019-02-06 12:37:17 +00:00
|
|
|
"storj.io/storj/pkg/pb"
|
2019-08-09 10:21:41 +01:00
|
|
|
"storj.io/storj/pkg/storj"
|
2019-02-06 12:37:17 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// EndpointError defines errors class for Endpoint
|
|
|
|
var EndpointError = errs.Class("kademlia endpoint error")
|
|
|
|
|
2019-08-09 10:21:41 +01:00
|
|
|
// SatelliteIDVerifier checks if the connection is from a trusted satellite
|
|
|
|
type SatelliteIDVerifier interface {
|
|
|
|
VerifySatelliteID(ctx context.Context, id storj.NodeID) error
|
|
|
|
}
|
|
|
|
|
2019-09-11 21:41:43 +01:00
|
|
|
type pingStatsSource interface {
|
|
|
|
WasPinged(when time.Time, byID storj.NodeID, byAddr string)
|
|
|
|
}
|
|
|
|
|
2019-02-06 12:37:17 +00:00
|
|
|
// Endpoint implements the kademlia Endpoints
|
|
|
|
type Endpoint struct {
|
2019-03-22 20:06:57 +00:00
|
|
|
log *zap.Logger
|
|
|
|
service *Kademlia
|
2019-09-11 21:41:43 +01:00
|
|
|
pingStats pingStatsSource
|
2019-03-22 20:06:57 +00:00
|
|
|
routingTable *RoutingTable
|
2019-08-09 10:21:41 +01:00
|
|
|
trust SatelliteIDVerifier
|
2019-03-22 20:06:57 +00:00
|
|
|
connected int32
|
2019-02-06 12:37:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewEndpoint returns a new kademlia endpoint
|
2019-09-11 21:41:43 +01:00
|
|
|
func NewEndpoint(log *zap.Logger, service *Kademlia, pingStats pingStatsSource, routingTable *RoutingTable, trust SatelliteIDVerifier) *Endpoint {
|
2019-02-06 12:37:17 +00:00
|
|
|
return &Endpoint{
|
2019-08-09 10:21:41 +01:00
|
|
|
log: log,
|
2019-03-22 20:06:57 +00:00
|
|
|
service: service,
|
2019-09-11 21:41:43 +01:00
|
|
|
pingStats: pingStats,
|
2019-03-22 20:06:57 +00:00
|
|
|
routingTable: routingTable,
|
2019-08-09 10:21:41 +01:00
|
|
|
trust: trust,
|
2019-02-06 12:37:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Query is a node to node communication query
|
2019-06-04 12:36:27 +01:00
|
|
|
func (endpoint *Endpoint) Query(ctx context.Context, req *pb.QueryRequest) (_ *pb.QueryResponse, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-19 18:30:27 +00:00
|
|
|
|
2019-02-06 12:37:17 +00:00
|
|
|
if req.GetPingback() {
|
2019-03-22 20:06:57 +00:00
|
|
|
endpoint.pingback(ctx, req.Sender)
|
2019-02-06 12:37:17 +00:00
|
|
|
}
|
|
|
|
|
2019-07-19 17:46:09 +01:00
|
|
|
limit := int(req.Limit)
|
|
|
|
if limit <= 0 || limit > endpoint.routingTable.bucketSize {
|
|
|
|
limit = endpoint.routingTable.bucketSize
|
|
|
|
}
|
|
|
|
|
|
|
|
nodes, err := endpoint.routingTable.FindNear(ctx, req.Target.Id, limit)
|
2019-02-06 12:37:17 +00:00
|
|
|
if err != nil {
|
|
|
|
return &pb.QueryResponse{}, EndpointError.New("could not find near endpoint: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// pingback implements pingback for queries
|
|
|
|
func (endpoint *Endpoint) pingback(ctx context.Context, target *pb.Node) {
|
2019-06-04 12:36:27 +01:00
|
|
|
var err error
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, err = endpoint.service.Ping(ctx, *target)
|
2019-02-06 12:37:17 +00:00
|
|
|
if err != nil {
|
2019-06-18 00:37:44 +01:00
|
|
|
endpoint.log.Debug("connection to node failed", zap.Error(err), zap.Stringer("nodeID", target.Id))
|
2019-06-13 15:51:50 +01:00
|
|
|
err = endpoint.routingTable.ConnectionFailed(ctx, target)
|
2019-02-06 12:37:17 +00:00
|
|
|
if err != nil {
|
|
|
|
endpoint.log.Error("could not respond to connection failed", zap.Error(err))
|
|
|
|
}
|
|
|
|
} else {
|
2019-06-13 15:51:50 +01:00
|
|
|
err = endpoint.routingTable.ConnectionSuccess(ctx, target)
|
2019-02-06 12:37:17 +00:00
|
|
|
if err != nil {
|
|
|
|
endpoint.log.Error("could not respond to connection success", zap.Error(err))
|
|
|
|
} else {
|
|
|
|
count := atomic.AddInt32(&endpoint.connected, 1)
|
|
|
|
if count == 1 {
|
|
|
|
endpoint.log.Sugar().Debugf("Successfully connected with %s", target.Address.Address)
|
|
|
|
} else if count%100 == 0 {
|
|
|
|
endpoint.log.Sugar().Debugf("Successfully connected with %s %dx times", target.Address.Address, count)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ping provides an easy way to verify a node is online and accepting requests
|
2019-06-04 12:36:27 +01:00
|
|
|
func (endpoint *Endpoint) Ping(ctx context.Context, req *pb.PingRequest) (_ *pb.PingResponse, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-09-11 21:41:43 +01:00
|
|
|
// NOTE: this code is very similar to that in storagenode/contact.(*Endpoint).PingNode().
|
|
|
|
// That other will be used going forward, and this will soon be gutted and deprecated. The
|
|
|
|
// code similarity will only exist until the transition away from Kademlia is complete.
|
|
|
|
p, ok := peer.FromContext(ctx)
|
|
|
|
if !ok {
|
|
|
|
return nil, status.Error(codes.Internal, "unable to get grpc peer from context")
|
|
|
|
}
|
|
|
|
peerID, err := identity.PeerIdentityFromPeer(p)
|
|
|
|
if err != nil {
|
|
|
|
return nil, status.Error(codes.Unauthenticated, err.Error())
|
|
|
|
}
|
|
|
|
endpoint.log.Debug("pinged", zap.Stringer("by", peerID.ID), zap.Stringer("srcAddr", p.Addr))
|
|
|
|
if endpoint.pingStats != nil {
|
|
|
|
endpoint.pingStats.WasPinged(time.Now(), peerID.ID, p.Addr.String())
|
|
|
|
} else {
|
|
|
|
endpoint.log.Debug("not updating pingStats because nil")
|
|
|
|
}
|
2019-02-06 12:37:17 +00:00
|
|
|
return &pb.PingResponse{}, nil
|
|
|
|
}
|
2019-02-25 18:41:51 +00:00
|
|
|
|
|
|
|
// RequestInfo returns the node info
|
2019-06-04 12:36:27 +01:00
|
|
|
func (endpoint *Endpoint) RequestInfo(ctx context.Context, req *pb.InfoRequest) (_ *pb.InfoResponse, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-02-25 18:41:51 +00:00
|
|
|
self := endpoint.service.Local()
|
|
|
|
|
2019-08-09 10:21:41 +01:00
|
|
|
if self.Type == pb.NodeType_STORAGE {
|
|
|
|
if endpoint.trust == nil {
|
2019-08-20 10:54:33 +01:00
|
|
|
return nil, status.Error(codes.Internal, "missing trust")
|
2019-08-09 10:21:41 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
peer, err := identity.PeerIdentityFromContext(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, status.Error(codes.Unauthenticated, err.Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
|
|
|
|
if err != nil {
|
2019-08-20 10:54:33 +01:00
|
|
|
return nil, status.Errorf(codes.PermissionDenied, "untrusted peer %v", peer.ID)
|
2019-08-09 10:21:41 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-25 18:41:51 +00:00
|
|
|
return &pb.InfoResponse{
|
2019-04-22 10:07:50 +01:00
|
|
|
Type: self.Type,
|
|
|
|
Operator: &self.Operator,
|
|
|
|
Capacity: &self.Capacity,
|
|
|
|
Version: &self.Version,
|
2019-02-25 18:41:51 +00:00
|
|
|
}, nil
|
|
|
|
}
|