Kademlia Routing Table (#164)

* adds comment

* runs deps

* creates boltdb kademlia routing table

* protobuf updates

* adds reverselist to mockkeyvaluestore interface

* xor wip

* xor wip

* fixes xor sort

* runs go fmt

* fixes

* goimports again

* trying to fix travis tests

* fixes mock tests
This commit is contained in:
Jennifer Li Johnson 2018-07-30 15:25:18 -04:00 committed by GitHub
parent 7e136db9cf
commit 3230762041
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1123 additions and 30 deletions

View File

@ -32,13 +32,14 @@ func (k *KvStore) Empty() bool {
// MockKeyValueStore is a `KeyValueStore` type used for testing (see storj.io/storj/storage/common.go)
// Each *Called counter records how many times the corresponding method was invoked.
type MockKeyValueStore struct {
	Data              KvStore
	GetCalled         int
	PutCalled         int
	ListCalled        int
	ReverseListCalled int
	DeleteCalled      int
	CloseCalled       int
	PingCalled        int
}
// RedisDone is a function type that describes the callback returned by `EnsureRedis`
@ -118,6 +119,12 @@ func (m *MockKeyValueStore) List(startingKey storage.Key, limit storage.Limit) (
return keys, nil
}
// ReverseList returns either a list of keys for which the MockKeyValueStore has values or an error.
func (m *MockKeyValueStore) ReverseList(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
	// TODO(JJ): stub — reverse iteration over Data is not implemented yet;
	// always returns an empty key list and no error.
	return storage.Keys{}, nil
}
func mapIntoSlice(data KvStore) []string {
keySlice := make([]string, len(data))
i := 0
@ -144,13 +151,14 @@ func (m *MockKeyValueStore) Ping() error {
// NewMockKeyValueStore returns a mocked `KeyValueStore` implementation for testing.
// All call counters start at zero (spelled out for readability).
func NewMockKeyValueStore(d KvStore) *MockKeyValueStore {
	return &MockKeyValueStore{
		Data:              d,
		GetCalled:         0,
		PutCalled:         0,
		ListCalled:        0,
		ReverseListCalled: 0,
		DeleteCalled:      0,
		CloseCalled:       0,
		PingCalled:        0,
	}
}

View File

@ -0,0 +1,435 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
	"bytes"
	"math/rand"
	"strconv"
	"sync"
	"time"

	pb "github.com/golang/protobuf/proto"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	proto "storj.io/storj/protos/overlay"
	"storj.io/storj/storage"
	"storj.io/storj/storage/boltdb"
)
// RoutingErr is the class for all errors pertaining to routing table operations
var RoutingErr = errs.Class("routing table error")
// RoutingTable implements the RoutingTable interface
type RoutingTable struct {
	self         *proto.Node           // the local node this table is centered on
	kadBucketDB  storage.KeyValueStore // k-bucket upper-bound id -> last-updated timestamp
	nodeBucketDB storage.KeyValueStore // node id -> marshaled proto.Node
	transport    *proto.NodeTransport
	mutex        *sync.Mutex // guards addNode's read-modify-write sequence
	idLength     int         // kbucket and node id bit length (SHA256) = 256
	bucketSize   int         // max number of nodes stored in a kbucket = 20 (k)
}
// NewRoutingTable returns a newly configured instance of a RoutingTable.
// It opens (or creates) the two bolt-backed stores at kpath and npath.
func NewRoutingTable(localNode *proto.Node, kpath string, npath string, idLength int, bucketSize int) (*RoutingTable, error) {
	logger := zap.L()
	kadDB, err := boltdb.NewClient(logger, kpath, boltdb.KBucket)
	if err != nil {
		return nil, RoutingErr.New("could not create kadBucketDB: %s", err)
	}
	nodeDB, err := boltdb.NewClient(logger, npath, boltdb.NodeBucket)
	if err != nil {
		return nil, RoutingErr.New("could not create nodeBucketDB: %s", err)
	}
	rt := &RoutingTable{
		self:         localNode,
		kadBucketDB:  kadDB,
		nodeBucketDB: nodeDB,
		transport:    &defaultTransport,
		mutex:        &sync.Mutex{},
		idLength:     idLength,
		bucketSize:   bucketSize,
	}
	return rt, nil
}
// addNode attempts to add a new contact to the routing table
// Note: Local Node must be added to the routing table first
// Requires node not already in table
func (rt RoutingTable) addNode(node *proto.Node) error {
	rt.mutex.Lock()
	defer rt.mutex.Unlock()
	nodeKey := storage.Key(node.Id)
	// bootstrap case: adding the local node seeds the table with the single
	// full-range bucket (11..11) and stores the node itself
	if bytes.Equal(nodeKey, storage.Key(rt.self.Id)) {
		err := rt.createOrUpdateKBucket(rt.createFirstBucketID(), time.Now())
		if err != nil {
			return RoutingErr.New("could not create initial K bucket: %s", err)
		}
		nodeValue, err := rt.marshalNode(node)
		if err != nil {
			return RoutingErr.New("could not marshal initial node: %s", err)
		}
		err = rt.putNode(nodeKey, nodeValue)
		if err != nil {
			return RoutingErr.New("could not add initial node to nodeBucketDB: %s", err)
		}
		return nil
	}
	kadBucketID, err := rt.getKBucketID(nodeKey)
	if err != nil {
		return RoutingErr.New("could not getKBucketID: %s", err)
	}
	hasRoom, err := rt.kadBucketHasRoom(kadBucketID)
	if err != nil {
		return err
	}
	containsLocal, err := rt.kadBucketContainsLocalNode(kadBucketID)
	if err != nil {
		return err
	}
	withinK, err := rt.nodeIsWithinNearestK(nodeKey)
	if err != nil {
		return RoutingErr.New("could not determine if node is within k: %s", err)
	}
	// Kademlia split rule: while the target bucket is full, split it — but only
	// if it holds the local node or the new node is within the nearest k.
	// Otherwise the new node is silently dropped (returns nil without storing).
	for !hasRoom {
		if containsLocal || withinK {
			depth, err := rt.determineLeafDepth(kadBucketID)
			if err != nil {
				return RoutingErr.New("could not determine leaf depth: %s", err)
			}
			kadBucketID = rt.splitBucket(kadBucketID, depth)
			err = rt.createOrUpdateKBucket(kadBucketID, time.Now())
			if err != nil {
				return RoutingErr.New("could not split and create K bucket: %s", err)
			}
			// re-resolve which (possibly new) bucket now owns the node id
			kadBucketID, err = rt.getKBucketID(nodeKey)
			if err != nil {
				return RoutingErr.New("could not get k bucket Id within add node split bucket checks: %s", err)
			}
			hasRoom, err = rt.kadBucketHasRoom(kadBucketID)
			if err != nil {
				return err
			}
			containsLocal, err = rt.kadBucketContainsLocalNode(kadBucketID)
			if err != nil {
				return err
			}
		} else {
			// full bucket that may not be split for this node: drop the node
			return nil
		}
	}
	nodeValue, err := rt.marshalNode(node)
	if err != nil {
		return RoutingErr.New("could not marshal node: %s", err)
	}
	err = rt.putNode(nodeKey, nodeValue)
	if err != nil {
		return RoutingErr.New("could not add node to nodeBucketDB: %s", err)
	}
	err = rt.createOrUpdateKBucket(kadBucketID, time.Now())
	if err != nil {
		return RoutingErr.New("could not create or update K bucket: %s", err)
	}
	return nil
}
// nodeAlreadyExists will return true if the given node ID exists within nodeBucketDB
func (rt RoutingTable) nodeAlreadyExists(nodeID storage.Key) (bool, error) {
	value, err := rt.nodeBucketDB.Get(nodeID)
	if err != nil {
		return false, err
	}
	// a nil value means the key was absent
	return value != nil, nil
}
// updateNode will update the node information given that
// the node is already in the routing table.
func (rt RoutingTable) updateNode(node *proto.Node) error {
	// TODO(JJ): stub — currently a no-op that always reports success.
	return nil
}
// removeNode will remove nodes and replace those entries with nodes
// from the replacement cache.
// We want to replace churned nodes (no longer online)
func (rt RoutingTable) removeNode(nodeID storage.Key) error {
	// TODO(JJ): stub — currently a no-op that always reports success.
	return nil
}
// marshalNode: helper, sanitizes proto Node for db insertion
func (rt RoutingTable) marshalNode(node *proto.Node) ([]byte, error) {
	data, err := pb.Marshal(node)
	if err != nil {
		return nil, RoutingErr.New("could not marshal proto node: %s", err)
	}
	return data, nil
}
// putNode: helper, adds proto Node and ID to nodeBucketDB
func (rt RoutingTable) putNode(nodeKey storage.Key, nodeValue storage.Value) error {
	if err := rt.nodeBucketDB.Put(nodeKey, nodeValue); err != nil {
		return RoutingErr.New("could not add key value pair to nodeBucketDB: %s", err)
	}
	return nil
}
// createOrUpdateKBucket: helper, adds or updates given kbucket,
// storing the supplied time (UTC nanoseconds) as its last-updated value.
func (rt RoutingTable) createOrUpdateKBucket(bucketID storage.Key, now time.Time) error {
	// BUG FIX: string(int64) interprets the value as a single rune
	// (go vet flags this); format the timestamp as a decimal string instead.
	dateTime := strconv.FormatInt(now.UTC().UnixNano(), 10)
	err := rt.kadBucketDB.Put(bucketID, []byte(dateTime))
	if err != nil {
		return RoutingErr.New("could not add or update k bucket: %s", err)
	}
	return nil
}
// getKBucketID: helper, returns the id of the corresponding k bucket given a node id.
// Buckets are stored under their upper-bound id, so the owning bucket is the
// first stored id that is >= the node id.
func (rt RoutingTable) getKBucketID(nodeID storage.Key) (storage.Key, error) {
	kadBucketIDs, err := rt.kadBucketDB.List(nil, 0)
	if err != nil {
		return nil, RoutingErr.New("could not list all k bucket ids: %s", err)
	}
	// prepend 00..00 so every bucket has a lower endpoint to compare against
	keys := append(storage.Keys{rt.createZeroAsStorageKey()}, kadBucketIDs...)
	for i := 0; i+1 < len(keys); i++ {
		if bytes.Compare(nodeID, keys[i]) > 0 && bytes.Compare(nodeID, keys[i+1]) <= 0 {
			return keys[i+1], nil
		}
	}
	//shouldn't happen BUT return error if no matching kbucket...
	return nil, RoutingErr.New("could not find k bucket")
}
// sortByXOR: helper, quick sorts node IDs in place by xor distance from the
// local node, smallest xor to largest. Returns the (same) sorted slice.
func (rt RoutingTable) sortByXOR(nodeIDs storage.Keys) storage.Keys {
	if len(nodeIDs) < 2 {
		return nodeIDs
	}
	left, right := 0, len(nodeIDs)-1
	pivot := rand.Intn(len(nodeIDs))
	nodeIDs[pivot], nodeIDs[right] = nodeIDs[right], nodeIDs[pivot]
	// the pivot stays parked at nodeIDs[right] for the whole partition pass,
	// so its xor distance is loop-invariant — compute it once, not per element
	xorPivot := xorTwoIds(nodeIDs[right], []byte(rt.self.Id))
	for i := range nodeIDs {
		xorI := xorTwoIds(nodeIDs[i], []byte(rt.self.Id))
		if bytes.Compare(xorI, xorPivot) < 0 {
			nodeIDs[left], nodeIDs[i] = nodeIDs[i], nodeIDs[left]
			left++
		}
	}
	nodeIDs[left], nodeIDs[right] = nodeIDs[right], nodeIDs[left]
	rt.sortByXOR(nodeIDs[:left])
	rt.sortByXOR(nodeIDs[left+1:])
	return nodeIDs
}
// determineFurthestIDWithinK: helper, determines the furthest node within the
// k closest to the local node. Sorts nodeIDs in place as a side effect.
func (rt RoutingTable) determineFurthestIDWithinK(nodeIDs storage.Keys) ([]byte, error) {
	// guard: indexing sortedNodes[len-1] below would panic on an empty slice
	if len(nodeIDs) == 0 {
		return nil, RoutingErr.New("no node ids provided")
	}
	sortedNodes := rt.sortByXOR(nodeIDs)
	if len(sortedNodes) < rt.bucketSize+1 { //adding 1 since we're not including local node in closest k
		return sortedNodes[len(sortedNodes)-1], nil
	}
	return sortedNodes[rt.bucketSize], nil
}
// xorTwoIds: helper, finds the xor distance between two byte slices
func xorTwoIds(id []byte, comparisonID []byte) []byte {
var xorArr []byte
for i := 0; i < len(id); i++ {
xor := id[i] ^ comparisonID[i]
xorArr = append(xorArr, xor)
}
return xorArr
}
// nodeIsWithinNearestK: helper, returns true if the node in question is within the nearest k from local node
func (rt RoutingTable) nodeIsWithinNearestK(nodeID storage.Key) (bool, error) {
	nodes, err := rt.nodeBucketDB.List(nil, 0)
	if err != nil {
		return false, RoutingErr.New("could not get nodes: %s", err)
	}
	nodeCount := len(nodes)
	if nodeCount < rt.bucketSize+1 { //adding 1 since we're not including local node in closest k
		return true, nil
	}
	furthestIDWithinK, err := rt.determineFurthestIDWithinK(nodes)
	// BUG FIX: this error was previously ignored, which could let a nil id
	// flow into the xor comparison below
	if err != nil {
		return false, RoutingErr.New("could not determine furthest id within k: %s", err)
	}
	localNodeID := rt.self.Id
	existingXor := xorTwoIds(furthestIDWithinK, []byte(localNodeID))
	newXor := xorTwoIds(nodeID, []byte(localNodeID))
	return bytes.Compare(newXor, existingXor) < 0, nil
}
// kadBucketContainsLocalNode returns true if the kbucket in question contains the local node
func (rt RoutingTable) kadBucketContainsLocalNode(bucketID storage.Key) (bool, error) {
	localBucket, err := rt.getKBucketID(storage.Key(rt.self.Id))
	if err != nil {
		return false, err
	}
	return bytes.Equal(localBucket, bucketID), nil
}
// kadBucketHasRoom: helper, returns true if it has fewer than k nodes
func (rt RoutingTable) kadBucketHasRoom(bucketID storage.Key) (bool, error) {
	nodes, err := rt.getNodeIDsWithinKBucket(bucketID)
	if err != nil {
		return false, err
	}
	return len(nodes) < rt.bucketSize, nil
}
// getNodeIDsWithinKBucket: helper, returns a collection of all the node ids
// contained within the kbucket (capped at bucketSize entries).
func (rt RoutingTable) getNodeIDsWithinKBucket(bucketID storage.Key) (storage.Keys, error) {
	endpoints, err := rt.getKBucketRange(bucketID)
	if err != nil {
		return nil, err
	}
	lower, upper := endpoints[0], endpoints[1]
	allNodeIDs, err := rt.nodeBucketDB.List(nil, 0)
	if err != nil {
		return nil, RoutingErr.New("could not list nodes %s", err)
	}
	var inBucket storage.Keys
	for _, id := range allNodeIDs {
		// node ids in the half-open interval (lower, upper] belong to this bucket
		if bytes.Compare(id, lower) > 0 && bytes.Compare(id, upper) <= 0 {
			inBucket = append(inBucket, id)
			if len(inBucket) == rt.bucketSize {
				break
			}
		}
	}
	// preserve the original contract: an empty bucket yields (nil, nil)
	if len(inBucket) == 0 {
		return nil, nil
	}
	return inBucket, nil
}
// getKBucketRange: helper, returns the left and right endpoints of the range
// of node ids contained within the bucket (coords[0] exclusive, coords[1] inclusive).
func (rt RoutingTable) getKBucketRange(bucketID storage.Key) (storage.Keys, error) {
	key := storage.Key(bucketID)
	kadIDs, err := rt.kadBucketDB.ReverseList(key, 2)
	if err != nil {
		return nil, RoutingErr.New("could not reverse list k bucket ids %s", err)
	}
	// BUG FIX: kadIDs[0] was previously indexed unconditionally, panicking
	// when the reverse list came back empty (e.g. unknown bucket id)
	if len(kadIDs) == 0 {
		return nil, RoutingErr.New("no k bucket found for given id")
	}
	coords := make(storage.Keys, 2)
	if len(kadIDs) < 2 {
		// first bucket: lower endpoint is the 00..00 sentinel
		coords[0] = rt.createZeroAsStorageKey()
	} else {
		coords[0] = kadIDs[1]
	}
	coords[1] = kadIDs[0]
	return coords, nil
}
// createFirstBucketID creates byte slice representing 11..11
func (rt RoutingTable) createFirstBucketID() []byte {
	id := make([]byte, rt.idLength/8)
	for i := range id {
		id[i] = 0xFF
	}
	return id
}
// createZeroAsStorageKey creates storage Key representation of 00..00
func (rt RoutingTable) createZeroAsStorageKey() storage.Key {
	// make() zero-fills, yielding idLength/8 bytes of 0x00
	return storage.Key(make([]byte, rt.idLength/8))
}
// determineLeafDepth determines the level of the bucket id in question.
// Eg level 0 means there is only 1 bucket, level 1 means the bucket has been split once, and so on
func (rt RoutingTable) determineLeafDepth(bucketID storage.Key) (int, error) {
	bucketRange, err := rt.getKBucketRange(bucketID)
	if err != nil {
		return -1, RoutingErr.New("could not get k bucket range %s", err)
	}
	smaller := bucketRange[0]
	diffBit, err := rt.determineDifferingBitIndex(bucketID, smaller)
	if err != nil {
		// FIX: previously returned the half-computed diffBit+1 alongside the
		// error; return -1 so callers can't accidentally use a garbage depth
		return -1, RoutingErr.New("could not determine differing bit %s", err)
	}
	return diffBit + 1, nil
}
// determineDifferingBitIndex: helper, returns the last bit differs starting from prefix to suffix
func (rt RoutingTable) determineDifferingBitIndex(bucketID storage.Key, comparisonID storage.Key) (int, error) {
	if bytes.Equal(bucketID, comparisonID) {
		return -2, RoutingErr.New("compared two equivalent k bucket ids")
	}
	// 00..00 is the sentinel lower endpoint, not a real bucket id; compare
	// against the full-range id 11..11 instead so the xor below is meaningful
	if bytes.Equal(comparisonID, rt.createZeroAsStorageKey()) {
		comparisonID = rt.createFirstBucketID()
	}
	var differingByteIndex int
	var differingByteXor byte
	xorArr := xorTwoIds(bucketID, comparisonID)
	// every bit differs → no shared prefix; report -1 so depth computes to 0
	if bytes.Equal(xorArr, rt.createFirstBucketID()) {
		return -1, nil
	}
	// locate the first (most significant) byte in which the ids differ
	for j, v := range xorArr {
		if v != byte(0) {
			differingByteIndex = j
			differingByteXor = v
			break
		}
	}
	// find the lowest-order set bit of that xor byte: flipping a set bit
	// decreases the value, so the first h that decreases it is that bit
	h := 0
	for ; h < 8; h++ {
		toggle := byte(1 << uint(h))
		tempXor := differingByteXor
		tempXor ^= toggle
		if tempXor < differingByteXor {
			break
		}
	}
	// convert to an MSB-first bit index within the whole id
	// (bit 0 = most significant bit of byte 0)
	bitInByteIndex := 7 - h
	byteIndex := differingByteIndex
	bitIndex := byteIndex*8 + bitInByteIndex
	return bitIndex, nil
}
// splitBucket: helper, returns the smaller of the two new bucket ids
// the original bucket id becomes the greater of the 2 new
func (rt RoutingTable) splitBucket(bucketID []byte, depth int) []byte {
	split := make([]byte, len(bucketID))
	copy(split, bucketID)
	// toggle the bit at position `depth`, counting MSB-first across the id
	byteIdx := depth / 8
	bitIdx := 7 - depth%8
	split[byteIdx] ^= byte(1) << uint(bitIdx)
	return split
}

View File

@ -0,0 +1,589 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information
package kademlia
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
proto "storj.io/storj/protos/overlay"
"storj.io/storj/storage"
)
func tempfile(fileName string) string {
f, err := ioutil.TempFile("", fileName)
if err != nil {
panic(err)
}
f.Close()
err = os.Remove(f.Name())
if err != nil {
panic(err)
}
return f.Name()
}
// createRT builds a routing table for tests with a random local node id,
// 16-bit ids, and bucket size 6. FIX: errors from newID/NewRoutingTable were
// previously discarded, which could hand tests a nil table; panic instead so
// setup failures surface immediately.
func createRT() *RoutingTable {
	localNodeID, err := newID()
	if err != nil {
		panic(err)
	}
	localNode := proto.Node{Id: string(localNodeID)}
	rt, err := NewRoutingTable(&localNode, tempfile("Kadbucket"), tempfile("Nodebucket"), 16, 6)
	if err != nil {
		panic(err)
	}
	return rt
}
// mockNodes returns a minimal proto.Node carrying only the given id.
func mockNodes(id string) *proto.Node {
	return &proto.Node{Id: id}
}
// TestAddNode walks addNode through its main scenarios in sequence:
// seeding the table with the local node, filling a bucket, splitting a full
// bucket, dropping nodes that may not cause a split, and splitting buckets
// in a highly unbalanced tree. Later sections depend on earlier state.
func TestAddNode(t *testing.T) {
	rt := createRT()
	//add local node
	rt.self = mockNodes("OO") //[79, 79] or [01001111, 01001111]
	localNode := rt.self
	err := rt.addNode(localNode)
	assert.NoError(t, err)
	bucket, err := rt.kadBucketDB.Get(storage.Key([]byte{255, 255}))
	assert.NoError(t, err)
	assert.NotNil(t, bucket)
	//add node to unfilled kbucket
	node1 := mockNodes("PO") //[80, 79] or [01010000, 01001111]
	err = rt.addNode(node1)
	assert.NoError(t, err)
	kadKeys, err := rt.kadBucketDB.List(nil, 0)
	assert.NoError(t, err)
	nodeKeys, err := rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, 1, len(kadKeys))
	assert.Equal(t, 2, len(nodeKeys))
	//add node to full kbucket and split
	node2 := mockNodes("NO") //[78, 79] or [01001110, 01001111]
	err = rt.addNode(node2)
	assert.NoError(t, err)
	node3 := mockNodes("MO") //[77, 79] or [01001101, 01001111]
	err = rt.addNode(node3)
	assert.NoError(t, err)
	node4 := mockNodes("LO") //[76, 79] or [01001100, 01001111]
	err = rt.addNode(node4)
	assert.NoError(t, err)
	node5 := mockNodes("QO") //[81, 79] or [01010001, 01001111]
	err = rt.addNode(node5)
	assert.NoError(t, err)
	kadKeys, err = rt.kadBucketDB.List(nil, 0)
	assert.NoError(t, err)
	nodeKeys, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, 1, len(kadKeys))
	assert.Equal(t, 6, len(nodeKeys))
	//splitting here
	node6 := mockNodes("SO")
	err = rt.addNode(node6)
	assert.NoError(t, err)
	kadKeys, err = rt.kadBucketDB.List(nil, 0)
	assert.NoError(t, err)
	nodeKeys, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, 5, len(kadKeys))
	assert.Equal(t, 7, len(nodeKeys))
	// //check how many keys in each bucket
	a, err := rt.getNodeIDsWithinKBucket(kadKeys[0])
	assert.NoError(t, err)
	assert.Equal(t, 0, len(a))
	b, err := rt.getNodeIDsWithinKBucket(kadKeys[1])
	assert.NoError(t, err)
	assert.Equal(t, 4, len(b))
	c, err := rt.getNodeIDsWithinKBucket(kadKeys[2])
	assert.NoError(t, err)
	assert.Equal(t, 3, len(c))
	d, err := rt.getNodeIDsWithinKBucket(kadKeys[3])
	assert.NoError(t, err)
	assert.Equal(t, 0, len(d))
	e, err := rt.getNodeIDsWithinKBucket(kadKeys[4])
	assert.NoError(t, err)
	assert.Equal(t, 0, len(e))
	//add node to full kbucket and drop
	node7 := mockNodes("?O")
	err = rt.addNode(node7)
	assert.NoError(t, err)
	node8 := mockNodes(">O")
	err = rt.addNode(node8)
	assert.NoError(t, err)
	node9 := mockNodes("=O")
	err = rt.addNode(node9)
	assert.NoError(t, err)
	node10 := mockNodes(";O")
	err = rt.addNode(node10)
	assert.NoError(t, err)
	node11 := mockNodes(":O")
	err = rt.addNode(node11)
	assert.NoError(t, err)
	node12 := mockNodes("9O")
	err = rt.addNode(node12)
	assert.NoError(t, err)
	kadKeys, err = rt.kadBucketDB.List(nil, 0)
	assert.NoError(t, err)
	nodeKeys, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, 5, len(kadKeys))
	assert.Equal(t, 13, len(nodeKeys))
	a, err = rt.getNodeIDsWithinKBucket(kadKeys[0])
	assert.NoError(t, err)
	assert.Equal(t, 6, len(a))
	//should drop: bucket 0 is full and neither contains local nor is within k
	node13 := mockNodes("8O")
	err = rt.addNode(node13)
	assert.NoError(t, err)
	kadKeys, err = rt.kadBucketDB.List(nil, 0)
	assert.NoError(t, err)
	nodeKeys, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, 5, len(kadKeys))
	assert.Equal(t, 13, len(nodeKeys))
	a, err = rt.getNodeIDsWithinKBucket(kadKeys[0])
	assert.NoError(t, err)
	assert.Equal(t, 6, len(a))
	//add node to highly unbalanced tree
	//adding to bucket 1
	node14 := mockNodes("KO") //75
	err = rt.addNode(node14)
	assert.NoError(t, err)
	node15 := mockNodes("JO") //74
	err = rt.addNode(node15)
	assert.NoError(t, err)
	//adding to bucket 2
	node16 := mockNodes("]O") //93
	err = rt.addNode(node16)
	assert.NoError(t, err)
	node17 := mockNodes("^O") //94
	err = rt.addNode(node17)
	assert.NoError(t, err)
	node18 := mockNodes("_O") //95
	err = rt.addNode(node18)
	assert.NoError(t, err)
	b, err = rt.getNodeIDsWithinKBucket(kadKeys[1])
	assert.NoError(t, err)
	assert.Equal(t, 6, len(b))
	c, err = rt.getNodeIDsWithinKBucket(kadKeys[2])
	assert.NoError(t, err)
	assert.Equal(t, 6, len(c))
	kadKeys, err = rt.kadBucketDB.List(nil, 0)
	assert.NoError(t, err)
	nodeKeys, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, 5, len(kadKeys))
	assert.Equal(t, 18, len(nodeKeys))
	//split bucket 2
	node19 := mockNodes("@O")
	err = rt.addNode(node19)
	assert.NoError(t, err)
	kadKeys, err = rt.kadBucketDB.List(nil, 0)
	assert.NoError(t, err)
	nodeKeys, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, 6, len(kadKeys))
	assert.Equal(t, 19, len(nodeKeys))
}
// TestCreateOrUpdateKBucket verifies a bucket written via createOrUpdateKBucket
// can be read back from the kad bucket DB.
func TestCreateOrUpdateKBucket(t *testing.T) {
	rt := createRT()
	bucketID := storage.Key([]byte{255, 255})
	assert.NoError(t, rt.createOrUpdateKBucket(bucketID, time.Now()))
	got, err := rt.kadBucketDB.Get(bucketID)
	assert.NotNil(t, got)
	assert.NoError(t, err)
}
// TestGetKBucketID checks that node ids resolve to the bucket whose
// upper-bound id is the smallest one >= the node id.
// FIX: errors from createOrUpdateKBucket and getKBucketID were discarded.
func TestGetKBucketID(t *testing.T) {
	rt := createRT()
	kadIDA := storage.Key([]byte{255, 255})
	kadIDB := storage.Key([]byte{127, 255})
	kadIDC := storage.Key([]byte{63, 255})
	now := time.Now()
	assert.NoError(t, rt.createOrUpdateKBucket(kadIDA, now))
	assert.NoError(t, rt.createOrUpdateKBucket(kadIDB, now))
	assert.NoError(t, rt.createOrUpdateKBucket(kadIDC, now))
	nodeIDA := []byte{183, 255} //[10110111, 1111111]
	nodeIDB := []byte{111, 255} //[01101111, 1111111]
	nodeIDC := []byte{47, 255}  //[00101111, 1111111]
	keyA, err := rt.getKBucketID(nodeIDA)
	assert.NoError(t, err)
	assert.Equal(t, kadIDA, keyA)
	keyB, err := rt.getKBucketID(nodeIDB)
	assert.NoError(t, err)
	assert.Equal(t, kadIDB, keyB)
	keyC, err := rt.getKBucketID(nodeIDC)
	assert.NoError(t, err)
	assert.Equal(t, kadIDC, keyC)
}
// TestXorTwoIds checks the byte-wise xor distance of two single-byte ids.
func TestXorTwoIds(t *testing.T) {
	result := xorTwoIds([]byte{191}, []byte{159})
	// 10111111 ^ 10011111 = 00100000
	assert.Equal(t, []byte{32}, result)
}
// TestSortByXOR verifies sortByXOR orders ids by xor distance from the local
// node and that the DB listing order (lexicographic) is unaffected.
func TestSortByXOR(t *testing.T) {
	rt := createRT()
	node1 := []byte{127, 255} //xor 0
	rt.self.Id = string(node1)
	rt.nodeBucketDB.Put(node1, []byte(""))
	node2 := []byte{143, 255} //xor 240
	rt.nodeBucketDB.Put(node2, []byte(""))
	node3 := []byte{255, 255} //xor 128
	rt.nodeBucketDB.Put(node3, []byte(""))
	node4 := []byte{191, 255} //xor 192
	rt.nodeBucketDB.Put(node4, []byte(""))
	node5 := []byte{133, 255} //xor 250
	rt.nodeBucketDB.Put(node5, []byte(""))
	nodes, err := rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	// List returns keys in lexicographic order
	expectedNodes := storage.Keys{node1, node5, node2, node4, node3}
	assert.Equal(t, expectedNodes, nodes)
	sortedNodes := rt.sortByXOR(nodes)
	// sorted ascending by xor distance: 0, 128, 192, 240, 250
	expectedSorted := storage.Keys{node1, node3, node4, node2, node5}
	assert.Equal(t, expectedSorted, sortedNodes)
	nodes, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	assert.Equal(t, expectedNodes, nodes)
}
// TestDetermineFurthestIDWithinK grows the node set one id at a time and
// checks the furthest-within-k id after each insertion (bucketSize k=6, so
// all nodes stay within k here and the max-xor id is expected).
// FIX: the error from determineFurthestIDWithinK was never asserted.
func TestDetermineFurthestIDWithinK(t *testing.T) {
	rt := createRT()
	node1 := []byte{127, 255} //xor 0
	rt.self.Id = string(node1)
	rt.nodeBucketDB.Put(node1, []byte(""))
	expectedFurthest := node1
	nodes, err := rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	furthest, err := rt.determineFurthestIDWithinK(nodes)
	assert.NoError(t, err)
	assert.Equal(t, expectedFurthest, furthest)
	node2 := []byte{143, 255} //xor 240
	rt.nodeBucketDB.Put(node2, []byte(""))
	expectedFurthest = node2
	nodes, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	furthest, err = rt.determineFurthestIDWithinK(nodes)
	assert.NoError(t, err)
	assert.Equal(t, expectedFurthest, furthest)
	node3 := []byte{255, 255} //xor 128
	rt.nodeBucketDB.Put(node3, []byte(""))
	expectedFurthest = node2
	nodes, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	furthest, err = rt.determineFurthestIDWithinK(nodes)
	assert.NoError(t, err)
	assert.Equal(t, expectedFurthest, furthest)
	node4 := []byte{191, 255} //xor 192
	rt.nodeBucketDB.Put(node4, []byte(""))
	expectedFurthest = node2
	nodes, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	furthest, err = rt.determineFurthestIDWithinK(nodes)
	assert.NoError(t, err)
	assert.Equal(t, expectedFurthest, furthest)
	node5 := []byte{133, 255} //xor 250
	rt.nodeBucketDB.Put(node5, []byte(""))
	expectedFurthest = node5
	nodes, err = rt.nodeBucketDB.List(nil, 0)
	assert.NoError(t, err)
	furthest, err = rt.determineFurthestIDWithinK(nodes)
	assert.NoError(t, err)
	assert.Equal(t, expectedFurthest, furthest)
}
// TestNodeIsWithinNearestK checks the within-nearest-k decision with k=2 as
// candidate nodes are added one by one.
// FIX: the final Put stored node1 again (copy-paste) instead of node2, so the
// last assertion ran against the wrong DB contents; it still holds with the
// corrected Put (node3's xor 250 exceeds the furthest-within-k xor 192).
func TestNodeIsWithinNearestK(t *testing.T) {
	rt := createRT()
	rt.bucketSize = 2
	selfNode := []byte{127, 255}
	rt.self.Id = string(selfNode)
	rt.nodeBucketDB.Put(selfNode, []byte(""))
	expectTrue, err := rt.nodeIsWithinNearestK(selfNode)
	assert.NoError(t, err)
	assert.True(t, expectTrue)
	furthestNode := []byte{143, 255}
	expectTrue, err = rt.nodeIsWithinNearestK(furthestNode)
	assert.NoError(t, err)
	assert.True(t, expectTrue)
	rt.nodeBucketDB.Put(furthestNode, []byte(""))
	node1 := []byte{255, 255}
	expectTrue, err = rt.nodeIsWithinNearestK(node1)
	assert.NoError(t, err)
	assert.True(t, expectTrue)
	rt.nodeBucketDB.Put(node1, []byte(""))
	node2 := []byte{191, 255}
	expectTrue, err = rt.nodeIsWithinNearestK(node2)
	assert.NoError(t, err)
	assert.True(t, expectTrue)
	rt.nodeBucketDB.Put(node2, []byte(""))
	node3 := []byte{133, 255}
	expectFalse, err := rt.nodeIsWithinNearestK(node3)
	assert.NoError(t, err)
	assert.False(t, expectFalse)
}
// TestKadBucketContainsLocalNode places the local node in bucket A's range
// and checks membership reporting for both buckets.
func TestKadBucketContainsLocalNode(t *testing.T) {
	rt := createRT()
	bucketA := storage.Key([]byte{255, 255})
	bucketB := storage.Key([]byte{127, 255})
	now := time.Now()
	assert.NoError(t, rt.createOrUpdateKBucket(bucketA, now))
	assert.NoError(t, rt.createOrUpdateKBucket(bucketB, now))
	localID := []byte{183, 255} //[10110111, 1111111] — falls in bucketA's range
	assert.NoError(t, rt.nodeBucketDB.Put(localID, []byte("")))
	rt.self.Id = string(localID)
	inA, err := rt.kadBucketContainsLocalNode(bucketA)
	assert.NoError(t, err)
	assert.True(t, inA)
	inB, err := rt.kadBucketContainsLocalNode(bucketB)
	assert.NoError(t, err)
	assert.False(t, inB)
}
// TestKadBucketHasRoom fills the full-range bucket up to k=6 nodes and
// checks hasRoom flips from true to false.
func TestKadBucketHasRoom(t *testing.T) {
	//valid when kad bucket has >0 nodes in it due to ../storage/boltdb/client.go:99
	rt := createRT()
	bucketID := storage.Key([]byte{255, 255})
	rt.createOrUpdateKBucket(bucketID, time.Now())
	first := []byte{255, 255}
	rest := [][]byte{
		{191, 255},
		{127, 255},
		{63, 255},
		{159, 255},
		{0, 127},
	}
	rt.nodeBucketDB.Put(first, []byte(""))
	hasRoom, err := rt.kadBucketHasRoom(bucketID)
	assert.NoError(t, err)
	assert.True(t, hasRoom)
	for _, id := range rest {
		rt.nodeBucketDB.Put(id, []byte(""))
	}
	hasRoom, err = rt.kadBucketHasRoom(bucketID)
	assert.NoError(t, err)
	assert.False(t, hasRoom)
}
// TestGetNodeIDsWithinKBucket checks node ids are partitioned into the
// correct buckets based on the bucket range endpoints.
func TestGetNodeIDsWithinKBucket(t *testing.T) {
	rt := createRT()
	bucketA := storage.Key([]byte{255, 255})
	bucketB := storage.Key([]byte{127, 255})
	now := time.Now()
	rt.createOrUpdateKBucket(bucketA, now)
	rt.createOrUpdateKBucket(bucketB, now)
	idA := []byte{183, 255} //[10110111, 1111111]
	idB := []byte{111, 255} //[01101111, 1111111]
	idC := []byte{47, 255}  //[00101111, 1111111]
	for _, id := range [][]byte{idA, idB, idC} {
		rt.nodeBucketDB.Put(id, []byte(""))
	}
	gotA, err := rt.getNodeIDsWithinKBucket(bucketA)
	assert.NoError(t, err)
	gotB, err := rt.getNodeIDsWithinKBucket(bucketB)
	assert.NoError(t, err)
	assert.Equal(t, storage.Keys{idA}, gotA)
	assert.Equal(t, storage.Keys{idC, idB}, gotB)
}
// TestGetKBucketRange checks each bucket's (exclusive lower, inclusive upper]
// endpoints; the smallest bucket gets the 00..00 sentinel as lower bound.
func TestGetKBucketRange(t *testing.T) {
	rt := createRT()
	idA := []byte{255, 255}
	idB := []byte{127, 255}
	idC := []byte{63, 255}
	for _, id := range [][]byte{idA, idB, idC} {
		rt.kadBucketDB.Put(id, []byte(""))
	}
	gotA, err := rt.getKBucketRange(idA)
	assert.NoError(t, err)
	assert.Equal(t, storage.Keys{idB, idA}, gotA)
	gotB, err := rt.getKBucketRange(idB)
	assert.NoError(t, err)
	assert.Equal(t, storage.Keys{idC, idB}, gotB)
	gotC, err := rt.getKBucketRange(idC)
	assert.NoError(t, err)
	assert.Equal(t, storage.Keys{rt.createZeroAsStorageKey(), idC}, gotC)
}
// TestCreateFirstBucketID checks the full-range bucket id is all 1-bits
// for the 16-bit test id length.
func TestCreateFirstBucketID(t *testing.T) {
	rt := createRT()
	got := rt.createFirstBucketID()
	expected := []byte{255, 255}
	// FIX: testify convention is assert.Equal(t, expected, actual)
	assert.Equal(t, expected, got)
}
// TestCreateZeroAsStorageKey checks the sentinel key is all 0-bits
// for the 16-bit test id length.
func TestCreateZeroAsStorageKey(t *testing.T) {
	rt := createRT()
	zero := rt.createZeroAsStorageKey()
	expected := []byte{0, 0}
	// FIX: testify convention is assert.Equal(t, expected, actual)
	assert.Equal(t, storage.Key(expected), zero)
}
// TestDetermineLeafDepth checks bucket depths as the id space is split:
// a single bucket has depth 0, and each split deepens the affected leaves.
func TestDetermineLeafDepth(t *testing.T) {
	rt := createRT()
	idA := []byte{255, 255}
	idB := []byte{127, 255}
	idC := []byte{63, 255}
	err := rt.kadBucketDB.Put(idA, []byte(""))
	assert.NoError(t, err)
	first, err := rt.determineLeafDepth(idA)
	assert.NoError(t, err)
	assert.Equal(t, 0, first)
	err = rt.kadBucketDB.Put(idB, []byte(""))
	assert.NoError(t, err)
	second, err := rt.determineLeafDepth(idB)
	assert.NoError(t, err)
	assert.Equal(t, 1, second)
	err = rt.kadBucketDB.Put(idC, []byte(""))
	assert.NoError(t, err)
	one, err := rt.determineLeafDepth(idA)
	assert.NoError(t, err)
	assert.Equal(t, 1, one)
	two, err := rt.determineLeafDepth(idB)
	assert.NoError(t, err)
	assert.Equal(t, 2, two)
	alsoTwo, err := rt.determineLeafDepth(idC)
	assert.NoError(t, err)
	assert.Equal(t, 2, alsoTwo)
}
// TestDetermineDifferingBitIndex exercises the differing-bit search,
// including the equal-ids error (-2), the all-bits-differ case (-1), and the
// 00..00 sentinel substitution.
// FIX: removed two exact duplicate assertion triples ({95,255}/{79,255} and
// {95,255}/{63,255} each appeared twice).
func TestDetermineDifferingBitIndex(t *testing.T) {
	rt := createRT()
	diff, err := rt.determineDifferingBitIndex([]byte{191, 255}, []byte{255, 255})
	assert.NoError(t, err)
	assert.Equal(t, 1, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{255, 255}, []byte{191, 255})
	assert.NoError(t, err)
	assert.Equal(t, 1, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{95, 255}, []byte{127, 255})
	assert.NoError(t, err)
	assert.Equal(t, 2, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{95, 255}, []byte{79, 255})
	assert.NoError(t, err)
	assert.Equal(t, 3, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{95, 255}, []byte{63, 255})
	assert.NoError(t, err)
	assert.Equal(t, 2, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{255, 255}, []byte{255, 255})
	assert.Error(t, err)
	assert.Equal(t, -2, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{255, 255}, []byte{0, 0})
	assert.NoError(t, err)
	assert.Equal(t, -1, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{127, 255}, []byte{0, 0})
	assert.NoError(t, err)
	assert.Equal(t, 0, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{63, 255}, []byte{0, 0})
	assert.NoError(t, err)
	assert.Equal(t, 1, diff)
	diff, err = rt.determineDifferingBitIndex([]byte{31, 255}, []byte{0, 0})
	assert.NoError(t, err)
	assert.Equal(t, 2, diff)
}
// TestSplitBucket checks that splitting at a given depth toggles the bit at
// that MSB-first position, producing the smaller of the two child bucket ids.
func TestSplitBucket(t *testing.T) {
	rt := createRT()
	id1 := []byte{255, 255}
	id2 := []byte{191, 255}
	id3 := []byte{127, 255}
	id4 := []byte{63, 255}
	id5 := []byte{159, 255}
	id6 := []byte{0, 127}
	id7 := []byte{0, 255}
	id8 := []byte{95, 255}
	id9 := []byte{87, 255}
	newID1 := rt.splitBucket(id1, 1) //[11111111, 11111111] -> [10111111, 11111111]
	assert.Equal(t, id2, newID1)
	newID2 := rt.splitBucket(id2, 2) //[10111111, 11111111] -> [10011111, 11111111]
	assert.Equal(t, id5, newID2)
	newID3 := rt.splitBucket(id3, 1) //[01111111, 11111111] -> [00111111, 11111111]
	assert.Equal(t, id4, newID3)
	newID4 := rt.splitBucket(id7, 8) //[00000000, 11111111] -> [00000000, 01111111]
	assert.Equal(t, id6, newID4)
	newID5 := rt.splitBucket(id8, 4) //[01011111, 11111111] -> [01010111, 11111111]
	assert.Equal(t, id9, newID5)
	newID6 := rt.splitBucket(id8, 3) //[01011111, 11111111] -> [01001111, 11111111]
	assert.Equal(t, []byte{79, 255}, newID6)
}

