Use sync.Cond with kademlia workers (#512)

* wip - have to take a break; crying baby

* linter fixes

* bugfix

* responding to review feedback

* linter fixes?

* linter fixes

* feedback fixes

* feedback fixes

* linter fixes

* linter fixes

* linter fixes
This commit is contained in:
Bryan White 2018-11-01 18:03:46 +01:00 committed by GitHub
parent 856c3a779f
commit 57572cdeed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 197 additions and 125 deletions

View File

@ -24,21 +24,22 @@ import (
"storj.io/storj/storage/boltdb"
)
// NodeErr is the class for all errors pertaining to node operations
var NodeErr = errs.Class("node error")
var (
// NodeErr is the class for all errors pertaining to node operations
NodeErr = errs.Class("node error")
// BootstrapErr is the class for all errors pertaining to bootstrapping a node
BootstrapErr = errs.Class("bootstrap node error")
// NodeNotFound is returned when a lookup can not produce the requested node
NodeNotFound = NodeErr.New("node not found")
// TODO: shouldn't default to TCP but not sure what to do yet
defaultTransport = pb.NodeTransport_TCP_TLS_GRPC
defaultRetries = 3
)
// BootstrapErr is the class for all errors pertaining to bootstrapping a node
var BootstrapErr = errs.Class("bootstrap node error")
//TODO: shouldn't default to TCP but not sure what to do yet
var defaultTransport = pb.NodeTransport_TCP_TLS_GRPC
// NodeNotFound is returned when a lookup can not produce the requested node
var NodeNotFound = NodeErr.New("node not found")
type lookupOpts struct {
amount int
bootstrap bool
type discoveryOptions struct {
concurrency int
retries int
bootstrap bool
}
// Kademlia is an implementation of kademlia adhering to the DHT interface.
@ -167,10 +168,12 @@ func (k *Kademlia) Bootstrap(ctx context.Context) error {
return BootstrapErr.New("no bootstrap nodes provided")
}
return k.lookup(ctx, node.IDFromString(k.routingTable.self.GetId()), lookupOpts{amount: 5, bootstrap: true})
return k.lookup(ctx, node.IDFromString(k.routingTable.self.GetId()), discoveryOptions{
concurrency: k.alpha, retries: defaultRetries, bootstrap: true,
})
}
func (k *Kademlia) lookup(ctx context.Context, target dht.NodeID, opts lookupOpts) error {
func (k *Kademlia) lookup(ctx context.Context, target dht.NodeID, opts discoveryOptions) error {
kb := k.routingTable.K()
// look in routing table for targetID
nodes, err := k.routingTable.FindNear(target, kb)
@ -178,7 +181,7 @@ func (k *Kademlia) lookup(ctx context.Context, target dht.NodeID, opts lookupOpt
return err
}
lookup := newSequentialLookup(k.routingTable, nodes, k.nodeClient, target, opts.amount, opts.bootstrap)
lookup := newPeerDiscovery(nodes, k.nodeClient, target, opts)
err = lookup.Run(ctx)
if err != nil {
zap.L().Warn("lookup failed", zap.Error(err))
@ -204,7 +207,7 @@ func (k *Kademlia) Ping(ctx context.Context, node pb.Node) (pb.Node, error) {
// FindNode looks up the provided NodeID first in the local Node, and if it is not found
// begins searching the network for the NodeID. Returns and error if node was not found
func (k *Kademlia) FindNode(ctx context.Context, ID dht.NodeID) (pb.Node, error) {
//TODO(coyle)
// TODO(coyle)
return pb.Node{}, NodeErr.New("TODO FindNode")
}

View File

@ -80,7 +80,7 @@ func TestNewKademlia(t *testing.T) {
}
func TestLookup(t *testing.T) {
func TestPeerDiscovery(t *testing.T) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
addr := lis.Addr().String()
@ -100,8 +100,8 @@ func TestLookup(t *testing.T) {
assert.NoError(t, err)
// create two new unique identities
id := node.ID(fid.ID)
id2 := node.ID(fid2.ID)
id := fid.ID
id2 := fid2.ID
assert.NotEqual(t, id, id2)
kid := dht.NodeID(fid.ID)
@ -116,7 +116,7 @@ func TestLookup(t *testing.T) {
cases := []struct {
target dht.NodeID
opts lookupOpts
opts discoveryOptions
expected *pb.Node
expectedErr error
}{
@ -128,7 +128,7 @@ func TestLookup(t *testing.T) {
mns.returnValue = []*pb.Node{&pb.Node{Id: id.String(), Address: &pb.NodeAddress{Address: addr}}}
return &nid
}(),
opts: lookupOpts{amount: 5},
opts: discoveryOptions{concurrency: 3, bootstrap: true, retries: 1},
expected: &pb.Node{},
expectedErr: nil,
},
@ -138,7 +138,7 @@ func TestLookup(t *testing.T) {
n := node.ID(id.ID)
return &n
}(),
opts: lookupOpts{amount: 5},
opts: discoveryOptions{concurrency: 3, bootstrap: true, retries: 1},
expected: nil,
expectedErr: nil,
},

View File

@ -1,92 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"bytes"
"context"
"log"
"time"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
)
type sequentialLookup struct {
contacted map[string]bool
queue *XorQueue
slowestResponse time.Duration
client node.Client
target dht.NodeID
limit int
bootstrap bool
}
func newSequentialLookup(rt *RoutingTable, nodes []*pb.Node, client node.Client, target dht.NodeID, limit int, bootstrap bool) *sequentialLookup {
queue := NewXorQueue(limit)
queue.Insert(target, nodes)
return &sequentialLookup{
contacted: map[string]bool{},
queue: queue,
slowestResponse: 0,
client: client,
target: target,
limit: limit,
bootstrap: bootstrap,
}
}
func (lookup *sequentialLookup) Run(ctx context.Context) error {
for lookup.queue.Len() > 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
next, priority := lookup.queue.Closest()
if !lookup.bootstrap && bytes.Equal(priority.Bytes(), make([]byte, len(priority.Bytes()))) {
return nil // found the result
}
uncontactedNeighbors := []*pb.Node{}
neighbors := lookup.FetchNeighbors(ctx, next)
for _, neighbor := range neighbors {
if !lookup.contacted[neighbor.GetId()] {
uncontactedNeighbors = append(uncontactedNeighbors, neighbor)
}
}
lookup.queue.Insert(lookup.target, uncontactedNeighbors)
for lookup.queue.Len() > lookup.limit {
lookup.queue.Closest()
}
}
return nil
}
func (lookup *sequentialLookup) FetchNeighbors(ctx context.Context, node *pb.Node) []*pb.Node {
if node.GetAddress() == nil {
return nil
}
lookup.contacted[node.GetId()] = true
start := time.Now()
neighbors, err := lookup.client.Lookup(ctx, *node, pb.Node{Id: lookup.target.String()})
if err != nil {
// TODO(coyle): I think we might want to do another look up on this node or update something
// but for now let's just log and ignore.
log.Printf("Error occurred during lookup for %s on %s :: error = %s", lookup.target.String(), node.GetId(), err.Error())
return []*pb.Node{}
}
latency := time.Since(start)
if latency > lookup.slowestResponse {
lookup.slowestResponse = latency
}
return neighbors
}

View File

@ -0,0 +1,118 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"sync"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
)
type peerDiscovery struct {
client node.Client
target dht.NodeID
opts discoveryOptions
cond sync.Cond
queue *XorQueue
}
// ErrMaxRetries is used when a lookup has been retried the max number of times
var ErrMaxRetries = errs.Class("max retries exceeded for id:")
func newPeerDiscovery(nodes []*pb.Node, client node.Client, target dht.NodeID, opts discoveryOptions) *peerDiscovery {
queue := NewXorQueue(opts.concurrency)
queue.Insert(target, nodes)
return &peerDiscovery{
client: client,
target: target,
opts: opts,
cond: sync.Cond{L: &sync.Mutex{}},
queue: queue,
}
}
func (lookup *peerDiscovery) Run(ctx context.Context) error {
wg := sync.WaitGroup{}
// protected by `lookup.cond.L`
working := 0
allDone := false
wg.Add(lookup.opts.concurrency)
for i := 0; i < lookup.opts.concurrency; i++ {
go func() {
defer wg.Done()
for {
var (
next *pb.Node
)
lookup.cond.L.Lock()
for {
// everything is done, this routine can return
if allDone {
lookup.cond.L.Unlock()
return
}
next, _ = lookup.queue.Closest()
if !lookup.opts.bootstrap && next.GetId() == lookup.target.String() {
allDone = true
break // closest node is the target and is already in routing table (i.e. no lookup required)
}
if next != nil {
working++
break
}
// no work, wait until some other routine inserts into the queue
lookup.cond.Wait()
}
lookup.cond.L.Unlock()
neighbors, err := lookup.client.Lookup(ctx, *next, pb.Node{Id: lookup.target.String()})
if err != nil {
ok := lookup.queue.Reinsert(lookup.target, next, lookup.opts.retries)
if !ok {
zap.S().Errorf(
"Error occurred during lookup of %s :: %s :: error = %s",
lookup.target.String(),
ErrMaxRetries.New("%s", next.GetId()),
err.Error(),
)
}
}
lookup.queue.Insert(lookup.target, neighbors)
lookup.cond.L.Lock()
working--
allDone = allDone || isDone(ctx) || working == 0 && lookup.queue.Len() == 0
lookup.cond.L.Unlock()
lookup.cond.Broadcast()
}
}()
}
wg.Wait()
return ctx.Err()
}
func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

View File

@ -6,33 +6,73 @@ package kademlia
import (
"container/heap"
"math/big"
"sync"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/pb"
)
//XorQueue is a priority queue where the priority is key XOR distance
// XorQueue is a priority queue where the priority is key XOR distance
type XorQueue struct {
maxLen int
items items
mu sync.Mutex
added map[string]int
items items
}
//NewXorQueue returns a items with priority based on XOR from targetBytes
// NewXorQueue returns a items with priority based on XOR from targetBytes
func NewXorQueue(size int) *XorQueue {
return &XorQueue{items: make(items, 0, size), maxLen: size}
return &XorQueue{
items: make(items, 0, size),
added: make(map[string]int),
maxLen: size,
}
}
//Insert adds Node onto the queue
// Insert adds Nodes onto the queue
func (x *XorQueue) Insert(target dht.NodeID, nodes []*pb.Node) {
x.mu.Lock()
defer x.mu.Unlock()
unique := nodes[:0]
for _, node := range nodes {
nodeID := node.GetId()
if _, added := x.added[nodeID]; !added {
x.added[nodeID]++
unique = append(unique, node)
}
}
x.insert(target, unique)
}
// Reinsert adds a Nodes onto the queue if it's been added >= limit times previously
func (x *XorQueue) Reinsert(target dht.NodeID, node *pb.Node, limit int) bool {
x.mu.Lock()
defer x.mu.Unlock()
nodeID := node.GetId()
if x.added[nodeID] >= limit {
return false
}
x.added[nodeID]++
x.insert(target, []*pb.Node{node})
return true
}
// insert must hold lock while adding
func (x *XorQueue) insert(target dht.NodeID, nodes []*pb.Node) {
targetBytes := new(big.Int).SetBytes(target.Bytes())
//insert new nodes
// insert new nodes
for _, node := range nodes {
heap.Push(&x.items, &item{
value: node,
priority: new(big.Int).Xor(targetBytes, new(big.Int).SetBytes([]byte(node.GetId()))),
})
}
//resize down if we grew too big
// resize down if we grew too big
if x.items.Len() > x.maxLen {
olditems := x.items
x.items = items{}
@ -44,8 +84,11 @@ func (x *XorQueue) Insert(target dht.NodeID, nodes []*pb.Node) {
}
}
//Closest removed the closest priority node from the queue
// Closest removes the closest priority node from the queue
func (x *XorQueue) Closest() (*pb.Node, big.Int) {
x.mu.Lock()
defer x.mu.Unlock()
if x.Len() == 0 {
return nil, big.Int{}
}
@ -53,7 +96,7 @@ func (x *XorQueue) Closest() (*pb.Node, big.Int) {
return item.value, *item.priority
}
//Len returns the number of items in the queue
// Len returns the number of items in the queue
func (x *XorQueue) Len() int {
return x.items.Len()
}