Simplify peer discovery implementation (#765)

This commit is contained in:
Egon Elbre 2018-12-05 16:32:37 +02:00 committed by GitHub
parent b2a8a10b16
commit c4033b15af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 33 deletions

View File

@ -192,7 +192,7 @@ func (k *Kademlia) lookup(ctx context.Context, target storj.NodeID, opts discove
} }
lookup := newPeerDiscovery(nodes, k.nodeClient, target, opts) lookup := newPeerDiscovery(nodes, k.nodeClient, target, opts)
err = lookup.Run(ctx) _, err = lookup.Run(ctx)
if err != nil { if err != nil {
zap.L().Warn("lookup failed", zap.Error(err)) zap.L().Warn("lookup failed", zap.Error(err))
@ -229,21 +229,14 @@ func (k *Kademlia) FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, erro
concurrency: k.alpha, retries: defaultRetries, bootstrap: false, bootstrapNodes: k.bootstrapNodes, concurrency: k.alpha, retries: defaultRetries, bootstrap: false, bootstrapNodes: k.bootstrapNodes,
}) })
err = lookup.Run(ctx) target, err := lookup.Run(ctx)
if err != nil { if err != nil {
return pb.Node{}, err return pb.Node{}, err
} }
if target == nil {
select { return pb.Node{}, NodeNotFound.New("")
case foundOne := <-lookup.foundOne:
if foundOne == nil {
return pb.Node{}, NodeNotFound.New("")
}
return *foundOne, nil
default: // this is to keep it from blocking
} }
return *target, nil
return pb.Node{}, nil
} }
// ListenAndServe connects the kademlia node to the network and listens for incoming requests // ListenAndServe connects the kademlia node to the network and listens for incoming requests

View File

@ -17,10 +17,9 @@ import (
) )
type peerDiscovery struct { type peerDiscovery struct {
client node.Client client node.Client
target storj.NodeID target storj.NodeID
opts discoveryOptions opts discoveryOptions
foundOne chan *pb.Node
cond sync.Cond cond sync.Cond
queue discoveryQueue queue discoveryQueue
@ -30,33 +29,32 @@ type peerDiscovery struct {
var ErrMaxRetries = errs.Class("max retries exceeded for id:") var ErrMaxRetries = errs.Class("max retries exceeded for id:")
func newPeerDiscovery(nodes []*pb.Node, client node.Client, target storj.NodeID, opts discoveryOptions) *peerDiscovery { func newPeerDiscovery(nodes []*pb.Node, client node.Client, target storj.NodeID, opts discoveryOptions) *peerDiscovery {
oneChan := make(chan *pb.Node, 1)
discovery := &peerDiscovery{ discovery := &peerDiscovery{
client: client, client: client,
target: target, target: target,
opts: opts, opts: opts,
foundOne: oneChan, cond: sync.Cond{L: &sync.Mutex{}},
cond: sync.Cond{L: &sync.Mutex{}}, queue: *newDiscoveryQueue(opts.concurrency),
queue: *newDiscoveryQueue(opts.concurrency),
} }
discovery.queue.Insert(target, nodes...) discovery.queue.Insert(target, nodes...)
return discovery return discovery
} }
func (lookup *peerDiscovery) Run(ctx context.Context) error { func (lookup *peerDiscovery) Run(ctx context.Context) (target *pb.Node, err error) {
if lookup.queue.Len() == 0 { if lookup.queue.Len() == 0 {
return nil // TODO: should we return an error here? return nil, nil // TODO: should we return an error here?
} }
wg := sync.WaitGroup{}
defer close(lookup.foundOne)
// protected by `lookup.cond.L` // protected by `lookup.cond.L`
working := 0 working := 0
allDone := false allDone := false
target = nil
wg := sync.WaitGroup{}
wg.Add(lookup.opts.concurrency) wg.Add(lookup.opts.concurrency)
defer wg.Wait()
for i := 0; i < lookup.opts.concurrency; i++ { for i := 0; i < lookup.opts.concurrency; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
@ -73,9 +71,9 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error {
next = lookup.queue.Closest() next = lookup.queue.Closest()
if !lookup.opts.bootstrap && next != nil && next.Id.String() == lookup.target.String() { if !lookup.opts.bootstrap && next != nil && next.Id == lookup.target {
lookup.foundOne <- next
allDone = true allDone = true
target = next
break // closest node is the target and is already in routing table (i.e. no lookup required) break // closest node is the target and is already in routing table (i.e. no lookup required)
} }
@ -98,7 +96,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error {
if !ok { if !ok {
zap.S().Errorf( zap.S().Errorf(
"Error occurred during lookup of %s :: %s :: error = %s", "Error occurred during lookup of %s :: %s :: error = %s",
lookup.target.String(), lookup.target,
ErrMaxRetries.New("%s", next.Id), ErrMaxRetries.New("%s", next.Id),
err.Error(), err.Error(),
) )
@ -116,8 +114,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error {
}() }()
} }
wg.Wait() return target, ctx.Err()
return ctx.Err()
} }
func isDone(ctx context.Context) bool { func isDone(ctx context.Context) bool {