storj/pkg/kademlia/routing_helpers.go
JT Olio f1641af802 storage: add monkit task to missing places (#2122)
* storage: add monkit task to missing places

Change-Id: I9e17a6b14f7c25bbf698eeecf32785e9add3f26e

* fix tests

Change-Id: Id078276fa3de61a28eb3d01d4e751732ecbb173f

* import order

Change-Id: I814e33755b9f10b5219af37cd828cd75eb3da1a4

* remove part of other commit

Change-Id: Idaa4c95cd65e97567fb466de49718db8203cfbe1
2019-06-05 16:23:10 +02:00

367 lines
11 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"encoding/binary"
"time"
"github.com/gogo/protobuf/proto"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
)
// addNode attempts to add a new contact to the routing table
// Requires node not already in table
// Returns true if node was added successfully
func (rt *RoutingTable) addNode(node *pb.Node) (_ bool, err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
rt.mutex.Lock()
defer rt.mutex.Unlock()
if node.Id == rt.self.Id {
err := rt.createOrUpdateKBucket(ctx, firstBucketID, time.Now())
if err != nil {
return false, RoutingErr.New("could not create initial K bucket: %s", err)
}
err = rt.putNode(node)
if err != nil {
return false, RoutingErr.New("could not add initial node to nodeBucketDB: %s", err)
}
return true, nil
}
kadBucketID, err := rt.getKBucketID(ctx, node.Id)
if err != nil {
return false, RoutingErr.New("could not getKBucketID: %s", err)
}
hasRoom, err := rt.kadBucketHasRoom(kadBucketID)
if err != nil {
return false, err
}
containsLocal, err := rt.kadBucketContainsLocalNode(kadBucketID)
if err != nil {
return false, err
}
withinK, err := rt.wouldBeInNearestK(node.Id)
if err != nil {
return false, RoutingErr.New("could not determine if node is within k: %s", err)
}
for !hasRoom {
if containsLocal || withinK {
depth, err := rt.determineLeafDepth(kadBucketID)
if err != nil {
return false, RoutingErr.New("could not determine leaf depth: %s", err)
}
kadBucketID = rt.splitBucket(kadBucketID, depth)
err = rt.createOrUpdateKBucket(ctx, kadBucketID, time.Now())
if err != nil {
return false, RoutingErr.New("could not split and create K bucket: %s", err)
}
kadBucketID, err = rt.getKBucketID(ctx, node.Id)
if err != nil {
return false, RoutingErr.New("could not get k bucket Id within add node split bucket checks: %s", err)
}
hasRoom, err = rt.kadBucketHasRoom(kadBucketID)
if err != nil {
return false, err
}
containsLocal, err = rt.kadBucketContainsLocalNode(kadBucketID)
if err != nil {
return false, err
}
} else {
rt.addToReplacementCache(kadBucketID, node)
return false, nil
}
}
err = rt.putNode(node)
if err != nil {
return false, RoutingErr.New("could not add node to nodeBucketDB: %s", err)
}
err = rt.createOrUpdateKBucket(ctx, kadBucketID, time.Now())
if err != nil {
return false, RoutingErr.New("could not create or update K bucket: %s", err)
}
return true, nil
}
// updateNode will update the node information given that
// the node is already in the routing table.
func (rt *RoutingTable) updateNode(node *pb.Node) (err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
if err := rt.putNode(node); err != nil {
return RoutingErr.New("could not update node: %v", err)
}
return nil
}
// removeNode will remove churned nodes and replace those entries with nodes from the replacement cache.
func (rt *RoutingTable) removeNode(node *pb.Node) (err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
rt.mutex.Lock()
defer rt.mutex.Unlock()
kadBucketID, err := rt.getKBucketID(ctx, node.Id)
if err != nil {
return RoutingErr.New("could not get k bucket %s", err)
}
existingMarshalled, err := rt.nodeBucketDB.Get(ctx, node.Id.Bytes())
if storage.ErrKeyNotFound.Has(err) {
//check replacement cache
rt.removeFromReplacementCache(kadBucketID, node)
return nil
} else if err != nil {
return RoutingErr.New("could not get node %s", err)
}
var existing pb.Node
err = proto.Unmarshal(existingMarshalled, &existing)
if err != nil {
return RoutingErr.New("could not unmarshal node %s", err)
}
if !pb.AddressEqual(existing.Address, node.Address) {
// don't remove a node if the address is different
return nil
}
err = rt.nodeBucketDB.Delete(ctx, node.Id.Bytes())
if err != nil {
return RoutingErr.New("could not delete node %s", err)
}
nodes := rt.replacementCache[kadBucketID]
if len(nodes) == 0 {
return nil
}
err = rt.putNode(nodes[len(nodes)-1])
if err != nil {
return err
}
rt.replacementCache[kadBucketID] = nodes[:len(nodes)-1]
return nil
}
// putNode: helper, adds or updates Node and ID to nodeBucketDB
func (rt *RoutingTable) putNode(node *pb.Node) (err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
v, err := proto.Marshal(node)
if err != nil {
return RoutingErr.Wrap(err)
}
err = rt.nodeBucketDB.Put(ctx, node.Id.Bytes(), v)
if err != nil {
return RoutingErr.New("could not add key value pair to nodeBucketDB: %s", err)
}
return nil
}
// createOrUpdateKBucket: helper, adds or updates given kbucket
func (rt *RoutingTable) createOrUpdateKBucket(ctx context.Context, bID bucketID, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
dateTime := make([]byte, binary.MaxVarintLen64)
binary.PutVarint(dateTime, now.UnixNano())
err = rt.kadBucketDB.Put(ctx, bID[:], dateTime)
if err != nil {
return RoutingErr.New("could not add or update k bucket: %s", err)
}
return nil
}
// getKBucketID: helper, returns the id of the corresponding k bucket given a node id.
// The node doesn't have to be in the routing table at time of search
func (rt *RoutingTable) getKBucketID(ctx context.Context, nodeID storj.NodeID) (_ bucketID, err error) {
defer mon.Task()(&ctx)(&err)
match := bucketID{}
err = rt.kadBucketDB.Iterate(ctx, storage.IterateOptions{First: storage.Key{}, Recurse: true},
func(ctx context.Context, it storage.Iterator) error {
var item storage.ListItem
for it.Next(ctx, &item) {
match = keyToBucketID(item.Key)
if nodeID.Less(match) {
break
}
}
return nil
},
)
if err != nil {
return bucketID{}, RoutingErr.Wrap(err)
}
return match, nil
}
// wouldBeInNearestK: helper, returns true if the node in question is within the nearest k from local node
func (rt *RoutingTable) wouldBeInNearestK(nodeID storj.NodeID) (bool, error) {
closestNodes, err := rt.FindNear(rt.self.Id, rt.bucketSize)
if err != nil {
return false, RoutingErr.Wrap(err)
}
if len(closestNodes) < rt.bucketSize {
return true, nil
}
var furthestIDWithinK storj.NodeID
if len(closestNodes) <= rt.bucketSize {
furthestIDWithinK = closestNodes[len(closestNodes)-1].Id
} else {
furthestIDWithinK = closestNodes[rt.bucketSize].Id
}
existingXor := xorNodeID(furthestIDWithinK, rt.self.Id)
newXor := xorNodeID(nodeID, rt.self.Id)
return newXor.Less(existingXor), nil
}
// kadBucketContainsLocalNode returns true if the kbucket in question contains the local node
func (rt *RoutingTable) kadBucketContainsLocalNode(queryID bucketID) (_ bool, err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
bID, err := rt.getKBucketID(ctx, rt.self.Id)
if err != nil {
return false, err
}
return queryID == bID, nil
}
// kadBucketHasRoom: helper, returns true if it has fewer than k nodes
func (rt *RoutingTable) kadBucketHasRoom(bID bucketID) (bool, error) {
nodes, err := rt.getNodeIDsWithinKBucket(bID)
if err != nil {
return false, err
}
if len(nodes) < rt.bucketSize {
return true, nil
}
return false, nil
}
// getNodeIDsWithinKBucket: helper, returns a collection of all the node ids contained within the kbucket
func (rt *RoutingTable) getNodeIDsWithinKBucket(bID bucketID) (_ storj.NodeIDList, err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
endpoints, err := rt.getKBucketRange(bID)
if err != nil {
return nil, err
}
left := endpoints[0]
right := endpoints[1]
var ids []storj.NodeID
err = rt.iterateNodes(ctx, left, func(ctx context.Context, nodeID storj.NodeID, protoNode []byte) error {
if left.Less(nodeID) && (nodeID.Less(right) || nodeID == right) {
ids = append(ids, nodeID)
}
return nil
}, false)
if err != nil {
return nil, err
}
return ids, nil
}
// getNodesFromIDsBytes: helper, returns array of encoded nodes from node ids
func (rt *RoutingTable) getNodesFromIDsBytes(nodeIDs storj.NodeIDList) (_ []*pb.Node, err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
var marshaledNodes []storage.Value
for _, v := range nodeIDs {
n, err := rt.nodeBucketDB.Get(ctx, v.Bytes())
if err != nil {
return nil, RoutingErr.New("could not get node id %v, %s", v, err)
}
marshaledNodes = append(marshaledNodes, n)
}
return unmarshalNodes(marshaledNodes)
}
// unmarshalNodes: helper, returns slice of reconstructed node pointers given a map of nodeIDs:serialized nodes
func unmarshalNodes(nodes []storage.Value) ([]*pb.Node, error) {
var unmarshaled []*pb.Node
for _, n := range nodes {
node := &pb.Node{}
err := proto.Unmarshal(n, node)
if err != nil {
return unmarshaled, RoutingErr.New("could not unmarshal node %s", err)
}
unmarshaled = append(unmarshaled, node)
}
return unmarshaled, nil
}
// getUnmarshaledNodesFromBucket: helper, gets nodes within kbucket
func (rt *RoutingTable) getUnmarshaledNodesFromBucket(bID bucketID) ([]*pb.Node, error) {
nodeIDsBytes, err := rt.getNodeIDsWithinKBucket(bID)
if err != nil {
return []*pb.Node{}, RoutingErr.New("could not get nodeIds within kbucket %s", err)
}
nodes, err := rt.getNodesFromIDsBytes(nodeIDsBytes)
if err != nil {
return []*pb.Node{}, RoutingErr.New("could not get node values %s", err)
}
return nodes, nil
}
// getKBucketRange: helper, returns the left and right endpoints of the range of node ids contained within the bucket
func (rt *RoutingTable) getKBucketRange(bID bucketID) (_ []bucketID, err error) {
ctx := context.TODO()
defer mon.Task()(&ctx)(&err)
previousBucket := bucketID{}
endpoints := []bucketID{}
err = rt.kadBucketDB.Iterate(ctx, storage.IterateOptions{First: storage.Key{}, Recurse: true},
func(ctx context.Context, it storage.Iterator) error {
var item storage.ListItem
for it.Next(ctx, &item) {
thisBucket := keyToBucketID(item.Key)
if thisBucket == bID {
endpoints = []bucketID{previousBucket, bID}
break
}
previousBucket = thisBucket
}
return nil
},
)
if err != nil {
return endpoints, RoutingErr.Wrap(err)
}
return endpoints, nil
}
// determineLeafDepth determines the level of the bucket id in question.
// Eg level 0 means there is only 1 bucket, level 1 means the bucket has been split once, and so on
func (rt *RoutingTable) determineLeafDepth(bID bucketID) (int, error) {
bucketRange, err := rt.getKBucketRange(bID)
if err != nil {
return -1, RoutingErr.New("could not get k bucket range %s", err)
}
smaller := bucketRange[0]
diffBit, err := determineDifferingBitIndex(bID, smaller)
if err != nil {
return diffBit + 1, RoutingErr.New("could not determine differing bit %s", err)
}
return diffBit + 1, nil
}
// splitBucket: helper, returns the smaller of the two new bucket ids
// the original bucket id becomes the greater of the 2 new
func (rt *RoutingTable) splitBucket(bID bucketID, depth int) bucketID {
var newID bucketID
copy(newID[:], bID[:])
byteIndex := depth / 8
bitInByteIndex := 7 - (depth % 8)
toggle := byte(1 << uint(bitInByteIndex))
newID[byteIndex] ^= toggle
return newID
}