2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
2018-05-09 15:04:51 +01:00
// See LICENSE for copying information.
package kademlia
import (
"context"
2018-12-20 21:45:06 +00:00
"math/rand"
2019-02-08 20:35:59 +00:00
"sync/atomic"
2018-12-20 21:45:06 +00:00
"time"
2018-05-09 15:04:51 +01:00
2018-06-22 14:33:57 +01:00
"github.com/zeebo/errs"
2018-10-10 16:05:05 +01:00
"go.uber.org/zap"
2019-09-11 21:41:43 +01:00
"gopkg.in/spacemonkeygo/monkit.v2"
2018-11-21 15:07:18 +00:00
2019-01-25 22:33:20 +00:00
"storj.io/storj/internal/sync2"
2019-01-30 20:47:21 +00:00
"storj.io/storj/pkg/identity"
2019-07-10 15:36:37 +01:00
"storj.io/storj/pkg/kademlia/kademliaclient"
2018-09-18 05:39:06 +01:00
"storj.io/storj/pkg/pb"
2018-11-29 18:39:27 +00:00
"storj.io/storj/pkg/storj"
2019-01-18 15:00:56 +00:00
"storj.io/storj/pkg/transport"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/overlay"
2018-10-16 16:22:31 +01:00
"storj.io/storj/storage"
2018-10-08 16:09:37 +01:00
)
2018-11-01 17:03:46 +00:00
var (
// NodeErr is the class for all errors pertaining to node operations
NodeErr = errs . Class ( "node error" )
// BootstrapErr is the class for all errors pertaining to bootstrapping a node
BootstrapErr = errs . Class ( "bootstrap node error" )
// NodeNotFound is returned when a lookup can not produce the requested node
2018-12-04 21:39:28 +00:00
NodeNotFound = errs . Class ( "node not found" )
2018-11-01 17:03:46 +00:00
// TODO: shouldn't default to TCP but not sure what to do yet
defaultTransport = pb . NodeTransport_TCP_TLS_GRPC
2019-06-04 12:36:27 +01:00
mon = monkit . Package ( )
2018-11-01 17:03:46 +00:00
)
2018-10-08 16:09:37 +01:00
2019-07-19 18:34:00 +01:00
// Kademlia is an implementation of kademlia network.
2018-06-22 14:33:57 +01:00
type Kademlia struct {
2019-01-25 22:33:20 +00:00
log * zap . Logger
alpha int // alpha is a system wide concurrency parameter
routingTable * RoutingTable
bootstrapNodes [ ] pb . Node
2019-07-10 15:36:37 +01:00
dialer * kademliaclient . Dialer
2019-01-28 19:04:42 +00:00
lookups sync2 . WorkGroup
2019-01-25 22:33:20 +00:00
2019-04-26 13:42:09 +01:00
bootstrapFinished sync2 . Fence
bootstrapBackoffMax time . Duration
bootstrapBackoffBase time . Duration
2019-02-08 09:25:13 +00:00
2019-02-08 20:35:59 +00:00
refreshThreshold int64
RefreshBuckets sync2 . Cycle
2018-06-22 14:33:57 +01:00
}
2019-01-29 06:51:07 +00:00
// NewService returns a newly configured Kademlia instance
2019-04-22 10:07:50 +01:00
func NewService ( log * zap . Logger , transport transport . Client , rt * RoutingTable , config Config ) ( * Kademlia , error ) {
2018-10-08 16:09:37 +01:00
k := & Kademlia {
2019-04-26 13:42:09 +01:00
log : log ,
alpha : config . Alpha ,
routingTable : rt ,
bootstrapNodes : config . BootstrapNodes ( ) ,
bootstrapBackoffMax : config . BootstrapBackoffMax ,
bootstrapBackoffBase : config . BootstrapBackoffBase ,
2019-07-10 15:36:37 +01:00
dialer : kademliaclient . NewDialer ( log . Named ( "dialer" ) , transport ) ,
2019-04-26 13:42:09 +01:00
refreshThreshold : int64 ( time . Minute ) ,
2018-06-22 14:33:57 +01:00
}
2019-01-25 22:33:20 +00:00
2018-10-08 16:09:37 +01:00
return k , nil
2018-05-09 15:04:51 +01:00
}
2019-01-10 13:13:27 +00:00
// Close closes all kademlia connections and prevents new ones from being created.
func ( k * Kademlia ) Close ( ) error {
2019-01-25 22:33:20 +00:00
dialerErr := k . dialer . Close ( )
2019-01-28 19:04:42 +00:00
k . lookups . Close ( )
2019-01-25 22:33:20 +00:00
k . lookups . Wait ( )
return dialerErr
2019-01-10 13:13:27 +00:00
}
2019-01-28 22:53:37 +00:00
// FindNear returns all nodes from a starting node up to a maximum limit
2019-04-22 10:07:50 +01:00
// stored in the local routing table.
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) FindNear ( ctx context . Context , start storj . NodeID , limit int ) ( _ [ ] * pb . Node , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-06-13 15:51:50 +01:00
return k . routingTable . FindNear ( ctx , start , limit )
2018-06-22 14:33:57 +01:00
}
2019-02-16 03:23:35 +00:00
// GetBucketIds returns a storage.Keys type of bucket ID's in the Kademlia instance
2019-06-13 15:51:50 +01:00
func ( k * Kademlia ) GetBucketIds ( ctx context . Context ) ( _ storage . Keys , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
return k . routingTable . GetBucketIds ( ctx )
2019-02-16 03:23:35 +00:00
}
2019-04-22 10:07:50 +01:00
// Local returns the local node
func ( k * Kademlia ) Local ( ) overlay . NodeDossier {
2019-02-16 03:23:35 +00:00
return k . routingTable . Local ( )
}
2018-06-22 14:33:57 +01:00
2019-01-10 13:13:27 +00:00
// SetBootstrapNodes replaces the list of bootstrap nodes.
// The field is written without any locking, so this must be called before
// anything else starts using kademlia.
func (k *Kademlia) SetBootstrapNodes(nodes []pb.Node) {
	k.bootstrapNodes = nodes
}
2019-01-30 19:49:25 +00:00
// GetBootstrapNodes returns the current bootstrap node list. The slice is
// returned as stored, not copied.
func (k *Kademlia) GetBootstrapNodes() []pb.Node {
	nodes := k.bootstrapNodes
	return nodes
}
2019-02-28 19:55:27 +00:00
// DumpNodes returns all the nodes in the node database
2019-06-13 15:51:50 +01:00
func ( k * Kademlia ) DumpNodes ( ctx context . Context ) ( _ [ ] * pb . Node , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
return k . routingTable . DumpNodes ( ctx )
2019-02-28 19:55:27 +00:00
}
2018-06-22 14:33:57 +01:00
// Bootstrap contacts one of a set of pre defined trusted nodes on the network and
// begins populating the local Kademlia node
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) Bootstrap ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-25 22:33:20 +00:00
defer k . bootstrapFinished . Release ( )
2019-08-02 14:54:10 +01:00
defer k . timeTrack ( time . Now ( ) , "Bootstrap" )
2019-01-25 22:33:20 +00:00
2019-01-28 19:04:42 +00:00
if ! k . lookups . Start ( ) {
return context . Canceled
}
2019-01-25 22:33:20 +00:00
defer k . lookups . Done ( )
2018-10-08 16:09:37 +01:00
if len ( k . bootstrapNodes ) == 0 {
2019-08-02 14:54:10 +01:00
k . log . Warn ( "No bootsrap address specified" , zap . Stringer ( k . routingTable . self . Type . String ( ) , k . routingTable . self . Id ) )
2019-01-25 22:33:20 +00:00
return nil
2018-06-22 14:33:57 +01:00
}
2019-01-23 15:48:46 +00:00
2019-04-26 13:42:09 +01:00
waitInterval := k . bootstrapBackoffBase
2019-03-20 08:30:42 +00:00
var errGroup errs . Group
2019-04-26 13:42:09 +01:00
for i := 0 ; waitInterval < k . bootstrapBackoffMax ; i ++ {
if i > 0 {
time . Sleep ( waitInterval )
2019-05-29 14:14:25 +01:00
waitInterval *= 2
2019-01-25 22:33:20 +00:00
}
2019-04-26 13:42:09 +01:00
var foundOnlineBootstrap bool
for i , node := range k . bootstrapNodes {
if ctx . Err ( ) != nil {
2019-08-02 14:54:10 +01:00
k . log . Debug ( "Context Error received while Boostraping " , zap . Stringer ( k . routingTable . self . Type . String ( ) , k . routingTable . self . Id ) , zap . Stringer ( "Bootstrap Node" , node . Id ) )
2019-04-26 13:42:09 +01:00
errGroup . Add ( ctx . Err ( ) )
return errGroup . Err ( )
}
ident , err := k . dialer . FetchPeerIdentityUnverified ( ctx , node . Address . Address )
if err != nil {
2019-08-02 14:54:10 +01:00
errGroup . Add ( BootstrapErr . Wrap ( BootstrapErr . New ( "%s : %s unable to fetch unverified peer identity node address %s: %s" , k . routingTable . self . Type . String ( ) , k . routingTable . self . Id . String ( ) , node . Address . Address , err ) ) )
2019-04-26 13:42:09 +01:00
continue
}
2019-06-26 14:16:46 +01:00
// FetchPeerIdentityUnverified uses transport.DialAddress, which should be
// enough to have the TransportObservers find out about this node. Unfortunately,
// getting DialAddress to be able to grab the node id seems challenging with gRPC.
// The way FetchPeerIdentityUnverified does is is to do a basic ping request, which
// we have now done. Let's tell all the transport observers now.
// TODO: remove the explicit transport observer notification
2019-07-10 15:36:37 +01:00
k . dialer . AlertSuccess ( ctx , & pb . Node {
2019-06-26 14:16:46 +01:00
Id : ident . ID ,
Address : node . Address ,
} )
2019-04-26 13:42:09 +01:00
k . routingTable . mutex . Lock ( )
node . Id = ident . ID
k . bootstrapNodes [ i ] = node
k . routingTable . mutex . Unlock ( )
foundOnlineBootstrap = true
}
if ! foundOnlineBootstrap {
2019-08-02 14:54:10 +01:00
errGroup . Add ( BootstrapErr . New ( "%s : %s found no bootstrap node found online" , k . routingTable . self . Type . String ( ) , k . routingTable . self . Id . String ( ) ) )
2019-03-20 08:30:42 +00:00
continue
2019-03-04 20:03:33 +00:00
}
2019-04-26 13:42:09 +01:00
//find nodes most similar to self
2019-03-04 20:03:33 +00:00
k . routingTable . mutex . Lock ( )
2019-04-26 13:42:09 +01:00
id := k . routingTable . self . Id
2019-03-04 20:03:33 +00:00
k . routingTable . mutex . Unlock ( )
2019-06-26 14:16:46 +01:00
_ , err := k . lookup ( ctx , id )
2019-03-20 08:30:42 +00:00
if err != nil {
2019-07-22 20:10:04 +01:00
errGroup . Add ( BootstrapErr . Wrap ( err ) )
2019-04-26 13:42:09 +01:00
continue
2019-03-20 08:30:42 +00:00
}
2019-04-26 13:42:09 +01:00
return nil
// TODO(dylan): We do not currently handle this last bit of behavior.
// ```
// Finally, u refreshes all k-buckets further away than its closest neighbor.
// During the refreshes, u both populates its own k-buckets and inserts
// itself into other nodes' k-buckets as necessary.
// ```
2019-01-23 15:48:46 +00:00
}
2019-08-02 14:54:10 +01:00
errGroup . Add ( BootstrapErr . New ( "%s : %s unable to start bootstrap after final wait time of %s" , k . routingTable . self . Type . String ( ) , k . routingTable . self . Id . String ( ) , waitInterval ) )
2019-04-26 13:42:09 +01:00
return errGroup . Err ( )
2018-06-22 14:33:57 +01:00
}
2019-01-25 22:33:20 +00:00
// WaitForBootstrap blocks until the bootstrapFinished fence is released.
// Bootstrap releases it (via defer) on every return path.
func (k *Kademlia) WaitForBootstrap() {
	fence := &k.bootstrapFinished
	fence.Wait()
}
2019-02-05 17:57:56 +00:00
// FetchPeerIdentity connects to a node and returns its peer identity
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) FetchPeerIdentity ( ctx context . Context , nodeID storj . NodeID ) ( _ * identity . PeerIdentity , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-02-05 17:57:56 +00:00
if ! k . lookups . Start ( ) {
return nil , context . Canceled
}
defer k . lookups . Done ( )
node , err := k . FindNode ( ctx , nodeID )
if err != nil {
return nil , err
}
return k . dialer . FetchPeerIdentity ( ctx , node )
}
2018-10-08 16:09:37 +01:00
// Ping checks that the provided node is still accessible on the network
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) Ping ( ctx context . Context , node pb . Node ) ( _ pb . Node , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-28 19:04:42 +00:00
if ! k . lookups . Start ( ) {
return pb . Node { } , context . Canceled
}
2019-01-25 22:33:20 +00:00
defer k . lookups . Done ( )
2019-03-04 20:03:33 +00:00
ok , err := k . dialer . PingNode ( ctx , node )
2018-10-26 17:38:22 +01:00
if err != nil {
return pb . Node { } , NodeErr . Wrap ( err )
}
if ! ok {
2019-08-02 14:54:10 +01:00
return pb . Node { } , NodeErr . New ( "%s : %s failed to ping node ID %s" , k . routingTable . self . Type . String ( ) , k . routingTable . self . Id . String ( ) , node . Id . String ( ) )
2018-10-26 17:38:22 +01:00
}
return node , nil
2018-06-22 14:33:57 +01:00
}
2019-02-25 18:41:51 +00:00
// FetchInfo connects to a node address and returns the node info
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) FetchInfo ( ctx context . Context , node pb . Node ) ( _ * pb . InfoResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-02-25 18:41:51 +00:00
if ! k . lookups . Start ( ) {
2019-03-02 07:34:08 +00:00
return nil , context . Canceled
2019-02-25 18:41:51 +00:00
}
defer k . lookups . Done ( )
2019-03-02 07:34:08 +00:00
info , err := k . dialer . FetchInfo ( ctx , node )
2019-02-25 18:41:51 +00:00
if err != nil {
2019-08-09 10:21:41 +01:00
return nil , NodeErr . Wrap ( err )
2019-02-25 18:41:51 +00:00
}
2019-03-02 07:34:08 +00:00
return info , nil
2019-02-25 18:41:51 +00:00
}
2018-10-08 16:09:37 +01:00
// FindNode looks up the provided NodeID first in the local Node, and if it is not found
// begins searching the network for the NodeID. Returns and error if node was not found
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) FindNode ( ctx context . Context , nodeID storj . NodeID ) ( _ pb . Node , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-28 19:04:42 +00:00
if ! k . lookups . Start ( ) {
return pb . Node { } , context . Canceled
}
2019-01-25 22:33:20 +00:00
defer k . lookups . Done ( )
2019-06-26 14:16:46 +01:00
results , err := k . lookup ( ctx , nodeID )
if err != nil {
return pb . Node { } , err
}
if len ( results ) < 1 {
2019-08-02 14:54:10 +01:00
return pb . Node { } , NodeNotFound . Wrap ( NodeNotFound . New ( "%s : %s couldn't find node ID %s: %s" , k . routingTable . self . Type . String ( ) , k . routingTable . self . Id . String ( ) , nodeID . String ( ) , err ) )
2019-06-26 14:16:46 +01:00
}
return * results [ 0 ] , nil
2018-12-20 21:45:06 +00:00
}
//lookup initiates a kadmelia node lookup
2019-06-26 14:16:46 +01:00
func ( k * Kademlia ) lookup ( ctx context . Context , nodeID storj . NodeID ) ( _ [ ] * pb . Node , err error ) {
2019-06-04 12:36:27 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-28 19:04:42 +00:00
if ! k . lookups . Start ( ) {
2019-06-26 14:16:46 +01:00
return nil , context . Canceled
2019-01-28 19:04:42 +00:00
}
2019-01-25 22:33:20 +00:00
defer k . lookups . Done ( )
2019-06-26 14:16:46 +01:00
nodes , err := k . routingTable . FindNear ( ctx , nodeID , k . routingTable . K ( ) )
if err != nil {
return nil , err
2018-12-04 21:39:28 +00:00
}
2019-06-26 16:30:37 +01:00
2019-06-26 14:16:46 +01:00
self := k . routingTable . Local ( ) . Node
lookup := newPeerDiscovery ( k . log , k . dialer , nodeID , nodes , k . routingTable . K ( ) , k . alpha , & self )
results , err := lookup . Run ( ctx )
2018-11-20 16:54:52 +00:00
if err != nil {
2019-06-26 14:16:46 +01:00
return nil , err
2018-11-20 16:54:52 +00:00
}
2019-06-05 15:23:10 +01:00
bucket , err := k . routingTable . getKBucketID ( ctx , nodeID )
2018-12-20 21:45:06 +00:00
if err != nil {
2019-08-02 14:54:10 +01:00
k . log . Warn ( "Error getting getKBucketID in kad lookup" , zap . Stringer ( k . routingTable . self . Type . String ( ) , k . routingTable . self . Id ) )
2018-12-20 21:45:06 +00:00
} else {
2019-06-13 15:51:50 +01:00
err = k . routingTable . SetBucketTimestamp ( ctx , bucket [ : ] , time . Now ( ) )
2018-12-20 21:45:06 +00:00
if err != nil {
2019-08-02 14:54:10 +01:00
k . log . Warn ( "Error updating bucket timestamp in kad lookup" , zap . Stringer ( k . routingTable . self . Type . String ( ) , k . routingTable . self . Id ) )
2018-12-20 21:45:06 +00:00
}
}
2019-06-26 14:16:46 +01:00
return results , nil
2018-06-22 14:33:57 +01:00
}
2019-04-22 09:34:11 +01:00
// GetNodesWithinKBucket returns all the routing nodes in the specified k-bucket
2019-06-13 15:51:50 +01:00
func ( k * Kademlia ) GetNodesWithinKBucket ( ctx context . Context , bID bucketID ) ( _ [ ] * pb . Node , err error ) {
return k . routingTable . getUnmarshaledNodesFromBucket ( ctx , bID )
2019-04-22 09:34:11 +01:00
}
// GetCachedNodesWithinKBucket returns the replacement-cache entries for the
// given k-bucket, or nil when the bucket has none. The slice comes straight
// from the routing table's map (it is not copied), and no lock is taken here.
// NOTE(review): confirm whether callers need the routing table's cache lock.
func (k *Kademlia) GetCachedNodesWithinKBucket(bID bucketID) []*pb.Node {
	cache := k.routingTable.replacementCache
	return cache[bID]
}
2019-02-08 20:35:59 +00:00
// SetBucketRefreshThreshold sets how long a bucket may go untouched before the
// refresh cycle in Run treats it as stale. The value is stored atomically, and
// Run loads it atomically before each refresh.
func (k *Kademlia) SetBucketRefreshThreshold(threshold time.Duration) {
	nanos := int64(threshold)
	atomic.StoreInt64(&k.refreshThreshold, nanos)
}
2019-02-08 09:25:13 +00:00
// Run occasionally refreshes stale kad buckets
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) Run ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-08-02 14:54:10 +01:00
defer k . timeTrack ( time . Now ( ) , "Kad Refresh" )
2019-06-04 12:36:27 +01:00
2019-01-28 19:04:42 +00:00
if ! k . lookups . Start ( ) {
return context . Canceled
}
2019-01-25 22:33:20 +00:00
defer k . lookups . Done ( )
2019-02-08 09:25:13 +00:00
k . RefreshBuckets . SetInterval ( 5 * time . Minute )
return k . RefreshBuckets . Run ( ctx , func ( ctx context . Context ) error {
2019-02-08 20:35:59 +00:00
threshold := time . Duration ( atomic . LoadInt64 ( & k . refreshThreshold ) )
err := k . refresh ( ctx , threshold )
2019-02-08 09:25:13 +00:00
if err != nil {
2019-08-02 14:54:10 +01:00
k . log . Warn ( "bucket refresh failed" , zap . Stringer ( k . routingTable . self . Type . String ( ) , k . routingTable . self . Id ) , zap . Error ( err ) )
2019-01-18 13:54:08 +00:00
}
2019-02-08 09:25:13 +00:00
return nil
} )
2019-01-18 13:54:08 +00:00
}
2019-01-10 13:13:27 +00:00
// refresh updates each Kademlia bucket not contacted in the last hour
2019-06-04 12:36:27 +01:00
func ( k * Kademlia ) refresh ( ctx context . Context , threshold time . Duration ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-06-13 15:51:50 +01:00
bIDs , err := k . routingTable . GetBucketIds ( ctx )
2018-12-20 21:45:06 +00:00
if err != nil {
return Error . Wrap ( err )
}
now := time . Now ( )
startID := bucketID { }
var errors errs . Group
for _ , bID := range bIDs {
2019-02-14 15:47:03 +00:00
endID := keyToBucketID ( bID )
2019-06-13 15:51:50 +01:00
ts , tErr := k . routingTable . GetBucketTimestamp ( ctx , bID )
2018-12-20 21:45:06 +00:00
if tErr != nil {
errors . Add ( tErr )
2019-01-28 19:36:20 +00:00
} else if now . After ( ts . Add ( threshold ) ) {
2019-02-14 15:47:03 +00:00
rID , _ := randomIDInRange ( startID , endID )
2018-12-20 21:45:06 +00:00
_ , _ = k . FindNode ( ctx , rID ) // ignore node not found
}
2019-02-14 15:47:03 +00:00
startID = endID
2018-12-20 21:45:06 +00:00
}
return Error . Wrap ( errors . Err ( ) )
}
// randomIDInRange returns a random node ID in the half-open range (start, end].
// The result is strictly greater than start and no greater than end, comparing
// IDs byte by byte from the first byte. It returns an error if start >= end.
//
// The ID is built one byte at a time.
//   - While the result still equals start's prefix, its bytes are bounded below
//     by start.
//   - While it still equals end's prefix, they are bounded above by end.
//   - At the first byte where start and end differ, the chosen byte is forced
//     strictly above start's. This guarantees the result exceeds start.
//   - Every later byte is therefore unconstrained from below, including zero.
func randomIDInRange(start, end bucketID) (storj.NodeID, error) {
	randID := storj.NodeID{}
	aboveStart := false // an earlier byte of randID is greater than start's
	belowEnd := false   // an earlier byte of randID is less than end's
	for x := 0; x < len(randID); x++ {
		hi := byte(255)
		if !belowEnd {
			hi = end[x]
		}

		if aboveStart {
			// Already above start, so any value in [0, hi] keeps the ID in range.
			r := byte(rand.Intn(int(hi) + 1))
			if r < hi {
				belowEnd = true
			}
			randID[x] = r
			continue
		}

		// Still equal to start so far. belowEnd can only have been set together
		// with aboveStart, so hi is end[x] here.
		lo := start[x]
		switch {
		case lo > hi:
			return storj.NodeID{}, errs.New("Random id range was invalid")
		case lo == hi:
			randID[x] = lo
		default:
			// First differing byte: pick from [lo+1, hi] so the result is > start.
			// lo < hi <= 255, so lo+1 cannot overflow.
			r := lo + 1 + byte(rand.Intn(int(hi-lo)))
			aboveStart = true
			belowEnd = r < hi
			randID[x] = r
		}
	}
	if !aboveStart { // start == end, so the range is empty
		return storj.NodeID{}, errs.New("Random id range was invalid")
	}
	return randID, nil
}
2019-08-02 14:54:10 +01:00
// timeTrack logs, at debug level, the time elapsed since start under the field
// name given by name. It is intended to be deferred as
// `defer k.timeTrack(time.Now(), "...")`.
func (k *Kademlia) timeTrack(start time.Time, name string) {
	took := time.Since(start)
	rt := k.routingTable
	k.log.Debug("", zap.Stringer(rt.self.Type.String(), rt.self.Id), zap.Duration(name, took))
}