kademlia get nodes (#444)

* get nodes

* restrictions with getnodes

* increases limit

* iterates with restrictions

* testgetnodes

* Tests meetsRequirements

* update id creation
This commit is contained in:
Jennifer Li Johnson 2018-10-16 11:22:31 -04:00 committed by GitHub
parent 6ffded3186
commit dd525eb978
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 271 additions and 4 deletions

View File

@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"github.com/golang/protobuf/proto"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -19,6 +20,7 @@ import (
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/provider"
"storj.io/storj/storage"
)
// NodeErr is the class for all errors pertaining to node operations
@ -106,14 +108,43 @@ func (k *Kademlia) Disconnect() error {
// GetNodes returns all nodes from a starting node up to a maximum limit
// stored in the local routing table limiting the result by the specified restrictions
func (k *Kademlia) GetNodes(ctx context.Context, start string, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) {
// TODO(coyle)
return []*pb.Node{}, errors.New("TODO GetNodes")
nodes := []*pb.Node{}
iteratorMethod := func(it storage.Iterator) error {
var item storage.ListItem
maxLimit := storage.LookupLimit
for ; maxLimit > 0 && it.Next(&item); maxLimit-- {
id := string(item.Key)
node := &pb.Node{}
err := proto.Unmarshal(item.Value, node)
if err != nil {
return Error.Wrap(err)
}
node.Id = id
if meetsRestrictions(restrictions, *node) {
nodes = append(nodes, node)
}
if len(nodes) == limit {
return nil
}
}
return nil
}
err := k.routingTable.iterate(
storage.IterateOptions{
First: storage.Key(start),
Recurse: true,
},
iteratorMethod,
)
if err != nil {
return []*pb.Node{}, Error.Wrap(err)
}
return nodes, nil
}
// GetRoutingTable provides the routing table for the Kademlia DHT
func (k *Kademlia) GetRoutingTable(ctx context.Context) (dht.RoutingTable, error) {
return k.routingTable, nil
}
// Bootstrap contacts one of a set of pre defined trusted nodes on the network and
@ -213,7 +244,7 @@ func Restrict(r pb.Restriction, n []*pb.Node) []*pb.Node {
switch op {
case pb.Restriction_EQ:
if comp != val {
if comp == val {
results = append(results, v)
continue
}
@ -244,3 +275,41 @@ func Restrict(r pb.Restriction, n []*pb.Node) []*pb.Node {
return results
}
func meetsRestrictions(rs []pb.Restriction, n pb.Node) bool {
for _, r := range rs {
oper := r.GetOperand()
op := r.GetOperator()
val := r.GetValue()
var comp int64
switch oper {
case pb.Restriction_freeBandwidth:
comp = n.GetRestrictions().GetFreeBandwidth()
case pb.Restriction_freeDisk:
comp = n.GetRestrictions().GetFreeDisk()
}
switch op {
case pb.Restriction_EQ:
if comp != val {
return false
}
case pb.Restriction_LT:
if comp >= val {
return false
}
case pb.Restriction_LTE:
if comp > val {
return false
}
case pb.Restriction_GT:
if comp <= val {
return false
}
case pb.Restriction_GTE:
if comp < val {
return false
}
}
}
return true
}

View File

@ -10,6 +10,7 @@ import (
"os"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
@ -203,3 +204,196 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server) {
return k, grpcServer
}
func TestGetNodes(t *testing.T) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
assert.NoError(t, err)
kc := kadconfig()
srv, _ := newTestServer([]*pb.Node{&pb.Node{Id: "foo"}})
go func() { _ = srv.Serve(lis) }()
defer srv.Stop()
// make new identity
fid, err := newTestIdentity()
assert.NoError(t, err)
fid2, err := newTestIdentity()
assert.NoError(t, err)
// create two new unique identities
id := node.ID(fid.ID)
id2 := node.ID(fid2.ID)
assert.NotEqual(t, id, id2)
kid := dht.NodeID(fid.ID)
k, err := NewKademlia(kid, []pb.Node{pb.Node{Id: id2.String(), Address: &pb.NodeAddress{Address: lis.Addr().String()}}}, lis.Addr().String(), fid, "db", kc)
assert.NoError(t, err)
// add nodes
ids := []string{"A", "B", "C", "D"}
bw := []int64{1, 2, 3, 4}
disk := []int64{4, 3, 2, 1}
nodes := []*pb.Node{}
for i, v := range ids {
n := &pb.Node{
Id: v,
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: bw[i],
FreeDisk: disk[i],
},
}
nodes = append(nodes, n)
err = k.routingTable.ConnectionSuccess(n)
assert.NoError(t, err)
}
cases := []struct {
testID string
start string
limit int
restrictions []pb.Restriction
expected []*pb.Node
}{
{testID: "one",
start: "B",
limit: 2,
restrictions: []pb.Restriction{
pb.Restriction{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_freeBandwidth,
Value: int64(2),
},
},
expected: nodes[2:],
},
{testID: "two",
start: "A",
limit: 3,
restrictions: []pb.Restriction{
pb.Restriction{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_freeBandwidth,
Value: int64(2),
},
pb.Restriction{
Operator: pb.Restriction_LT,
Operand: pb.Restriction_freeDisk,
Value: int64(2),
},
},
expected: nodes[3:],
},
{testID: "three",
start: "A",
limit: 4,
restrictions: []pb.Restriction{},
expected: nodes,
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
ns, err := k.GetNodes(context.Background(), c.start, c.limit, c.restrictions...)
assert.NoError(t, err)
assert.Equal(t, len(c.expected), len(ns))
for i, n := range ns {
assert.True(t, proto.Equal(c.expected[i], n))
}
})
}
}
func TestMeetsRestrictions(t *testing.T) {
cases := []struct {
testID string
r []pb.Restriction
n pb.Node
expect bool
}{
{testID: "pass one",
r: []pb.Restriction{
pb.Restriction{
Operator: pb.Restriction_EQ,
Operand: pb.Restriction_freeBandwidth,
Value: int64(1),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(1),
},
},
expect: true,
},
{testID: "pass multiple",
r: []pb.Restriction{
pb.Restriction{
Operator: pb.Restriction_LTE,
Operand: pb.Restriction_freeBandwidth,
Value: int64(2),
},
pb.Restriction{
Operator: pb.Restriction_GTE,
Operand: pb.Restriction_freeDisk,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(1),
FreeDisk: int64(3),
},
},
expect: true,
},
{testID: "fail one",
r: []pb.Restriction{
pb.Restriction{
Operator: pb.Restriction_LT,
Operand: pb.Restriction_freeBandwidth,
Value: int64(2),
},
pb.Restriction{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_freeDisk,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(2),
FreeDisk: int64(3),
},
},
expect: false,
},
{testID: "fail multiple",
r: []pb.Restriction{
pb.Restriction{
Operator: pb.Restriction_LT,
Operand: pb.Restriction_freeBandwidth,
Value: int64(2),
},
pb.Restriction{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_freeDisk,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(2),
FreeDisk: int64(2),
},
},
expect: false,
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
result := meetsRestrictions(c.r, c.n)
assert.Equal(t, c.expect, result)
})
}
}

View File

@ -229,3 +229,7 @@ func (rt *RoutingTable) GetBucketTimestamp(id string, bucket dht.Bucket) (time.T
return time.Unix(0, timestamp).UTC(), nil
}
func (rt *RoutingTable) iterate(opts storage.IterateOptions, f func(it storage.Iterator) error) error {
return rt.nodeBucketDB.Iterate(opts, f)
}