Fix queue implementation (#758)

This commit is contained in:
Egon Elbre 2018-12-04 17:46:53 +02:00 committed by GitHub
parent c11a4385ae
commit ae790dfd9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 242 additions and 226 deletions

View File

@ -5,6 +5,7 @@ package kademlia
import (
"context"
"sort"
"sync"
"github.com/zeebo/errs"
@ -21,27 +22,30 @@ type peerDiscovery struct {
opts discoveryOptions
cond sync.Cond
queue *XorQueue
queue discoveryQueue
}
// ErrMaxRetries is used when a lookup has been retried the max number of times
var ErrMaxRetries = errs.Class("max retries exceeded for id:")
func newPeerDiscovery(nodes []*pb.Node, client node.Client, target storj.NodeID, opts discoveryOptions) *peerDiscovery {
queue := NewXorQueue(opts.concurrency)
queue.Insert(target, nodes)
return &peerDiscovery{
discovery := &peerDiscovery{
client: client,
target: target,
opts: opts,
cond: sync.Cond{L: &sync.Mutex{}},
queue: queue,
cond: sync.Cond{L: &sync.Mutex{}},
queue: *newDiscoveryQueue(opts.concurrency),
}
discovery.queue.Insert(target, nodes...)
return discovery
}
func (lookup *peerDiscovery) Run(ctx context.Context) error {
if lookup.queue.Len() == 0 {
return nil // TODO: should we return an error here?
}
wg := sync.WaitGroup{}
// protected by `lookup.cond.L`
@ -53,9 +57,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error {
go func() {
defer wg.Done()
for {
var (
next *pb.Node
)
var next *pb.Node
lookup.cond.L.Lock()
for {
@ -65,7 +67,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error {
return
}
next, _ = lookup.queue.Closest()
next = lookup.queue.Closest()
if !lookup.opts.bootstrap && next.Id == lookup.target {
allDone = true
break // closest node is the target and is already in routing table (i.e. no lookup required)
@ -83,7 +85,9 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error {
neighbors, err := lookup.client.Lookup(ctx, *next, pb.Node{Id: lookup.target})
if err != nil {
ok := lookup.queue.Reinsert(lookup.target, next, lookup.opts.retries)
// TODO: reenable retry after fixing logic
// ok := lookup.queue.Reinsert(lookup.target, next, lookup.opts.retries)
ok := false
if !ok {
zap.S().Errorf(
"Error occurred during lookup of %s :: %s :: error = %s",
@ -94,7 +98,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error {
}
}
lookup.queue.Insert(lookup.target, neighbors)
lookup.queue.Insert(lookup.target, neighbors...)
lookup.cond.L.Lock()
working--
@ -117,3 +121,112 @@ func isDone(ctx context.Context) bool {
return false
}
}
// discoveryQueue is a limited priority queue for nodes with xor distance
type discoveryQueue struct {
maxLen int
mu sync.Mutex
added map[storj.NodeID]int
items []queueItem
}
// queueItem is node with a priority
type queueItem struct {
node *pb.Node
priority storj.NodeID
}
// newDiscoveryQueue returns a items with priority based on XOR from targetBytes
func newDiscoveryQueue(size int) *discoveryQueue {
return &discoveryQueue{
added: make(map[storj.NodeID]int),
maxLen: size,
}
}
// Insert adds nodes into the queue.
func (queue *discoveryQueue) Insert(target storj.NodeID, nodes ...*pb.Node) {
queue.mu.Lock()
defer queue.mu.Unlock()
unique := nodes[:0]
for _, node := range nodes {
if _, added := queue.added[node.Id]; added {
continue
}
unique = append(unique, node)
}
queue.insert(target, unique...)
// update counts for the new items that are in the queue
for _, item := range queue.items {
if _, added := queue.added[item.node.Id]; !added {
queue.added[item.node.Id] = 1
}
}
}
// Reinsert adds a Nodes into the queue, only if it's has been added less than limit times.
func (queue *discoveryQueue) Reinsert(target storj.NodeID, node *pb.Node, limit int) bool {
queue.mu.Lock()
defer queue.mu.Unlock()
nodeID := node.Id
if queue.added[nodeID] >= limit {
return false
}
queue.added[nodeID]++
queue.insert(target, node)
return true
}
// insert must hold lock while adding
func (queue *discoveryQueue) insert(target storj.NodeID, nodes ...*pb.Node) {
for _, node := range nodes {
queue.items = append(queue.items, queueItem{
node: node,
priority: xorNodeID(target, node.Id),
})
}
sort.Slice(queue.items, func(i, k int) bool {
return queue.items[i].priority.Less(queue.items[k].priority)
})
if len(queue.items) > queue.maxLen {
queue.items = queue.items[:queue.maxLen]
}
}
// Closest returns the closest item in the queue
func (queue *discoveryQueue) Closest() *pb.Node {
queue.mu.Lock()
defer queue.mu.Unlock()
if len(queue.items) == 0 {
return nil
}
var item queueItem
item, queue.items = queue.items[0], queue.items[1:]
return item.node
}
// Len returns the number of items in the queue
func (queue *discoveryQueue) Len() int {
queue.mu.Lock()
defer queue.mu.Unlock()
return len(queue.items)
}
// xorNodeID returns the xor of each byte in NodeID
func xorNodeID(a, b storj.NodeID) storj.NodeID {
r := storj.NodeID{}
for i, av := range a {
r[i] = av ^ b[i]
}
return r
}

View File

@ -0,0 +1,100 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"math/rand"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
func TestDiscoveryQueue(t *testing.T) {
target := storj.NodeID{1, 1} // 00000001
// // id -> id ^ target
nodeA := &pb.Node{Id: storj.NodeID{3, 2}} // 00000011:00000010 -> 00000010:00000011
nodeB := &pb.Node{Id: storj.NodeID{6, 5}} // 00000110:00000101 -> 00000111:00000100
nodeC := &pb.Node{Id: storj.NodeID{7, 7}} // 00000111:00000111 -> 00000110:00000110
nodeD := &pb.Node{Id: storj.NodeID{8, 4}} // 00001000:00000100 -> 00001001:00000101
nodeE := &pb.Node{Id: storj.NodeID{12, 1}} // 00001100:00000001 -> 00001101:00000000
nodeF := &pb.Node{Id: storj.NodeID{15, 16}} // 00001111:00010000 -> 00001110:00010001
nodeG := &pb.Node{Id: storj.NodeID{18, 74}} // 00010010:01001010 -> 00010011:01001011
nodeH := &pb.Node{Id: storj.NodeID{25, 61}} // 00011001:00111101 -> 00011000:00111100
nodes := []*pb.Node{nodeA, nodeB, nodeC, nodeD, nodeE, nodeF, nodeG, nodeH}
expected := []*pb.Node{
nodeA, // 00000011:00000010 -> 00000010:00000011
nodeC, // 00000111:00000111 -> 00000110:00000110
nodeB, // 00000110:00000101 -> 00000111:00000100
nodeD, // 00001000:00000100 -> 00001001:00000101
nodeE, // 00001100:00000001 -> 00001101:00000000
nodeF, // 00001111:00010000 -> 00001110:00010001
// nodeG, // 00010010:01001010 -> 00010011:01001011
// nodeH, // 00011001:00111101 -> 00011000:00111100
}
// // code for outputting the bits above
// for _, node := range nodes {
// xor := xorNodeID(target, node.Id)
// t.Logf("%08b,%08b -> %08b,%08b", node.Id[0], node.Id[1], xor[0], xor[1])
// }
queue := newDiscoveryQueue(6)
queue.Insert(target, nodes...)
assert.Equal(t, queue.Len(), 6)
for i, expect := range expected {
node := queue.Closest()
assert.Equal(t, node.Id, expect.Id, strconv.Itoa(i))
}
assert.Nil(t, queue.Closest())
}
func TestDiscoveryQueueRandom(t *testing.T) {
const maxLen = 8
seed := int64(rand.Uint64())
t.Logf("seed %v", seed)
r := rand.New(rand.NewSource(seed))
for i := 0; i < 100; i++ {
var target storj.NodeID
_, _ = r.Read(target[:])
var initial []*pb.Node
for k := 0; k < 10; k++ {
var nodeID storj.NodeID
_, _ = r.Read(nodeID[:])
initial = append(initial, &pb.Node{Id: nodeID})
}
queue := newDiscoveryQueue(maxLen)
queue.Insert(target, initial...)
for k := 0; k < 10; k++ {
var nodeID storj.NodeID
_, _ = r.Read(nodeID[:])
queue.Insert(target, &pb.Node{Id: nodeID})
}
assert.Equal(t, queue.Len(), maxLen)
previousPriority := storj.NodeID{}
for queue.Len() > 0 {
next := queue.Closest()
priority := xorNodeID(target, next.Id)
// ensure that priority is monotonically increasing
assert.False(t, priority.Less(previousPriority))
}
}
}

View File

@ -1,155 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"container/heap"
"math/big"
"sync"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
// XorQueue is a priority queue where the priority is key XOR distance
type XorQueue struct {
maxLen int
mu sync.Mutex
added map[storj.NodeID]int
items items
}
// NewXorQueue returns a items with priority based on XOR from targetBytes
func NewXorQueue(size int) *XorQueue {
return &XorQueue{
items: make(items, 0, size),
added: make(map[storj.NodeID]int),
maxLen: size,
}
}
// Insert adds Nodes onto the queue
func (x *XorQueue) Insert(target storj.NodeID, nodes []*pb.Node) {
x.mu.Lock()
defer x.mu.Unlock()
unique := nodes[:0]
for _, node := range nodes {
nodeID := node.Id
if _, added := x.added[nodeID]; !added {
x.added[nodeID]++
unique = append(unique, node)
}
}
x.insert(target, unique)
}
// Reinsert adds a Nodes onto the queue if it's been added >= limit times previously
func (x *XorQueue) Reinsert(target storj.NodeID, node *pb.Node, limit int) bool {
x.mu.Lock()
defer x.mu.Unlock()
nodeID := node.Id
if x.added[nodeID] >= limit {
return false
}
x.added[nodeID]++
x.insert(target, []*pb.Node{node})
return true
}
func reverse(b []byte) (r []byte) {
for _, v := range b {
r = append([]byte{v}, r...)
}
return r
}
// insert must hold lock while adding
func (x *XorQueue) insert(target storj.NodeID, nodes []*pb.Node) {
targetBytes := new(big.Int).SetBytes(reverse(target.Bytes()))
// insert new nodes
for _, node := range nodes {
heap.Push(&x.items, &item{
value: node,
priority: new(big.Int).Xor(targetBytes, new(big.Int).SetBytes(reverse(node.Id.Bytes()))),
})
}
// resize down if we grew too big
if x.items.Len() > x.maxLen {
olditems := x.items
x.items = items{}
for i := 0; i < x.maxLen && len(olditems) > 0; i++ {
item := heap.Pop(&olditems)
heap.Push(&x.items, item)
}
heap.Init(&x.items)
}
}
// Closest removes the closest priority node from the queue
func (x *XorQueue) Closest() (*pb.Node, big.Int) {
x.mu.Lock()
defer x.mu.Unlock()
if x.Len() == 0 {
return nil, big.Int{}
}
item := *(heap.Pop(&x.items).(*item))
return item.value, *item.priority
}
// Len returns the number of items in the queue
func (x *XorQueue) Len() int {
return x.items.Len()
}
// An item is something we manage in a priority queue.
type item struct {
value *pb.Node // The value of the item; arbitrary.
priority *big.Int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A items implements heap.Interface and holds items.
type items []*item
// Len returns the length of the priority queue
func (items items) Len() int { return len(items) }
// Less does what you would think
func (items items) Less(i, j int) bool {
// this sorts the nodes where the node popped has the closest location
return items[i].priority.Cmp(items[j].priority) < 0
}
// Swap swaps two ints
func (items items) Swap(i, j int) {
items[i], items[j] = items[j], items[i]
items[i].index = i
items[j].index = j
}
// Push adds an item to the top of the queue
// must call heap.fix to resort
func (items *items) Push(x interface{}) {
n := len(*items)
item := x.(*item)
item.index = n
*items = append(*items, item)
}
// Pop returns the item with the lowest priority
func (items *items) Pop() interface{} {
old := *items
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*items = old[0 : n-1]
return item
}

View File

@ -1,39 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/pb"
)
func TestXorQueue(t *testing.T) {
target := teststorj.NodeIDFromBytes([]byte{1})
testValues := []byte{3, 6, 7, 8} // 0011, 0110, 0111, 1000
expectedPriority := []int{2, 6, 7, 9} // 0010=>2, 0111=>7, 0110=>6, 1001=>9
expectedIds := []byte{3, 7, 6, 8}
nodes := make([]*pb.Node, len(testValues))
for i, v := range testValues {
nodes[i] = &pb.Node{Id: teststorj.NodeIDFromBytes([]byte{v})}
}
// populate queue
pq := NewXorQueue(3)
pq.Insert(target, nodes)
// make sure we remove as many things as the queue should hold
assert.Equal(t, pq.Len(), 3)
for i := 0; pq.Len() > 0; i++ {
node, priority := pq.Closest()
assert.Equal(t, *big.NewInt(int64(expectedPriority[i])), priority)
assert.Equal(t, []byte{expectedIds[i]}, node.Id[:1])
}
// test that reading beyong length returns nil
node, _ := pq.Closest()
assert.Nil(t, node)
}

View File

@ -74,6 +74,18 @@ func (id NodeID) String() string {
// Bytes returns raw bytes of the id
func (id NodeID) Bytes() []byte { return id[:] }
// Less returns whether id is smaller than b in lexiographic order
func (id NodeID) Less(b NodeID) bool {
for k, v := range id {
if v < b[k] {
return true
} else if v > b[k] {
return false
}
}
return false
}
// Difficulty returns the number of trailing zero bits in a node ID
func (id NodeID) Difficulty() (uint16, error) {
idLen := len(id)
@ -140,25 +152,10 @@ func (n NodeIDList) Bytes() (idsBytes [][]byte) {
}
// Len implements sort.Interface.Len()
func (n NodeIDList) Len() int {
return len(n)
}
func (n NodeIDList) Len() int { return len(n) }
// Swap implements sort.Interface.Swap()
func (n NodeIDList) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
func (n NodeIDList) Swap(i, j int) { n[i], n[j] = n[j], n[i] }
// Less implements sort.Interface.Less()
func (n NodeIDList) Less(i, j int) bool {
for k, v := range n[i] {
if v < n[j][k] {
return true
} else if v > n[j][k] {
return false
}
// compare next index
}
// identical nodeIDs
return false
}
func (n NodeIDList) Less(i, j int) bool { return n[i].Less(n[j]) }