2018-05-16 19:47:59 +01:00
|
|
|
// Copyright (C) 2018 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package kademlia
|
|
|
|
|
|
|
|
import (
|
2018-08-13 09:39:45 +01:00
|
|
|
"encoding/binary"
|
2018-08-09 20:20:39 +01:00
|
|
|
"sync"
|
2018-05-16 19:47:59 +01:00
|
|
|
"time"
|
|
|
|
|
2018-08-09 20:20:39 +01:00
|
|
|
"github.com/zeebo/errs"
|
2018-06-05 12:48:19 +01:00
|
|
|
|
2018-06-22 14:33:57 +01:00
|
|
|
"storj.io/storj/pkg/dht"
|
2018-09-18 05:39:06 +01:00
|
|
|
"storj.io/storj/pkg/pb"
|
2018-10-26 17:54:00 +01:00
|
|
|
"storj.io/storj/pkg/storj"
|
2018-09-11 14:57:12 +01:00
|
|
|
"storj.io/storj/pkg/utils"
|
2018-08-09 20:20:39 +01:00
|
|
|
"storj.io/storj/storage"
|
2018-05-16 19:47:59 +01:00
|
|
|
)
|
|
|
|
|
2018-09-05 17:10:35 +01:00
|
|
|
const (
|
|
|
|
// KademliaBucket is the string representing the bucket used for the kademlia routing table k-bucket ids
|
|
|
|
KademliaBucket = "kbuckets"
|
|
|
|
// NodeBucket is the string representing the bucket used for the kademlia routing table node ids
|
|
|
|
NodeBucket = "nodes"
|
|
|
|
)
|
|
|
|
|
2018-08-09 20:20:39 +01:00
|
|
|
// RoutingErr is the class for all errors pertaining to routing table operations
|
|
|
|
var RoutingErr = errs.Class("routing table error")
|
|
|
|
|
2018-11-29 18:39:27 +00:00
|
|
|
// Bucket IDs exist in the same address space as node IDs
|
|
|
|
type bucketID [len(storj.NodeID{})]byte
|
|
|
|
|
2018-08-09 20:20:39 +01:00
|
|
|
// RoutingTable implements the RoutingTable interface
|
|
|
|
type RoutingTable struct {
|
2018-10-26 17:54:00 +01:00
|
|
|
self pb.Node
|
2018-08-17 20:11:46 +01:00
|
|
|
kadBucketDB storage.KeyValueStore
|
|
|
|
nodeBucketDB storage.KeyValueStore
|
2018-09-18 05:39:06 +01:00
|
|
|
transport *pb.NodeTransport
|
2018-08-17 20:11:46 +01:00
|
|
|
mutex *sync.Mutex
|
2018-11-29 18:39:27 +00:00
|
|
|
seen map[storj.NodeID]*pb.Node
|
|
|
|
replacementCache map[bucketID][]*pb.Node
|
2018-08-17 20:11:46 +01:00
|
|
|
bucketSize int // max number of nodes stored in a kbucket = 20 (k)
|
|
|
|
rcBucketSize int // replacementCache bucket max length
|
2018-11-20 16:54:52 +00:00
|
|
|
|
2018-06-05 12:48:19 +01:00
|
|
|
}
|
|
|
|
|
2018-08-09 20:20:39 +01:00
|
|
|
// NewRoutingTable returns a newly configured instance of a RoutingTable
|
2018-10-26 17:54:00 +01:00
|
|
|
func NewRoutingTable(localNode pb.Node, kdb, ndb storage.KeyValueStore) (*RoutingTable, error) {
|
2018-08-09 20:20:39 +01:00
|
|
|
rt := &RoutingTable{
|
2018-11-20 16:54:52 +00:00
|
|
|
self: localNode,
|
|
|
|
kadBucketDB: kdb,
|
|
|
|
nodeBucketDB: ndb,
|
|
|
|
transport: &defaultTransport,
|
|
|
|
|
2018-08-17 20:11:46 +01:00
|
|
|
mutex: &sync.Mutex{},
|
2018-11-29 18:39:27 +00:00
|
|
|
seen: make(map[storj.NodeID]*pb.Node),
|
|
|
|
replacementCache: make(map[bucketID][]*pb.Node),
|
2018-11-20 16:54:52 +00:00
|
|
|
|
|
|
|
bucketSize: *flagBucketSize,
|
|
|
|
rcBucketSize: *flagReplacementCacheSize,
|
2018-08-09 20:20:39 +01:00
|
|
|
}
|
2018-10-26 17:54:00 +01:00
|
|
|
ok, err := rt.addNode(&localNode)
|
2018-08-27 18:28:16 +01:00
|
|
|
if !ok || err != nil {
|
2018-08-09 20:20:39 +01:00
|
|
|
return nil, RoutingErr.New("could not add localNode to routing table: %s", err)
|
|
|
|
}
|
|
|
|
return rt, nil
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
2018-09-11 14:57:12 +01:00
|
|
|
// Close closes underlying databases
|
|
|
|
func (rt *RoutingTable) Close() error {
|
2018-10-18 17:20:23 +01:00
|
|
|
return utils.CombineErrors(
|
|
|
|
rt.kadBucketDB.Close(),
|
|
|
|
rt.nodeBucketDB.Close(),
|
|
|
|
)
|
2018-09-11 14:57:12 +01:00
|
|
|
}
|
|
|
|
|
2018-06-22 14:33:57 +01:00
|
|
|
// Local returns the local nodes ID
|
2018-09-18 05:39:06 +01:00
|
|
|
func (rt *RoutingTable) Local() pb.Node {
|
2018-10-26 17:54:00 +01:00
|
|
|
return rt.self
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// K returns the currently configured maximum of nodes to store in a bucket
|
2018-08-09 20:20:39 +01:00
|
|
|
func (rt *RoutingTable) K() int {
|
|
|
|
return rt.bucketSize
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
2018-08-21 19:44:42 +01:00
|
|
|
// CacheSize returns the total current size of the replacement cache
|
2018-08-09 20:20:39 +01:00
|
|
|
func (rt *RoutingTable) CacheSize() int {
|
2018-08-21 19:44:42 +01:00
|
|
|
return rt.rcBucketSize
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
2018-08-09 20:20:39 +01:00
|
|
|
// GetBucket retrieves the corresponding kbucket from node id
|
|
|
|
// Note: id doesn't need to be stored at time of search
|
2018-11-29 18:39:27 +00:00
|
|
|
func (rt *RoutingTable) GetBucket(id storj.NodeID) (bucket dht.Bucket, ok bool) {
|
|
|
|
bID, err := rt.getKBucketID(id)
|
2018-06-05 12:48:19 +01:00
|
|
|
if err != nil {
|
2018-06-22 14:33:57 +01:00
|
|
|
return &KBucket{}, false
|
2018-06-05 12:48:19 +01:00
|
|
|
}
|
2018-11-29 18:39:27 +00:00
|
|
|
if bID == (bucketID{}) {
|
2018-06-22 14:33:57 +01:00
|
|
|
return &KBucket{}, false
|
2018-06-05 12:48:19 +01:00
|
|
|
}
|
2018-11-29 18:39:27 +00:00
|
|
|
unmarshaledNodes, err := rt.getUnmarshaledNodesFromBucket(bID)
|
2018-08-09 20:20:39 +01:00
|
|
|
if err != nil {
|
|
|
|
return &KBucket{}, false
|
|
|
|
}
|
|
|
|
return &KBucket{nodes: unmarshaledNodes}, true
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// GetBuckets retrieves all buckets from the local node
|
2018-08-09 20:20:39 +01:00
|
|
|
func (rt *RoutingTable) GetBuckets() (k []dht.Bucket, err error) {
|
2018-06-22 14:33:57 +01:00
|
|
|
bs := []dht.Bucket{}
|
2018-08-09 20:20:39 +01:00
|
|
|
kbuckets, err := rt.kadBucketDB.List(nil, 0)
|
|
|
|
if err != nil {
|
|
|
|
return bs, RoutingErr.New("could not get bucket ids %s", err)
|
|
|
|
}
|
|
|
|
for _, v := range kbuckets {
|
2018-11-29 18:39:27 +00:00
|
|
|
unmarshaledNodes, err := rt.getUnmarshaledNodesFromBucket(keyToBucketID(v))
|
2018-08-09 20:20:39 +01:00
|
|
|
if err != nil {
|
|
|
|
return bs, err
|
|
|
|
}
|
|
|
|
bs = append(bs, &KBucket{nodes: unmarshaledNodes})
|
2018-06-05 12:48:19 +01:00
|
|
|
}
|
|
|
|
return bs, nil
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
2018-11-21 17:31:27 +00:00
|
|
|
// GetBucketIds returns a storage.Keys type of bucket ID's in the Kademlia instance
|
|
|
|
func (rt *RoutingTable) GetBucketIds() (storage.Keys, error) {
|
|
|
|
kbuckets, err := rt.kadBucketDB.List(nil, 0)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return kbuckets, nil
|
|
|
|
}
|
|
|
|
|
2018-10-08 16:09:37 +01:00
|
|
|
// FindNear returns the node corresponding to the provided nodeID
|
|
|
|
// returns all Nodes closest via XOR to the provided nodeID up to the provided limit
|
|
|
|
// always returns limit + self
|
2018-11-29 18:39:27 +00:00
|
|
|
func (rt *RoutingTable) FindNear(id storj.NodeID, limit int) (nodes []*pb.Node, err error) {
|
2018-08-23 16:20:11 +01:00
|
|
|
// if id is not in the routing table
|
2018-11-29 18:39:27 +00:00
|
|
|
nodeIDsKeys, err := rt.nodeBucketDB.List(nil, 0)
|
2018-08-09 20:20:39 +01:00
|
|
|
if err != nil {
|
2018-11-29 18:39:27 +00:00
|
|
|
return nodes, RoutingErr.New("could not get node ids %s", err)
|
2018-08-09 20:20:39 +01:00
|
|
|
}
|
2018-11-29 18:39:27 +00:00
|
|
|
sortByXOR(nodeIDsKeys, id.Bytes())
|
|
|
|
if len(nodeIDsKeys) >= limit {
|
|
|
|
nodeIDsKeys = nodeIDsKeys[:limit]
|
2018-08-09 20:20:39 +01:00
|
|
|
}
|
2018-11-29 18:39:27 +00:00
|
|
|
nodeIDs, err := storj.NodeIDsFromBytes(nodeIDsKeys.ByteSlices())
|
2018-08-09 20:20:39 +01:00
|
|
|
if err != nil {
|
2018-11-29 18:39:27 +00:00
|
|
|
return nodes, RoutingErr.Wrap(err)
|
2018-08-09 20:20:39 +01:00
|
|
|
}
|
2018-10-08 16:09:37 +01:00
|
|
|
|
2018-11-29 18:39:27 +00:00
|
|
|
nodes, err = rt.getNodesFromIDsBytes(nodeIDs)
|
2018-08-09 20:20:39 +01:00
|
|
|
if err != nil {
|
2018-11-29 18:39:27 +00:00
|
|
|
return nodes, RoutingErr.New("could not get nodes %s", err)
|
2018-08-09 20:20:39 +01:00
|
|
|
}
|
2018-10-08 16:09:37 +01:00
|
|
|
|
2018-11-29 18:39:27 +00:00
|
|
|
return nodes, nil
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
2018-08-27 18:28:16 +01:00
|
|
|
// ConnectionSuccess updates or adds a node to the routing table when
|
2018-08-21 19:44:42 +01:00
|
|
|
// a successful connection is made to the node on the network
|
2018-09-18 05:39:06 +01:00
|
|
|
func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error {
|
2018-11-20 16:54:52 +00:00
|
|
|
// valid to connect to node without ID but don't store connection
|
2018-11-29 18:39:27 +00:00
|
|
|
if node.Id == (storj.NodeID{}) {
|
2018-11-20 16:54:52 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
rt.mutex.Lock()
|
2018-11-29 18:39:27 +00:00
|
|
|
rt.seen[node.Id] = node
|
2018-11-20 16:54:52 +00:00
|
|
|
rt.mutex.Unlock()
|
2018-11-29 18:39:27 +00:00
|
|
|
v, err := rt.nodeBucketDB.Get(storage.Key(node.Id.Bytes()))
|
2018-08-21 19:44:42 +01:00
|
|
|
if err != nil && !storage.ErrKeyNotFound.Has(err) {
|
|
|
|
return RoutingErr.New("could not get node %s", err)
|
|
|
|
}
|
2018-10-08 16:09:37 +01:00
|
|
|
|
2018-08-21 19:44:42 +01:00
|
|
|
if v != nil {
|
|
|
|
err = rt.updateNode(node)
|
|
|
|
if err != nil {
|
|
|
|
return RoutingErr.New("could not update node %s", err)
|
|
|
|
}
|
2018-11-20 16:54:52 +00:00
|
|
|
|
2018-08-21 19:44:42 +01:00
|
|
|
return nil
|
|
|
|
}
|
2018-10-08 16:09:37 +01:00
|
|
|
|
2018-08-21 19:44:42 +01:00
|
|
|
_, err = rt.addNode(node)
|
|
|
|
if err != nil {
|
|
|
|
return RoutingErr.New("could not add node %s", err)
|
|
|
|
}
|
2018-11-20 16:54:52 +00:00
|
|
|
|
2018-08-21 19:44:42 +01:00
|
|
|
return nil
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
2018-08-21 19:44:42 +01:00
|
|
|
// ConnectionFailed removes a node from the routing table when
|
|
|
|
// a connection fails for the node on the network
|
2018-09-18 05:39:06 +01:00
|
|
|
func (rt *RoutingTable) ConnectionFailed(node *pb.Node) error {
|
2018-11-29 18:39:27 +00:00
|
|
|
err := rt.removeNode(node.Id)
|
2018-08-21 19:44:42 +01:00
|
|
|
if err != nil {
|
|
|
|
return RoutingErr.New("could not remove node %s", err)
|
|
|
|
}
|
|
|
|
return nil
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// SetBucketTimestamp updates the last updated time for a bucket
|
2018-11-29 18:39:27 +00:00
|
|
|
func (rt *RoutingTable) SetBucketTimestamp(bIDBytes []byte, now time.Time) error {
|
2018-08-09 20:20:39 +01:00
|
|
|
rt.mutex.Lock()
|
|
|
|
defer rt.mutex.Unlock()
|
2018-11-29 18:39:27 +00:00
|
|
|
err := rt.createOrUpdateKBucket(keyToBucketID(bIDBytes), now)
|
2018-06-05 12:48:19 +01:00
|
|
|
if err != nil {
|
2018-08-09 20:20:39 +01:00
|
|
|
return NodeErr.New("could not update bucket timestamp %s", err)
|
2018-06-05 12:48:19 +01:00
|
|
|
}
|
2018-05-16 19:47:59 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetBucketTimestamp retrieves the last updated time for a bucket
|
2018-11-29 18:39:27 +00:00
|
|
|
func (rt *RoutingTable) GetBucketTimestamp(bIDBytes []byte, bucket dht.Bucket) (time.Time, error) {
|
|
|
|
t, err := rt.kadBucketDB.Get(bIDBytes)
|
2018-08-09 20:20:39 +01:00
|
|
|
if err != nil {
|
|
|
|
return time.Now(), RoutingErr.New("could not get bucket timestamp %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
timestamp, _ := binary.Varint(t)
|
|
|
|
|
|
|
|
return time.Unix(0, timestamp).UTC(), nil
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
2018-10-16 16:22:31 +01:00
|
|
|
|
|
|
|
func (rt *RoutingTable) iterate(opts storage.IterateOptions, f func(it storage.Iterator) error) error {
|
|
|
|
return rt.nodeBucketDB.Iterate(opts, f)
|
|
|
|
}
|