pkg/kademlia tests and restructuring (#97)

* port changes

* Merge remote-tracking branch 'upstream/master'

* Merge remote-tracking branch 'upstream/master'

* Merge remote-tracking branch 'upstream/master'

* files created

* Merge remote-tracking branch 'upstream/master' into coyle/kad-tests

* wip

* Merge remote-tracking branch 'upstream/master' into coyle/kad-tests

* wip

* remove bkad dependencie from tests

* wip

* wip

* wip

* wip

* wip

* updated coyle/kademlia

* wip

* cleanup

* ports

* overlay upgraded

* linter fixes

* piecestore kademlia newID

* add changes from kad demo

* PR comments addresses

* go func

* force travis build

* fixed merge conflicts

* fixed merge conflicts

* Merge branch 'coyle/kad-tests' of https://github.com/coyle/storj into coyle/kad-tests

* linter issues

* linting issues

* fixed merge conflicts

* linter is stupid
This commit is contained in:
Dennis Coyle 2018-06-22 09:33:57 -04:00 committed by GitHub
parent 35d5761ee6
commit a5ff5016c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 856 additions and 540 deletions

2
Gopkg.lock generated
View File

@ -60,7 +60,7 @@
branch = "master"
name = "github.com/coyle/kademlia"
packages = ["."]
revision = "23c5c505df986913b68fd7fa16af645c64de8557"
revision = "de4313d761cdd87b7f363586530e2b6c79460a72"
[[projects]]
name = "github.com/davecgh/go-spew"

View File

@ -56,10 +56,11 @@ run-overlay:
docker run -d \
--name=overlay \
--network test-net \
-p 127.0.0.1:8080:8080 \
-e REDIS_ADDRESS=redis:6379 \
-e REDIS_PASSWORD="" \
-e REDIS_DB=1 \
-e OVERLAY_PORT=8080 \
-e OVERLAY_PORT=7070 \
overlay
clean-local:

View File

@ -13,6 +13,7 @@ spec:
port: 8081
targetPort: 8081
- name: kademlia
protocol: UDP
port: 8080
target: 8080
selector:

View File

@ -52,7 +52,12 @@ func connectToKad(id, ip, kadlistenport, kadaddress string) *kademlia.Kademlia {
},
}
kad, err := kademlia.NewKademlia([]proto.Node{node}, ip, kadlistenport)
nodeid, err := kademlia.NewID()
if err != nil {
log.Fatalf("Failed to instantiate new Kademlia ID: %s", err.Error())
}
kad, err := kademlia.NewKademlia(nodeid, []proto.Node{node}, ip, kadlistenport)
if err != nil {
log.Fatalf("Failed to instantiate new Kademlia: %s", err.Error())
}

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/boltdb/bolt v1.3.1
github.com/ccding/go-stun v0.0.0-20171206150302-d9bbe8f8fa7b
github.com/cloudfoundry/gosigar v1.1.0
github.com/coyle/kademlia v0.0.0-20180604160050-23c5c505df98
github.com/coyle/kademlia v0.0.0-20180618235119-de4313d761cd
github.com/fatih/structs v1.0.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-redis/redis v0.0.0-20180417061816-9ccc23344a52

56
pkg/dht/dht.go Normal file
View File

@ -0,0 +1,56 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package dht
import (
"context"
"time"
proto "storj.io/storj/protos/overlay"
)
// NodeID is the unique identifer used for Nodes in the DHT
type NodeID interface {
String() string
Bytes() []byte
}
// DHT is the interface for the DHT in the Storj network
type DHT interface {
GetNodes(ctx context.Context, start string, limit int) ([]*proto.Node, error)
GetRoutingTable(ctx context.Context) (RoutingTable, error)
Bootstrap(ctx context.Context) error
Ping(ctx context.Context, node proto.Node) (proto.Node, error)
FindNode(ctx context.Context, ID NodeID) (proto.Node, error)
Disconnect() error
}
// RoutingTable contains information on nodes we have locally
type RoutingTable interface {
// local params
Local() proto.Node
K() int
CacheSize() int
GetBucket(id string) (bucket Bucket, ok bool)
GetBuckets() ([]Bucket, error)
FindNear(id NodeID, limit int) ([]*proto.Node, error)
ConnectionSuccess(id string, address proto.NodeAddress)
ConnectionFailed(id string, address proto.NodeAddress)
// these are for refreshing
SetBucketTimestamp(id string, now time.Time) error
GetBucketTimestamp(id string, bucket Bucket) (time.Time, error)
}
// Bucket is a set of methods to act on kademlia k buckets
type Bucket interface {
Routing() []proto.Node
Cache() []proto.Node
Midpoint() string
Nodes() []*proto.Node
}

View File

