storj/pkg/kademlia/peer_discovery.go
Bryan White 2a0c4e60d2
preparing for use of customtype gogo extension with NodeID type (#693)
* preparing for use of `customtype` gogo extension with `NodeID` type

* review changes

* preparing for use of `customtype` gogo extension with `NodeID` type

* review changes

* wip

* tests passing

* wip fixing tests

* more wip test fixing

* remove NodeIDList from proto files

* linter fixes

* linter fixes

* linter/review fixes

* more freaking linter fixes

* omg just kill me - linterrrrrrrr

* travis linter, i will murder you and your family in your sleep

* goimports everything - burn in hell travis

* goimports update

* go mod tidy
2018-11-29 19:39:27 +01:00

120 lines
2.5 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"sync"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
// peerDiscovery holds the state shared by the worker goroutines of a single
// lookup for target. It is created by newPeerDiscovery and driven by Run.
type peerDiscovery struct {
// client issues the Lookup requests to the nodes taken from queue.
client node.Client
// target is the node ID being looked up; it is also the key passed to the
// queue's Insert and Reinsert calls.
target storj.NodeID
// opts supplies the concurrency, retries and bootstrap settings read by
// newPeerDiscovery and Run.
opts discoveryOptions
// cond's lock guards the bookkeeping variables local to Run; workers with
// nothing to do wait on it until another worker broadcasts.
cond sync.Cond
// queue holds the candidate nodes still to be queried. It is sized from
// opts.concurrency and seeded with the initial node list.
queue *XorQueue
}
// ErrMaxRetries is the error class used when a lookup has been retried the max
// number of times. Run wraps the ID of the failing node in it when it logs a
// failed lookup that the queue's Reinsert call did not accept.
var ErrMaxRetries = errs.Class("max retries exceeded for id:")
// newPeerDiscovery builds the state for a single lookup of target.
//
// The work queue is created with a size of opts.concurrency and seeded with
// nodes, keyed by target. The returned value carries its own mutex-backed
// condition variable, so it must be used through the returned pointer and not
// copied.
func newPeerDiscovery(nodes []*pb.Node, client node.Client, target storj.NodeID, opts discoveryOptions) *peerDiscovery {
	discovery := &peerDiscovery{
		client: client,
		target: target,
		opts:   opts,
		queue:  NewXorQueue(opts.concurrency),
	}
	// The condition variable needs a locker before any worker can use it.
	discovery.cond.L = &sync.Mutex{}

	// Seed the queue with the starting candidates.
	discovery.queue.Insert(target, nodes)

	return discovery
}
// Run performs the lookup for lookup.target with lookup.opts.concurrency worker
// goroutines and blocks until every worker has returned.
//
// Each worker repeatedly takes the closest candidate from the queue, asks it
// for neighbors of the target via client.Lookup, and inserts the returned nodes
// back into the queue. A failed lookup is handed to queue.Reinsert together
// with opts.retries; when Reinsert reports false the failure is logged, wrapped
// in ErrMaxRetries.
//
// The workers stop when any of the following holds:
//   - ctx is done (checked after each lookup),
//   - the queue is empty and no worker is busy, or
//   - opts.bootstrap is false and the closest queued node is the target itself.
//
// Individual lookup errors are only logged. The return value is ctx.Err(), so
// it is nil unless the context was cancelled or timed out.
func (lookup *peerDiscovery) Run(ctx context.Context) error {
	var wg sync.WaitGroup

	// protected by `lookup.cond.L`
	working := 0
	allDone := false

	wg.Add(lookup.opts.concurrency)
	for i := 0; i < lookup.opts.concurrency; i++ {
		go func() {
			defer wg.Done()
			for {
				var next *pb.Node

				lookup.cond.L.Lock()
				for {
					// everything is done, this routine can return
					if allDone {
						lookup.cond.L.Unlock()
						return
					}

					next, _ = lookup.queue.Closest()
					if next != nil {
						// closest node is the target and is already in routing table
						// (i.e. no further lookups are required after this one).
						// The nil check must come first: Closest yields nil when
						// the queue has nothing to offer.
						if !lookup.opts.bootstrap && next.Id == lookup.target {
							allDone = true
						}
						// Always count this worker as busy so the matching
						// decrement below keeps the counter balanced.
						working++
						break
					}

					// The queue is empty. Within Run it is only refilled by
					// workers between working++ and working--, so if nobody is
					// busy no work can ever arrive: finish instead of waiting
					// forever, and wake the other idle workers so they exit too.
					if working == 0 {
						allDone = true
						lookup.cond.L.Unlock()
						lookup.cond.Broadcast()
						return
					}

					// no work, wait until some other routine inserts into the queue
					lookup.cond.Wait()
				}
				lookup.cond.L.Unlock()

				neighbors, err := lookup.client.Lookup(ctx, *next, pb.Node{Id: lookup.target})
				if err != nil {
					// Give the node another chance; log once it is rejected.
					ok := lookup.queue.Reinsert(lookup.target, next, lookup.opts.retries)
					if !ok {
						zap.S().Errorf(
							"Error occurred during lookup of %s :: %s :: error = %s",
							lookup.target.String(),
							ErrMaxRetries.New("%s", next.Id),
							err.Error(),
						)
					}
				}

				lookup.queue.Insert(lookup.target, neighbors)

				lookup.cond.L.Lock()
				working--
				allDone = allDone || isDone(ctx) || working == 0 && lookup.queue.Len() == 0
				lookup.cond.L.Unlock()
				// Wake waiting workers: either new candidates were inserted or
				// the lookup has finished.
				lookup.cond.Broadcast()
			}
		}()
	}

	wg.Wait()

	return ctx.Err()
}
// isDone reports, without blocking, whether ctx's Done channel is already
// closed (i.e. the context has been cancelled or has expired).
func isDone(ctx context.Context) bool {
	finished := false
	select {
	case <-ctx.Done():
		finished = true
	default:
		// Done is not closed yet; fall through with finished == false.
	}
	return finished
}