View File

@ -88,6 +88,16 @@ func (mr *MockKeyValueStoreMockRecorder) List(arg0, arg1 interface{}) *gomock.Ca
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockKeyValueStore)(nil).List), arg0, arg1)
}
// ReverseList mocks base method
func (m *MockKeyValueStore) ReverseList(arg0 storage.Key, arg1 storage.Limit) (storage.Keys, error) {
	// NOTE(review): hand-edited shim — delegates to the mocked List instead of
	// recording a distinct "ReverseList" call, so expectations set on List
	// also satisfy ReverseList. Regenerating with mockgen would replace this.
	return m.List(arg0, arg1)
}
// ReverseList indicates an expected call of ReverseList
func (mr *MockKeyValueStoreMockRecorder) ReverseList(arg0, arg1 interface{}) *gomock.Call {
	// NOTE(review): delegates to the List recorder to mirror the base method's
	// delegation above — the expectation is registered under "List".
	return mr.List(arg0, arg1)
}
// Put mocks base method
func (m *MockKeyValueStore) Put(arg0 storage.Key, arg1 storage.Value) error {
ret := m.ctrl.Call(m, "Put", arg0, arg1)

View File

@ -94,6 +94,16 @@ func (mr *MockPointerDBClientMockRecorder) List(arg0, arg1 interface{}, arg2 ...
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockPointerDBClient)(nil).List), varargs...)
}
// ReverseList mocks base method
func (m *MockPointerDBClient) ReverseList(arg0 context.Context, arg1 *pointerdb.ListRequest, arg2 ...grpc.CallOption) (*pointerdb.ListResponse, error) {
	// NOTE(review): hand-edited shim — delegates to the mocked List rather
	// than recording a distinct "ReverseList" call.
	return m.List(arg0, arg1, arg2...)
}
// ReverseList indicates an expected call of ReverseList
func (mr *MockPointerDBClientMockRecorder) ReverseList(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
	// NOTE(review): delegates to the List recorder to mirror the base method's
	// delegation above — the expectation is registered under "List".
	return mr.List(arg0, arg1, arg2...)
}
// Put mocks base method
func (m *MockPointerDBClient) Put(arg0 context.Context, arg1 *pointerdb.PutRequest, arg2 ...grpc.CallOption) (*pointerdb.PutResponse, error) {
varargs := []interface{}{arg0, arg1}

View File

@ -293,4 +293,4 @@ func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (resp *pb.De
}
s.logger.Debug("deleted pointer at path: " + string(req.GetPath()))
return &pb.DeleteResponse{}, nil
}
}

