c29f033e7d
* move kademlia.Dialer into kademliaclient package
253 lines
5.7 KiB
Go
253 lines
5.7 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package kademlia
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/storj/pkg/kademlia/kademliaclient"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/storj"
|
|
)
|
|
|
|
type peerDiscovery struct {
|
|
log *zap.Logger
|
|
|
|
dialer *kademliaclient.Dialer
|
|
self *pb.Node
|
|
target storj.NodeID
|
|
k int
|
|
concurrency int
|
|
|
|
cond sync.Cond
|
|
queue discoveryQueue
|
|
}
|
|
|
|
// ErrMaxRetries is used when a lookup has been retried the max number of times
|
|
var ErrMaxRetries = errs.Class("max retries exceeded for id:")
|
|
|
|
func newPeerDiscovery(log *zap.Logger, dialer *kademliaclient.Dialer, target storj.NodeID, startingNodes []*pb.Node, k, alpha int, self *pb.Node) *peerDiscovery {
|
|
discovery := &peerDiscovery{
|
|
log: log,
|
|
dialer: dialer,
|
|
self: self,
|
|
target: target,
|
|
k: k,
|
|
concurrency: alpha,
|
|
cond: sync.Cond{L: &sync.Mutex{}},
|
|
queue: *newDiscoveryQueue(target, k),
|
|
}
|
|
discovery.queue.Insert(startingNodes...)
|
|
return discovery
|
|
}
|
|
|
|
func (lookup *peerDiscovery) Run(ctx context.Context) (_ []*pb.Node, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
if lookup.queue.Unqueried() == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// protected by `lookup.cond.L`
|
|
working := 0
|
|
allDone := false
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(lookup.concurrency)
|
|
|
|
for i := 0; i < lookup.concurrency; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
var next *pb.Node
|
|
|
|
lookup.cond.L.Lock()
|
|
for {
|
|
// everything is done, this routine can return
|
|
if allDone {
|
|
lookup.cond.L.Unlock()
|
|
return
|
|
}
|
|
|
|
next = lookup.queue.ClosestUnqueried()
|
|
|
|
if next != nil {
|
|
working++
|
|
break
|
|
}
|
|
// no work, wait until some other routine inserts into the queue
|
|
lookup.cond.Wait()
|
|
}
|
|
lookup.cond.L.Unlock()
|
|
|
|
neighbors, err := lookup.dialer.Lookup(ctx, lookup.self, *next, lookup.target, lookup.k)
|
|
if err != nil {
|
|
lookup.queue.QueryFailure(next)
|
|
if !isDone(ctx) {
|
|
lookup.log.Debug("connecting to node failed",
|
|
zap.Any("target", lookup.target),
|
|
zap.Any("dial-node", next.Id),
|
|
zap.Any("dial-address", next.Address.Address),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
} else {
|
|
lookup.queue.QuerySuccess(next, neighbors...)
|
|
}
|
|
|
|
lookup.cond.L.Lock()
|
|
working--
|
|
allDone = allDone || isDone(ctx) || (working == 0 && lookup.queue.Unqueried() == 0)
|
|
lookup.cond.L.Unlock()
|
|
lookup.cond.Broadcast()
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return lookup.queue.ClosestQueried(), ctx.Err()
|
|
}
|
|
|
|
func isDone(ctx context.Context) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
type queueState int
|
|
|
|
const (
|
|
stateUnqueried queueState = iota
|
|
stateQuerying
|
|
stateSuccess
|
|
stateFailure
|
|
)
|
|
|
|
// discoveryQueue is a limited priority queue for nodes with xor distance
|
|
type discoveryQueue struct {
|
|
target storj.NodeID
|
|
maxLen int
|
|
mu sync.Mutex
|
|
state map[storj.NodeID]queueState
|
|
items []queueItem
|
|
}
|
|
|
|
// queueItem is node with a priority
|
|
type queueItem struct {
|
|
node *pb.Node
|
|
priority storj.NodeID
|
|
}
|
|
|
|
// newDiscoveryQueue returns a items with priority based on XOR from targetBytes
|
|
func newDiscoveryQueue(target storj.NodeID, size int) *discoveryQueue {
|
|
return &discoveryQueue{
|
|
target: target,
|
|
state: make(map[storj.NodeID]queueState),
|
|
maxLen: size,
|
|
}
|
|
}
|
|
|
|
// Insert adds nodes into the queue.
|
|
func (queue *discoveryQueue) Insert(nodes ...*pb.Node) {
|
|
queue.mu.Lock()
|
|
defer queue.mu.Unlock()
|
|
queue.insert(nodes...)
|
|
}
|
|
|
|
// insert requires the mutex to be locked
|
|
func (queue *discoveryQueue) insert(nodes ...*pb.Node) {
|
|
for _, node := range nodes {
|
|
// TODO: empty node ids should be semantically different from the
|
|
// technically valid node id that is all zeros
|
|
if node.Id == (storj.NodeID{}) {
|
|
continue
|
|
}
|
|
if _, added := queue.state[node.Id]; added {
|
|
continue
|
|
}
|
|
queue.state[node.Id] = stateUnqueried
|
|
|
|
queue.items = append(queue.items, queueItem{
|
|
node: node,
|
|
priority: xorNodeID(queue.target, node.Id),
|
|
})
|
|
}
|
|
|
|
sort.Slice(queue.items, func(i, k int) bool {
|
|
return queue.items[i].priority.Less(queue.items[k].priority)
|
|
})
|
|
|
|
if len(queue.items) > queue.maxLen {
|
|
queue.items = queue.items[:queue.maxLen]
|
|
}
|
|
}
|
|
|
|
// ClosestUnqueried returns the closest unqueried item in the queue
|
|
func (queue *discoveryQueue) ClosestUnqueried() *pb.Node {
|
|
queue.mu.Lock()
|
|
defer queue.mu.Unlock()
|
|
|
|
for _, item := range queue.items {
|
|
if queue.state[item.node.Id] == stateUnqueried {
|
|
queue.state[item.node.Id] = stateQuerying
|
|
return item.node
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ClosestQueried returns the closest queried items in the queue
|
|
func (queue *discoveryQueue) ClosestQueried() []*pb.Node {
|
|
queue.mu.Lock()
|
|
defer queue.mu.Unlock()
|
|
|
|
rv := make([]*pb.Node, 0, len(queue.items))
|
|
for _, item := range queue.items {
|
|
if queue.state[item.node.Id] == stateSuccess {
|
|
rv = append(rv, item.node)
|
|
}
|
|
}
|
|
|
|
return rv
|
|
}
|
|
|
|
// QuerySuccess marks the node as successfully queried, and adds the results to the queue
|
|
// QuerySuccess marks nodes with a zero node ID as ignored, and ignores incoming
|
|
// nodes with a zero id.
|
|
func (queue *discoveryQueue) QuerySuccess(node *pb.Node, nodes ...*pb.Node) {
|
|
queue.mu.Lock()
|
|
defer queue.mu.Unlock()
|
|
queue.state[node.Id] = stateSuccess
|
|
queue.insert(nodes...)
|
|
}
|
|
|
|
// QueryFailure marks the node as failing query
|
|
func (queue *discoveryQueue) QueryFailure(node *pb.Node) {
|
|
queue.mu.Lock()
|
|
queue.state[node.Id] = stateFailure
|
|
queue.mu.Unlock()
|
|
}
|
|
|
|
// Unqueried returns the number of unqueried items in the queue
|
|
func (queue *discoveryQueue) Unqueried() (amount int) {
|
|
queue.mu.Lock()
|
|
defer queue.mu.Unlock()
|
|
|
|
for _, item := range queue.items {
|
|
if queue.state[item.node.Id] == stateUnqueried {
|
|
amount++
|
|
}
|
|
}
|
|
return amount
|
|
}
|