diff --git a/pkg/kademlia/kademlia.go b/pkg/kademlia/kademlia.go index 53174880b..148f8fa52 100644 --- a/pkg/kademlia/kademlia.go +++ b/pkg/kademlia/kademlia.go @@ -192,7 +192,7 @@ func (k *Kademlia) lookup(ctx context.Context, target storj.NodeID, opts discove } lookup := newPeerDiscovery(nodes, k.nodeClient, target, opts) - err = lookup.Run(ctx) + _, err = lookup.Run(ctx) if err != nil { zap.L().Warn("lookup failed", zap.Error(err)) @@ -229,21 +229,14 @@ func (k *Kademlia) FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, erro concurrency: k.alpha, retries: defaultRetries, bootstrap: false, bootstrapNodes: k.bootstrapNodes, }) - err = lookup.Run(ctx) + target, err := lookup.Run(ctx) if err != nil { return pb.Node{}, err } - - select { - case foundOne := <-lookup.foundOne: - if foundOne == nil { - return pb.Node{}, NodeNotFound.New("") - } - return *foundOne, nil - default: // this is to keep it from blocking + if target == nil { + return pb.Node{}, NodeNotFound.New("") } - - return pb.Node{}, nil + return *target, nil } // ListenAndServe connects the kademlia node to the network and listens for incoming requests diff --git a/pkg/kademlia/peer_discovery.go b/pkg/kademlia/peer_discovery.go index 22ad6dd1e..f4ae75f09 100644 --- a/pkg/kademlia/peer_discovery.go +++ b/pkg/kademlia/peer_discovery.go @@ -17,10 +17,9 @@ import ( ) type peerDiscovery struct { - client node.Client - target storj.NodeID - opts discoveryOptions - foundOne chan *pb.Node + client node.Client + target storj.NodeID + opts discoveryOptions cond sync.Cond queue discoveryQueue @@ -30,33 +29,32 @@ type peerDiscovery struct { var ErrMaxRetries = errs.Class("max retries exceeded for id:") func newPeerDiscovery(nodes []*pb.Node, client node.Client, target storj.NodeID, opts discoveryOptions) *peerDiscovery { - oneChan := make(chan *pb.Node, 1) discovery := &peerDiscovery{ - client: client, - target: target, - opts: opts, - foundOne: oneChan, - cond: sync.Cond{L: &sync.Mutex{}}, - queue: *newDiscoveryQueue(opts.concurrency), + client: client, + target: target, + opts: opts, + cond: sync.Cond{L: &sync.Mutex{}}, + queue: *newDiscoveryQueue(opts.concurrency), } discovery.queue.Insert(target, nodes...) return discovery } -func (lookup *peerDiscovery) Run(ctx context.Context) error { +func (lookup *peerDiscovery) Run(ctx context.Context) (target *pb.Node, err error) { if lookup.queue.Len() == 0 { - return nil // TODO: should we return an error here? + return nil, nil // TODO: should we return an error here? } - wg := sync.WaitGroup{} - defer close(lookup.foundOne) - // protected by `lookup.cond.L` working := 0 allDone := false + target = nil + wg := sync.WaitGroup{} wg.Add(lookup.opts.concurrency) + defer wg.Wait() + for i := 0; i < lookup.opts.concurrency; i++ { go func() { defer wg.Done() @@ -73,9 +71,9 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error { next = lookup.queue.Closest() - if !lookup.opts.bootstrap && next != nil && next.Id.String() == lookup.target.String() { - lookup.foundOne <- next + if !lookup.opts.bootstrap && next != nil && next.Id == lookup.target { allDone = true + target = next break // closest node is the target and is already in routing table (i.e. no lookup required) } @@ -98,7 +96,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error { if !ok { zap.S().Errorf( "Error occurred during lookup of %s :: %s :: error = %s", - lookup.target.String(), + lookup.target, ErrMaxRetries.New("%s", next.Id), err.Error(), ) @@ -116,8 +114,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error { }() } - wg.Wait() - return ctx.Err() + return target, ctx.Err() } func isDone(ctx context.Context) bool {