View File

@ -8,13 +8,13 @@ import (
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"storj.io/storj/pkg/paths"

View File

@ -126,7 +126,7 @@ func ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request,
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
w.Header().Set("Content-Type",
"multipart/byteranges; boundary=" + mw.Boundary())
"multipart/byteranges; boundary="+mw.Boundary())
sendContent = func() (io.ReadCloser, error) { return ioutil.NopCloser(pr), nil }
// cause writing goroutine to fail and exit if CopyN doesn't finish.
defer func() {

View File

@ -9,8 +9,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"net/http/httptest"
"github.com/stretchr/testify/assert"
)
func TestServeContent(t *testing.T) {
@ -67,7 +68,7 @@ func TestServeContentParseRange(t *testing.T) {
for k, v := range map[string]string{"Etag": "\"abcde\""} {
writer.Header().Add(k, v)
}
ranger := ByteRanger([]byte("bytes=1-5/0,bytes=1-5/8",))
ranger := ByteRanger([]byte("bytes=1-5/0,bytes=1-5/8"))
ServeContent(context.Background(), writer, req, "", time.Now().UTC(), ranger)
@ -616,13 +617,13 @@ func Test_scanETag(t *testing.T) {
expectedRemain string
}{
{
name: "Empty ETag", s: "",
name: "Empty ETag", s: "",
expectedEtag: "",
expectedRemain: "",
},
{
name: "Empty ETag with W", s: "W/",
name: "Empty ETag with W", s: "W/",
expectedEtag: "",
expectedRemain: "",
},

View File

@ -8,6 +8,7 @@ import (
"context"
"io/ioutil"
"testing"
"github.com/stretchr/testify/assert"
)

View File

@ -4,9 +4,10 @@
package ranger
import (
"testing"
"github.com/stretchr/testify/assert"
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestRange(t *testing.T) {
@ -37,6 +38,6 @@ func TestRange(t *testing.T) {
}
func TestClose(t *testing.T) {
rr := readerAtReader{length:0}
rr := readerAtReader{length: 0}
assert.Nil(t, rr.Close())
}
}

View File

@ -30,7 +30,6 @@ func NewServer(driver, source string, logger *zap.Logger) (*Server, error) {
return nil, err
}
_, err = db.Exec(db.Schema())
if err != nil && !strings.Contains(err.Error(), "already exists") {
return nil, err

View File

@ -8,10 +8,10 @@ import (
"github.com/boltdb/bolt"
"go.uber.org/zap"
"storj.io/storj/storage"
)
// boltClient implements the KeyValueStore interface
type boltClient struct {
logger *zap.Logger
db *bolt.DB
@ -26,6 +26,10 @@ const (
PointerBucket = "pointers"
// OverlayBucket is the string representing the bucket used for a bolt-backed overlay dht cache
OverlayBucket = "overlay"
// KBucket is the string representing the bucket used for the kademlia routing table k-bucket ids
KBucket = "kbuckets"
// NodeBucket is the string representing the bucket used for the kademlia routing table node ids
NodeBucket = "nodes"
)
var (
@ -82,16 +86,29 @@ func (c *boltClient) Get(pathKey storage.Key) (storage.Value, error) {
// List returns either a list of keys for which boltdb has values or an error.
// It iterates forward from startingKey (or from the first key when
// startingKey is nil), returning at most limit keys when limit > 0.
func (c *boltClient) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
	c.logger.Debug("entering bolt list")
	const reverse = false // forward iteration; see ReverseList for the backward variant
	return c.listHelper(reverse, startingKey, limit)
}
// ReverseList returns either a list of keys for which boltdb has values or an error.
// It starts from startingKey (or from the last key when startingKey is nil)
// and iterates backwards, returning at most limit keys when limit > 0.
func (c *boltClient) ReverseList(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
	c.logger.Debug("entering bolt reverse list")
	const reverse = true // backward iteration; see List for the forward variant
	return c.listHelper(reverse, startingKey, limit)
}
func (c *boltClient) listHelper(reverseList bool, startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
var paths storage.Keys
err := c.db.Update(func(tx *bolt.Tx) error {
cur := tx.Bucket(c.Bucket).Cursor()
var k []byte
start := firstOrLast(reverseList, cur)
iterate := prevOrNext(reverseList, cur)
if startingKey == nil {
k, _ = cur.First()
k, _ = start()
} else {
k, _ = cur.Seek(startingKey)
}
for ; k != nil; k, _ = cur.Next() {
for ; k != nil; k, _ = iterate() {
paths = append(paths, k)
if limit > 0 && int(limit) == int(len(paths)) {
break
@ -102,6 +119,20 @@ func (c *boltClient) List(startingKey storage.Key, limit storage.Limit) (storage
return paths, err
}
// firstOrLast selects the cursor-positioning function for an iteration:
// cur.Last when walking the bucket in reverse, cur.First otherwise.
func firstOrLast(reverseList bool, cur *bolt.Cursor) func() ([]byte, []byte) {
	start := cur.First
	if reverseList {
		start = cur.Last
	}
	return start
}
// prevOrNext selects the cursor-stepping function for an iteration:
// cur.Prev when walking the bucket in reverse, cur.Next otherwise.
func prevOrNext(reverseList bool, cur *bolt.Cursor) func() ([]byte, []byte) {
	step := cur.Next
	if reverseList {
		step = cur.Prev
	}
	return step
}
// Delete deletes a key/value pair from boltdb, for a given the key
func (c *boltClient) Delete(pathKey storage.Key) error {
c.logger.Debug("entering bolt delete: " + string(pathKey))

View File

@ -21,6 +21,7 @@ type KeyValueStore interface {
Put(Key, Value) error
Get(Key) (Value, error)
List(Key, Limit) (Keys, error)
ReverseList(Key, Limit) (Keys, error)
Delete(Key) error
Close() error
}

View File

@ -77,7 +77,7 @@ func (c *redisClient) Put(key storage.Key, value storage.Value) error {
return nil
}
// List returns either a list of keys for which boltdb has values or an error.
// List returns either a list of keys for which redis has values or an error.
func (c *redisClient) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
var noOrderKeys []string
if startingKey != nil {
@ -106,6 +106,13 @@ func (c *redisClient) List(startingKey storage.Key, limit storage.Limit) (storag
return listKeys, nil
}
// ReverseList returns either a list of keys for which redis has values or an error.
// Starts from startingKey and iterates backwards
//
// NOTE(review): currently an unimplemented stub — it always returns an empty
// key set with a nil error, so callers cannot distinguish "no keys" from
// "not implemented". Confirm no caller relies on this before shipping;
// consider returning a not-implemented error until the redis-backed
// reverse iteration is written.
func (c *redisClient) ReverseList(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
	//TODO
	return storage.Keys{}, nil
}
// Delete deletes a key/value pair from redis, for a given the key
func (c *redisClient) Delete(key storage.Key) error {
err := c.db.Del(key.String()).Err()