@ -3,24 +3,29 @@
package kademlia
import "storj.io/storj/protos/overlay"
import proto "storj.io/storj/protos/overlay"
// KBucket implements the Bucket interface
type KBucket struct {
nodes []*overlay.Node
nodes []*proto.Node
}
// Routing __ (TODO) still not entirely sure what the bucket methods are supposed to do
func (b KBucket) Routing() []overlay.Node {
return []overlay.Node{}
func (b *KBucket) Routing() []proto.Node {
return []proto.Node{}
}
// Cache __ (TODO) still not entirely sure what the bucket methods are supposed to do
func (b KBucket) Cache() []overlay.Node {
return []overlay.Node{}
func (b *KBucket) Cache() []proto.Node {
return []proto.Node{}
}
// Midpoint __ (TODO) still not entirely sure what the bucket methods are supposed to do
func (b KBucket) Midpoint() string {
func (b *KBucket) Midpoint() string {
return ""
}
// Nodes returns the set of all nodes in a bucket
func (b *KBucket) Nodes() []*proto.Node {
return b.nodes
}

View File

@ -0,0 +1,4 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information
package kademlia

View File

@ -1,207 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"crypto/rand"
"fmt"
"net"
"strconv"
bkad "github.com/coyle/kademlia"
"github.com/zeebo/errs"
proto "storj.io/storj/protos/overlay"
)
// NodeErr is the class for all errors pertaining to node operations
var NodeErr = errs.Class("node error")
//TODO: shouldn't default to TCP but not sure what to do yet
var defaultTransport = proto.NodeTransport_TCP
// Kademlia is an implementation of kademlia adhering to the DHT interface.
type Kademlia struct {
rt RoutingTable
bootstrapNodes []proto.Node
ip string
port string
stun bool
dht *bkad.DHT
}
// NewKademlia returns a newly configured Kademlia instance
func NewKademlia(bootstrapNodes []proto.Node, ip string, port string) (*Kademlia, error) {
bb, err := convertProtoNodes(bootstrapNodes)
if err != nil {
return nil, err
}
id, err := newID() // TODO() use the real ID type after we settle on an implementation
if err != nil {
return nil, err
}
bdht, err := bkad.NewDHT(&bkad.MemoryStore{}, &bkad.Options{
ID: []byte(id),
IP: ip,
Port: port,
BootstrapNodes: bb,
})
if err != nil {
return nil, err
}
rt := RouteTable{
ht: bdht.HT,
dht: bdht,
}
return &Kademlia{
rt: rt,
bootstrapNodes: bootstrapNodes,
ip: ip,
port: port,
stun: true,
dht: bdht,
}, nil
}
// GetNodes returns all nodes from a starting node up to a maximum limit stored in the local routing table
func (k Kademlia) GetNodes(ctx context.Context, start string, limit int) ([]*proto.Node, error) {
if start == "" {
start = k.dht.GetSelfID()
}
nn, err := k.dht.FindNodes(ctx, start, limit)
if err != nil {
return []*proto.Node{}, err
}
return convertNetworkNodes(nn), nil
}
// GetRoutingTable provides the routing table for the Kademlia DHT
func (k *Kademlia) GetRoutingTable(ctx context.Context) (RoutingTable, error) {
return RouteTable{
ht: k.dht.HT,
dht: k.dht,
}, nil
}
// Bootstrap contacts one of a set of pre defined trusted nodes on the network and
// begins populating the local Kademlia node
func (k *Kademlia) Bootstrap(ctx context.Context) error {
return k.dht.Bootstrap()
}
// Ping checks that the provided node is still accessible on the network
func (k *Kademlia) Ping(ctx context.Context, node proto.Node) (proto.Node, error) {
n, err := convertProtoNode(node)
if err != nil {
return proto.Node{}, err
}
ok, err := k.dht.Ping(n)
if err != nil {
return proto.Node{}, err
}
if !ok {
return proto.Node{}, NodeErr.New("node unavailable")
}
return node, nil
}
// FindNode looks up the provided NodeID first in the local Node, and if it is not found
// begins searching the network for the NodeID. Returns and error if node was not found
func (k *Kademlia) FindNode(ctx context.Context, ID NodeID) (proto.Node, error) {
nodes, err := k.dht.FindNode([]byte(ID))
if err != nil {
return proto.Node{}, err
}
for _, v := range nodes {
if string(v.ID) == string(ID) {
return proto.Node{Id: string(v.ID), Address: &proto.NodeAddress{
Transport: defaultTransport,
Address: fmt.Sprintf("%s:%d", v.IP.String(), v.Port),
},
}, nil
}
}
return proto.Node{}, NodeErr.New("node not found")
}
// ListenAndServe connects the kademlia node to the network and listens for incoming requests
func (k *Kademlia) ListenAndServe() error {
if err := k.dht.CreateSocket(); err != nil {
return err
}
go k.dht.Listen()
return nil
}
func convertProtoNodes(n []proto.Node) ([]*bkad.NetworkNode, error) {
nn := make([]*bkad.NetworkNode, len(n))
for i, v := range n {
node, err := convertProtoNode(v)
if err != nil {
return nil, err
}
nn[i] = node
}
return nn, nil
}
func convertNetworkNodes(n []*bkad.NetworkNode) []*proto.Node {
nn := make([]*proto.Node, len(n))
for i, v := range n {
nn[i] = convertNetworkNode(v)
}
return nn
}
func convertNetworkNode(v *bkad.NetworkNode) *proto.Node {
return &proto.Node{
Id: string(v.ID),
Address: &proto.NodeAddress{Transport: defaultTransport, Address: net.JoinHostPort(v.IP.String(), strconv.Itoa(v.Port))},
}
}
func convertProtoNode(v proto.Node) (*bkad.NetworkNode, error) {
host, port, err := net.SplitHostPort(v.GetAddress().GetAddress())
if err != nil {
return nil, err
}
nn := bkad.NewNetworkNode(host, port)
nn.ID = []byte(v.GetId())
return nn, nil
}
// newID generates a new random ID.
// This purely to get things working. We shouldn't use this as the ID in the actual network
func newID() ([]byte, error) {
result := make([]byte, 20)
_, err := rand.Read(result)
return result, err
}
// GetIntroNode determines the best node to bootstrap a new node onto the network
func GetIntroNode(ip, port string) proto.Node {
id, _ := newID() // TODO(coyle): This is solely to bootstrap our very first node, after we get an ID, we will just hardcode that ID
return proto.Node{
Id: string(id),
Address: &proto.NodeAddress{
Transport: defaultTransport,
Address: "bootstrap.storj.io:8080",
},
}
}

View File

@ -1,222 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"fmt"
"math/rand"
"strconv"
"testing"
"time"
bkad "github.com/coyle/kademlia"
"github.com/stretchr/testify/assert"
"storj.io/storj/protos/overlay"
)
const (
testNetSize = 20
)
func bootstrapTestNetwork(t *testing.T, ip, port string) []*bkad.DHT {
dhts := []*bkad.DHT{}
p, err := strconv.Atoi(port)
assert.NoError(t, err)
for i := 0; i < testNetSize; i++ {
id, err := newID()
assert.NoError(t, err)
dht, _ := bkad.NewDHT(&bkad.MemoryStore{}, &bkad.Options{
ID: id,
IP: ip,
Port: strconv.Itoa(p),
BootstrapNodes: []*bkad.NetworkNode{
bkad.NewNetworkNode("127.0.0.1", strconv.Itoa(p-1)),
},
})
p++
dhts = append(dhts, dht)
err = dht.CreateSocket()
assert.NoError(t, err)
}
for _, dht := range dhts {
go dht.Listen()
go func(dht *bkad.DHT) {
if err := dht.Bootstrap(); err != nil {
panic(err)
}
}(dht)
time.Sleep(200 * time.Millisecond)
}
return dhts
}
func newTestKademlia(t *testing.T, ip, port string, d *bkad.DHT) *Kademlia {
n := []overlay.Node{
overlay.Node{
Id: string(d.HT.Self.ID),
Address: &overlay.NodeAddress{
Address: fmt.Sprintf("127.0.0.1:%d", d.HT.Self.Port),
},
},
}
kad, err := NewKademlia(n, ip, port)
assert.NoError(t, err)
return kad
}
func TestBootstrap(t *testing.T) {
dhts := bootstrapTestNetwork(t, "127.0.0.1", "3001")
defer func(d []*bkad.DHT) {
for _, v := range d {
v.Disconnect()
}
}(dhts)
cases := []struct {
k *Kademlia
}{
{
k: newTestKademlia(t, "127.0.0.1", "3000", dhts[rand.Intn(testNetSize)]),
},
}
for _, v := range cases {
ctx := context.Background()
go v.k.ListenAndServe()
time.Sleep(time.Second)
err := v.k.Bootstrap(ctx)
assert.NoError(t, err)
node, err := v.k.FindNode(ctx, NodeID(dhts[0].HT.Self.ID))
assert.NoError(t, err)
assert.NotEmpty(t, node)
assert.Equal(t, string(dhts[0].HT.Self.ID), node.Id)
v.k.dht.Disconnect()
}
}
func TestGetNodes(t *testing.T) {
dhts := bootstrapTestNetwork(t, "127.0.0.1", "6001")
defer func(d []*bkad.DHT) {
for _, v := range d {
v.Disconnect()
}
}(dhts)
cases := []struct {
k *Kademlia
start string
limit int
expectedErr error
}{
{
k: newTestKademlia(t, "127.0.0.1", "6000", dhts[rand.Intn(testNetSize)]),
start: string(dhts[0].HT.Self.ID),
limit: 10,
expectedErr: nil,
},
}
for _, v := range cases {
ctx := context.Background()
go v.k.ListenAndServe()
time.Sleep(time.Second)
err := v.k.Bootstrap(ctx)
assert.NoError(t, err)
nodes, err := v.k.GetNodes(ctx, v.start, v.limit)
assert.Equal(t, v.expectedErr, err)
assert.Len(t, nodes, v.limit)
v.k.dht.Disconnect()
}
}
func TestFindNode(t *testing.T) {
dhts := bootstrapTestNetwork(t, "127.0.0.1", "6001")
defer func(d []*bkad.DHT) {
for _, v := range d {
v.Disconnect()
}
}(dhts)
cases := []struct {
k *Kademlia
start string
input NodeID
expectedErr error
}{
{
k: newTestKademlia(t, "127.0.0.1", "6000", dhts[rand.Intn(testNetSize)]),
start: string(dhts[0].HT.Self.ID),
input: NodeID(dhts[rand.Intn(testNetSize)].HT.Self.ID),
expectedErr: nil,
},
}
for _, v := range cases {
ctx := context.Background()
go v.k.ListenAndServe()
time.Sleep(time.Second)
err := v.k.Bootstrap(ctx)
assert.NoError(t, err)
node, err := v.k.FindNode(ctx, v.input)
assert.Equal(t, v.expectedErr, err)
assert.NotZero(t, node)
assert.Equal(t, node.Id, string(v.input))
v.k.dht.Disconnect()
}
}
func TestPing(t *testing.T) {
dhts := bootstrapTestNetwork(t, "127.0.0.1", "6001")
defer func(d []*bkad.DHT) {
for _, v := range d {
v.Disconnect()
}
}(dhts)
r := dhts[rand.Intn(testNetSize)]
cases := []struct {
k *Kademlia
input overlay.Node
expectedErr error
}{
{
k: newTestKademlia(t, "127.0.0.1", "6000", dhts[rand.Intn(testNetSize)]),
input: overlay.Node{
Id: string(r.HT.Self.ID),
Address: &overlay.NodeAddress{
Transport: defaultTransport,
Address: fmt.Sprintf("%s:%d", r.HT.Self.IP.String(), r.HT.Self.Port),
},
},
expectedErr: nil,
},
}
for _, v := range cases {
ctx := context.Background()
go v.k.ListenAndServe()
time.Sleep(time.Second)
err := v.k.Bootstrap(ctx)
assert.NoError(t, err)
node, err := v.k.Ping(ctx, v.input)
assert.Equal(t, v.expectedErr, err)
assert.NotEmpty(t, node)
assert.Equal(t, v.input, node)
v.k.dht.Disconnect()
}
}

View File

@ -5,47 +5,234 @@ package kademlia
import (
"context"
"time"
"crypto/rand"
"fmt"
"net"
"strconv"
"storj.io/storj/protos/overlay"
bkad "github.com/coyle/kademlia"
"github.com/zeebo/errs"
"storj.io/storj/pkg/dht"
proto "storj.io/storj/protos/overlay"
)
// NodeID is the unique identifer for a node on the network
type NodeID string
// NodeErr is the class for all errors pertaining to node operations
var NodeErr = errs.Class("node error")
// DHT is the interface for the DHT in the Storj network
type DHT interface {
GetNodes(ctx context.Context, start string, limit int) ([]*overlay.Node, error)
//TODO: shouldn't default to TCP but not sure what to do yet
var defaultTransport = proto.NodeTransport_TCP
GetRoutingTable(ctx context.Context) (RoutingTable, error)
Bootstrap(ctx context.Context) error
Ping(ctx context.Context, node overlay.Node) (overlay.Node, error)
FindNode(ctx context.Context, ID NodeID) (overlay.Node, error)
// Kademlia is an implementation of kademlia adhering to the DHT interface.
type Kademlia struct {
routingTable dht.RoutingTable
bootstrapNodes []proto.Node
ip string
port string
stun bool
dht *bkad.DHT
}
// RoutingTable contains information on nodes we have locally
type RoutingTable interface {
// local params
LocalID() NodeID
K() int
CacheSize() int
// NewKademlia returns a newly configured Kademlia instance
func NewKademlia(id dht.NodeID, bootstrapNodes []proto.Node, ip string, port string) (*Kademlia, error) {
if port == "" {
return nil, NodeErr.New("must specify port in request to NewKademlia")
}
GetBucket(id string) (bucket Bucket, ok bool)
GetBuckets() ([]Bucket, error)
ips, err := net.LookupIP(ip)
if err != nil {
return nil, err
}
FindNear(id NodeID, limit int) ([]*overlay.Node, error)
if len(ips) <= 0 {
return nil, errs.New("Invalid IP")
}
ConnectionSuccess(id string, address overlay.NodeAddress)
ConnectionFailed(id string, address overlay.NodeAddress)
ip = ips[0].String()
// these are for refreshing
SetBucketTimestamp(id string, now time.Time) error
GetBucketTimestamp(id string, bucket Bucket) (time.Time, error)
bnodes, err := convertProtoNodes(bootstrapNodes)
if err != nil {
return nil, err
}
bdht, err := bkad.NewDHT(&bkad.MemoryStore{}, &bkad.Options{
ID: id.Bytes(),
IP: ip,
Port: port,
BootstrapNodes: bnodes,
})
if err != nil {
return nil, err
}
rt := RouteTable{
ht: bdht.HT,
dht: bdht,
}
return &Kademlia{
routingTable: rt,
bootstrapNodes: bootstrapNodes,
ip: ip,
port: port,
stun: true,
dht: bdht,
}, nil
}
// Bucket is a set of methods to act on kademlia k buckets
type Bucket interface {
Routing() []overlay.Node
Cache() []overlay.Node
Midpoint() string
// Disconnect safely closes connections to the Kademlia network
func (k Kademlia) Disconnect() error {
return k.dht.Disconnect()
}
// GetNodes returns all nodes from a starting node up to a maximum limit stored in the local routing table
func (k Kademlia) GetNodes(ctx context.Context, start string, limit int) ([]*proto.Node, error) {
if start == "" {
start = k.dht.GetSelfID()
}
nn, err := k.dht.FindNodes(ctx, start, limit)
if err != nil {
return []*proto.Node{}, err
}
return convertNetworkNodes(nn), nil
}
// GetRoutingTable provides the routing table for the Kademlia DHT
func (k *Kademlia) GetRoutingTable(ctx context.Context) (dht.RoutingTable, error) {
return RouteTable{
ht: k.dht.HT,
dht: k.dht,
}, nil
}
// Bootstrap contacts one of a set of pre defined trusted nodes on the network and
// begins populating the local Kademlia node
func (k *Kademlia) Bootstrap(ctx context.Context) error {
return k.dht.Bootstrap()
}
// Ping checks that the provided node is still accessible on the network
func (k *Kademlia) Ping(ctx context.Context, node proto.Node) (proto.Node, error) {
n, err := convertProtoNode(node)
if err != nil {
return proto.Node{}, err
}
ok, err := k.dht.Ping(n)
if err != nil {
return proto.Node{}, err
}
if !ok {
return proto.Node{}, NodeErr.New("node unavailable")
}
return node, nil
}
// FindNode looks up the provided NodeID first in the local Node, and if it is not found
// begins searching the network for the NodeID. Returns and error if node was not found
func (k *Kademlia) FindNode(ctx context.Context, ID dht.NodeID) (proto.Node, error) {
nodes, err := k.dht.FindNode(ID.Bytes())
if err != nil {
return proto.Node{}, err
}
for _, v := range nodes {
if string(v.ID) == ID.String() {
return proto.Node{Id: string(v.ID), Address: &proto.NodeAddress{
Transport: defaultTransport,
Address: fmt.Sprintf("%s:%d", v.IP.String(), v.Port),
},
}, nil
}
}
return proto.Node{}, NodeErr.New("node not found")
}
// ListenAndServe connects the kademlia node to the network and listens for incoming requests
func (k *Kademlia) ListenAndServe() error {
if err := k.dht.CreateSocket(); err != nil {
return err
}
go k.dht.Listen()
return nil
}
func convertProtoNodes(n []proto.Node) ([]*bkad.NetworkNode, error) {
nn := make([]*bkad.NetworkNode, len(n))
for i, v := range n {
node, err := convertProtoNode(v)
if err != nil {
return nil, err
}
nn[i] = node
}
return nn, nil
}
func convertNetworkNodes(n []*bkad.NetworkNode) []*proto.Node {
nn := make([]*proto.Node, len(n))
for i, v := range n {
nn[i] = convertNetworkNode(v)
}
return nn
}
func convertNetworkNode(v *bkad.NetworkNode) *proto.Node {
return &proto.Node{
Id: string(v.ID),
Address: &proto.NodeAddress{Transport: defaultTransport, Address: net.JoinHostPort(v.IP.String(), strconv.Itoa(v.Port))},
}
}
func convertProtoNode(v proto.Node) (*bkad.NetworkNode, error) {
host, port, err := net.SplitHostPort(v.GetAddress().GetAddress())
if err != nil {
return nil, err
}
nn := bkad.NewNetworkNode(host, port)
nn.ID = []byte(v.GetId())
return nn, nil
}
// newID generates a new random ID.
// This purely to get things working. We shouldn't use this as the ID in the actual network
func newID() ([]byte, error) {
result := make([]byte, 20)
_, err := rand.Read(result)
return result, err
}
// GetIntroNode determines the best node to bootstrap a new node onto the network
func GetIntroNode(id, ip, port string) (*proto.Node, error) {
addr := "bootstrap.storj.io:8080"
if ip != "" && port != "" {
addr = ip + ":" + port
}
if id == "" {
i, err := newID()
if err != nil {
return nil, err
}
id = string(i)
}
return &proto.Node{
Id: id,
Address: &proto.NodeAddress{
Transport: defaultTransport,
Address: addr,
},
}, nil
}

View File

@ -0,0 +1,251 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"math/rand"
"strconv"
"testing"
"time"
"storj.io/storj/pkg/dht"
"github.com/stretchr/testify/assert"
"storj.io/storj/protos/overlay"
)
const (
testNetSize = 20
)
func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, overlay.Node) {
bid, err := newID()
assert.NoError(t, err)
bnid := NodeID(bid)
dhts := []dht.DHT{}
p, err := strconv.Atoi(port)
pm := strconv.Itoa(p)
assert.NoError(t, err)
intro, err := GetIntroNode(bnid.String(), ip, pm)
assert.NoError(t, err)
boot, err := NewKademlia(&bnid, []overlay.Node{*intro}, ip, pm)
assert.NoError(t, err)
rt, err := boot.GetRoutingTable(context.Background())
bootNode := rt.Local()
err = boot.ListenAndServe()
assert.NoError(t, err)
p++
err = boot.Bootstrap(context.Background())
assert.NoError(t, err)
for i := 0; i < testNetSize; i++ {
gg := strconv.Itoa(p)
nid, err := newID()
assert.NoError(t, err)
id := NodeID(nid)
dht, err := NewKademlia(&id, []overlay.Node{bootNode}, ip, gg)
assert.NoError(t, err)
p++
dhts = append(dhts, dht)
err = dht.ListenAndServe()
assert.NoError(t, err)
err = dht.Bootstrap(context.Background())
assert.NoError(t, err)
}
return dhts, bootNode
}
func newTestKademlia(t *testing.T, ip, port string, d dht.DHT, b overlay.Node) *Kademlia {
i, err := newID()
assert.NoError(t, err)
id := NodeID(i)
n := []overlay.Node{b}
kad, err := NewKademlia(&id, n, ip, port)
assert.NoError(t, err)
return kad
}
func TestBootstrap(t *testing.T) {
dhts, bootNode := bootstrapTestNetwork(t, "127.0.0.1", "3000")
defer func(d []dht.DHT) {
for _, v := range d {
v.Disconnect()
}
}(dhts)
cases := []struct {
k *Kademlia
}{
{
k: newTestKademlia(t, "127.0.0.1", "2999", dhts[rand.Intn(testNetSize)], bootNode),
},
}
for _, v := range cases {
defer v.k.Disconnect()
err := v.k.ListenAndServe()
assert.NoError(t, err)
err = v.k.Bootstrap(context.Background())
assert.NoError(t, err)
ctx := context.Background()
rt, err := dhts[0].GetRoutingTable(context.Background())
assert.NoError(t, err)
localID := rt.Local().Id
n := NodeID(localID)
node, err := v.k.FindNode(ctx, &n)
assert.NoError(t, err)
assert.NotEmpty(t, node)
assert.Equal(t, localID, node.Id)
v.k.dht.Disconnect()
}
}
func TestGetNodes(t *testing.T) {
dhts, bootNode := bootstrapTestNetwork(t, "127.0.0.1", "6001")
defer func(d []dht.DHT) {
for _, v := range d {
err := v.Disconnect()
assert.NoError(t, err)
}
}(dhts)
cases := []struct {
k *Kademlia
start string
limit int
expectedErr error
}{
{
k: newTestKademlia(t, "127.0.0.1", "6000", dhts[rand.Intn(testNetSize)], bootNode),
limit: 10,
expectedErr: nil,
},
}
for _, v := range cases {
defer v.k.Disconnect()
ctx := context.Background()
err := v.k.ListenAndServe()
assert.Equal(t, v.expectedErr, err)
time.Sleep(time.Second)
err = v.k.Bootstrap(ctx)
assert.NoError(t, err)
rt, err := v.k.GetRoutingTable(context.Background())
assert.NoError(t, err)
start := rt.Local().Id
nodes, err := v.k.GetNodes(ctx, start, v.limit)
assert.Equal(t, v.expectedErr, err)
assert.Len(t, nodes, v.limit)
v.k.dht.Disconnect()
}
}
func TestFindNode(t *testing.T) {
dhts, bootNode := bootstrapTestNetwork(t, "127.0.0.1", "5001")
defer func(d []dht.DHT) {
for _, v := range d {
err := v.Disconnect()
assert.NoError(t, err)
}
}(dhts)
cases := []struct {
k *Kademlia
start string
input NodeID
expectedErr error
}{
{
k: newTestKademlia(t, "127.0.0.1", "6000", dhts[rand.Intn(testNetSize)], bootNode),
expectedErr: nil,
},
}
for _, v := range cases {
defer v.k.Disconnect()
ctx := context.Background()
go v.k.ListenAndServe()
time.Sleep(time.Second)
err := v.k.Bootstrap(ctx)
assert.NoError(t, err)
rt, err := dhts[rand.Intn(testNetSize)].GetRoutingTable(context.Background())
assert.NoError(t, err)
id := NodeID(rt.Local().Id)
node, err := v.k.FindNode(ctx, &id)
assert.Equal(t, v.expectedErr, err)
assert.NotZero(t, node)
assert.Equal(t, node.Id, id.String())
}
}
func TestPing(t *testing.T) {
dhts, bootNode := bootstrapTestNetwork(t, "127.0.0.1", "4001")
defer func(d []dht.DHT) {
for _, v := range d {
v.Disconnect()
}
}(dhts)
r := dhts[rand.Intn(testNetSize)]
rt, err := r.GetRoutingTable(context.Background())
addr := rt.Local().Address
assert.NoError(t, err)
cases := []struct {
k *Kademlia
input overlay.Node
expectedErr error
}{
{
k: newTestKademlia(t, "127.0.0.1", "6000", dhts[rand.Intn(testNetSize)], bootNode),
input: overlay.Node{
Id: rt.Local().Id,
Address: &overlay.NodeAddress{
Transport: defaultTransport,
Address: addr.Address,
},
},
expectedErr: nil,
},
}
for _, v := range cases {
defer v.k.Disconnect()
ctx := context.Background()
go v.k.ListenAndServe()
time.Sleep(time.Second)
err := v.k.Bootstrap(ctx)
assert.NoError(t, err)
node, err := v.k.Ping(ctx, v.input)
assert.Equal(t, v.expectedErr, err)
assert.NotEmpty(t, node)
assert.Equal(t, v.input, node)
v.k.dht.Disconnect()
}
}

36
pkg/kademlia/node_id.go Normal file
View File

@ -0,0 +1,36 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import base58 "github.com/jbenet/go-base58"
// NodeID is the unique identifer of a Node in the overlay network
type NodeID string
// String transforms the nodeID to a string type
func (n *NodeID) String() string {
return string(*n)
}
// Bytes transforms the nodeID to type []byte
func (n *NodeID) Bytes() []byte {
return []byte(*n)
}
// StringToNodeID trsansforms a string to a NodeID
func StringToNodeID(s string) *NodeID {
n := NodeID(s)
return &n
}
// NewID returns a pointer to a newly intialized NodeID
func NewID() (*NodeID, error) {
b, err := newID()
if err != nil {
return nil, err
}
bb := NodeID(base58.Encode(b))
return &bb, nil
}

View File

@ -12,7 +12,8 @@ import (
bkad "github.com/coyle/kademlia"
"storj.io/storj/protos/overlay"
"storj.io/storj/pkg/dht"
proto "storj.io/storj/protos/overlay"
)
// RouteTable implements the RoutingTable interface
@ -29,9 +30,16 @@ func NewRouteTable(dht Kademlia) RouteTable {
}
}
// LocalID returns the local nodes ID
func (rt RouteTable) LocalID() NodeID {
return NodeID(rt.dht.GetSelfID())
// Local returns the local nodes ID
func (rt RouteTable) Local() proto.Node {
return proto.Node{
Id: string(rt.dht.HT.Self.ID),
Address: &proto.NodeAddress{
Transport: defaultTransport, // TODO(coyle): this should be stored on the route table
Address: fmt.Sprintf("%s:%d", rt.dht.HT.Self.IP.String(), rt.dht.HT.Self.Port),
},
}
}
// K returns the currently configured maximum of nodes to store in a bucket
@ -46,47 +54,48 @@ func (rt RouteTable) CacheSize() int {
}
// GetBucket retrieves a bucket from the local node
func (rt RouteTable) GetBucket(id string) (bucket Bucket, ok bool) {
func (rt RouteTable) GetBucket(id string) (bucket dht.Bucket, ok bool) {
i, err := hex.DecodeString(id)
if err != nil {
return KBucket{}, false
return &KBucket{}, false
}
b := rt.ht.GetBucket(i)
if b == nil {
return KBucket{}, false
return &KBucket{}, false
}
return KBucket{
return &KBucket{
nodes: convertNetworkNodes(b),
}, true
}
// GetBuckets retrieves all buckets from the local node
func (rt RouteTable) GetBuckets() (k []Bucket, err error) {
bs := []Bucket{}
func (rt RouteTable) GetBuckets() (k []dht.Bucket, err error) {
bs := []dht.Bucket{}
b := rt.ht.GetBuckets()
for i, v := range b {
bs[i] = KBucket{nodes: convertNetworkNodes(v)}
for _, v := range b {
bs = append(bs, &KBucket{nodes: convertNetworkNodes(v)})
}
return bs, nil
}
// FindNear finds all Nodes near the provided nodeID up to the provided limit
func (rt RouteTable) FindNear(id NodeID, limit int) ([]*overlay.Node, error) {
return convertNetworkNodes(rt.ht.GetClosestContacts([]byte(id), limit)), nil
func (rt RouteTable) FindNear(id dht.NodeID, limit int) ([]*proto.Node, error) {
return convertNetworkNodes(rt.ht.GetClosestContacts(id.Bytes(), limit)), nil
}
// ConnectionSuccess handles the details of what kademlia should do when
// a successful connection is made to node on the network
func (rt RouteTable) ConnectionSuccess(id string, address overlay.NodeAddress) {
func (rt RouteTable) ConnectionSuccess(id string, address proto.NodeAddress) {
// TODO: What should we do ?
return
}
// ConnectionFailed handles the details of what kademlia should do when
// a connection fails for a node on the network
func (rt RouteTable) ConnectionFailed(id string, address overlay.NodeAddress) {
func (rt RouteTable) ConnectionFailed(id string, address proto.NodeAddress) {
// TODO: What should we do ?
return
}
@ -104,13 +113,11 @@ func (rt RouteTable) SetBucketTimestamp(id string, now time.Time) error {
}
// GetBucketTimestamp retrieves the last updated time for a bucket
func (rt RouteTable) GetBucketTimestamp(id string, bucket Bucket) (time.Time, error) {
func (rt RouteTable) GetBucketTimestamp(id string, bucket dht.Bucket) (time.Time, error) {
return rt.dht.GetExpirationTime([]byte(id)), nil
}
// GetNodeRoutingTable gets a routing table for a given node rather than the local node's routing table
func GetNodeRoutingTable(ctx context.Context, ID NodeID) (RouteTable, error) {
fmt.Println("GetNodeRoutingTable")
fmt.Println("id: ", ID)
return RouteTable{}, nil
}

View File

@ -0,0 +1,4 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information
package kademlia

View File

@ -0,0 +1,57 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"storj.io/storj/pkg/dht"
proto "storj.io/storj/protos/overlay"
)
// NewMockKademlia returns a newly intialized MockKademlia struct
func NewMockKademlia() *MockKademlia {
return &MockKademlia{}
}
// MockKademlia is a mock implementation of the DHT interface used solely for testing
type MockKademlia struct {
RoutingTable dht.RoutingTable
Nodes []*proto.Node
}
// GetNodes increments the GetNodesCalled field on MockKademlia
// returns the Nodes field on MockKademlia
func (k *MockKademlia) GetNodes(ctx context.Context, start string, limit int) ([]*proto.Node, error) {
return k.Nodes, nil
}
// GetRoutingTable increments the GetRoutingTableCalled field on MockKademlia
//
// returns the RoutingTable field on MockKademlia
func (k *MockKademlia) GetRoutingTable(ctx context.Context) (dht.RoutingTable, error) {
return k.RoutingTable, nil
}
// Bootstrap increments the BootstrapCalled field on MockKademlia
func (k *MockKademlia) Bootstrap(ctx context.Context) error {
return nil
}
// Ping increments the PingCalled field on MockKademlia
func (k *MockKademlia) Ping(ctx context.Context, node proto.Node) (proto.Node, error) {
return node, nil
}
// FindNode increments the FindNodeCalled field on MockKademlia
//
// returns the local kademlia node
func (k *MockKademlia) FindNode(ctx context.Context, ID dht.NodeID) (proto.Node, error) {
return k.RoutingTable.Local(), nil
}
// Disconnect increments the DisconnectCalled field on MockKademlia
func (k *MockKademlia) Disconnect() error {
return nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/protos/overlay"
"storj.io/storj/storage"
@ -23,11 +24,11 @@ var ErrNodeNotFound = errs.Class("Node not found")
// Cache is used to store overlay data in Redis
type Cache struct {
DB storage.KeyValueStore
DHT kademlia.DHT
DHT dht.DHT
}
// NewRedisOverlayCache returns a pointer to a new Cache instance with an initalized connection to Redis.
func NewRedisOverlayCache(address, password string, db int, DHT kademlia.DHT) (*Cache, error) {
func NewRedisOverlayCache(address, password string, db int, DHT dht.DHT) (*Cache, error) {
rc, err := redis.NewClient(address, password, db)
if err != nil {
return nil, err
@ -40,7 +41,7 @@ func NewRedisOverlayCache(address, password string, db int, DHT kademlia.DHT) (*
}
// NewBoltOverlayCache returns a pointer to a new Cache instance with an initalized connection to a Bolt db.
func NewBoltOverlayCache(dbPath string, DHT kademlia.DHT) (*Cache, error) {
func NewBoltOverlayCache(dbPath string, DHT dht.DHT) (*Cache, error) {
bc, err := boltdb.NewClient(nil, dbPath, boltdb.OverlayBucket)
if err != nil {
return nil, err
@ -83,7 +84,7 @@ func (o *Cache) Bootstrap(ctx context.Context) error {
nodes, err := o.DHT.GetNodes(ctx, "0", 1280)
for _, v := range nodes {
found, err := o.DHT.FindNode(ctx, kademlia.NodeID(v.Id))
found, err := o.DHT.FindNode(ctx, kademlia.StringToNodeID(v.Id))
if err != nil {
fmt.Println("could not find node in network", err, v.Id)
}
@ -138,8 +139,7 @@ func (o *Cache) Walk(ctx context.Context) error {
}
for _, v := range nodes {
_, err := o.DHT.FindNode(ctx, kademlia.NodeID(v.Id))
if err != nil {
if _, err := o.DHT.FindNode(ctx, kademlia.StringToNodeID(v.Id)); err != nil {
fmt.Println("could not find node in network", err, v.Id)
}
}

View File

@ -0,0 +1,65 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
"fmt"
"net"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"storj.io/storj/internal/test"
"storj.io/storj/pkg/kademlia"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
)
func TestFindStorageNodes(t *testing.T) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err)
id, err := kademlia.NewID()
assert.NoError(t, err)
id2, err := kademlia.NewID()
assert.NoError(t, err)
srv := NewMockServer(test.KvStore{id.String(): NewNodeAddressValue(t, "127.0.0.1:9090"), id2.String(): NewNodeAddressValue(t, "127.0.0.1:9090")})
assert.NotNil(t, srv)
go srv.Serve(lis)
defer srv.Stop()
address := lis.Addr().String()
c, err := NewClient(&address, grpc.WithInsecure())
assert.NoError(t, err)
r, err := c.FindStorageNodes(context.Background(), &proto.FindStorageNodesRequest{})
assert.NoError(t, err)
assert.NotNil(t, r)
assert.Len(t, r.Nodes, 2)
}
func TestOverlayLookup(t *testing.T) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err)
id, err := kademlia.NewID()
assert.NoError(t, err)
srv := NewMockServer(test.KvStore{id.String(): NewNodeAddressValue(t, "127.0.0.1:9090")})
go srv.Serve(lis)
defer srv.Stop()
address := lis.Addr().String()
c, err := NewClient(&address, grpc.WithInsecure())
assert.NoError(t, err)
r, err := c.Lookup(context.Background(), &proto.LookupRequest{NodeID: id.String()})
assert.NoError(t, err)
assert.NotNil(t, r)
}

View File

@ -5,16 +5,23 @@ package overlay
import (
"context"
"sync"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/dht"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
"storj.io/storj/storage"
)
const (
maxNodes = 40
)
// Server implements our overlay RPC service
type Server struct {
kad *kademlia.Kademlia
dht dht.DHT
cache *Cache
logger *zap.Logger
metrics *monkit.Registry
@ -40,16 +47,49 @@ func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.L
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (*proto.FindStorageNodesResponse, error) {
// NB: call FilterNodeReputation from node_reputation package to find nodes for storage
// TODO(coyle): need to determine if we will pull the startID and Limit from the request or just use hardcoded data
// for now just using 40 for demos and empty string which will default the Id to the kademlia node doing the lookup
nodes, err := o.kad.GetNodes(ctx, "", 40)
keys, err := o.cache.DB.List()
if err != nil {
o.logger.Error("Error getting nodes", zap.Error(err))
o.logger.Error("Error listing nodes", zap.Error(err))
return nil, err
}
if len(keys) > maxNodes {
// TODO(coyle): determine if this is a set value or they user of the api will specify
keys = keys[:maxNodes]
}
nodes := o.getNodes(ctx, keys)
return &proto.FindStorageNodesResponse{
Nodes: nodes,
}, nil
}
func (o *Server) getNodes(ctx context.Context, keys storage.Keys) []*proto.Node {
wg := &sync.WaitGroup{}
ch := make(chan *proto.Node, len(keys))
wg.Add(len(keys))
for _, v := range keys {
go func(ch chan *proto.Node, id string) {
defer wg.Done()
na, err := o.cache.Get(ctx, id)
if err != nil {
o.logger.Error("failed to get key from cache", zap.Error(err))
return
}
ch <- &proto.Node{Id: id, Address: na}
}(ch, v.String())
}
wg.Wait()
close(ch)
nodes := []*proto.Node{}
for node := range ch {
nodes = append(nodes, node)
}
return nodes
}

View File

@ -41,7 +41,7 @@ func init() {
func NewServer(k *kademlia.Kademlia, cache *Cache, l *zap.Logger, m *monkit.Registry) *grpc.Server {
grpcServer := grpc.NewServer()
proto.RegisterOverlayServer(grpcServer, &Server{
kad: k,
dht: k,
cache: cache,
logger: l,
metrics: m,
@ -76,9 +76,17 @@ func (s *Service) Process(ctx context.Context) error {
// 4. Boostrap Redis Cache
// TODO(coyle): Should add the ability to pass a configuration to change the bootstrap node
in := kademlia.GetIntroNode(bootstrapIP, bootstrapPort)
in, err := kademlia.GetIntroNode("", bootstrapIP, bootstrapPort)
if err != nil {
return err
}
kad, err := kademlia.NewKademlia([]proto.Node{in}, "0.0.0.0", localPort)
id, err := kademlia.NewID()
if err != nil {
return err
}
kad, err := kademlia.NewKademlia(id, []proto.Node{*in}, "0.0.0.0", localPort)
if err != nil {
s.logger.Error("Failed to instantiate new Kademlia", zap.Error(err))
return err

View File

@ -6,9 +6,7 @@ package overlay
import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"path/filepath"
"testing"
@ -17,12 +15,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/test"
"storj.io/storj/pkg/process"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
// naming proto to avoid confusion with this package
)
func newTestService(t *testing.T) Service {
@ -32,37 +29,6 @@ func newTestService(t *testing.T) Service {
}
}
func TestNewServer(t *testing.T) {
t.SkipNow()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err)
srv := NewServer(nil, nil, nil, nil)
assert.NotNil(t, srv)
go srv.Serve(lis)
srv.Stop()
}
func TestNewClient(t *testing.T) {
// a := "35.232.202.229:8080"
// c, err := NewClient(&a, grpc.WithInsecure())
t.SkipNow()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err)
srv := NewServer(nil, nil, nil, nil)
go srv.Serve(lis)
defer srv.Stop()
address := lis.Addr().String()
c, err := NewClient(&address, grpc.WithInsecure())
assert.NoError(t, err)
r, err := c.Lookup(context.Background(), &proto.LookupRequest{})
assert.NoError(t, err)
assert.NotNil(t, r)
}
func TestProcess_redis(t *testing.T) {
flag.Set("localPort", "0")
done := test.EnsureRedis(t)

52
pkg/overlay/test_utils.go Normal file
View File

@ -0,0 +1,52 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"testing"
protob "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"google.golang.org/grpc"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/test"
"storj.io/storj/pkg/kademlia"
proto "storj.io/storj/protos/overlay"
"storj.io/storj/storage"
)
// NewMockServer provides a mock grpc server for testing
func NewMockServer(kv test.KvStore) *grpc.Server {
grpcServer := grpc.NewServer()
registry := monkit.Default
k := kademlia.NewMockKademlia()
c := &Cache{
DB: test.NewMockKeyValueStore(kv),
DHT: k,
}
s := Server{
dht: k,
cache: c,
logger: zap.NewNop(),
metrics: registry,
}
proto.RegisterOverlayServer(grpcServer, &s)
return grpcServer
}
// NewNodeAddressValue provides a convient way to create a storage.Value for testing purposes
func NewNodeAddressValue(t *testing.T, address string) storage.Value {
na := &proto.NodeAddress{Transport: proto.NodeTransport_TCP, Address: address}
d, err := protob.Marshal(na)
assert.NoError(t, err)
return d
}