PriorityQueue refactor (#500)

* PriorityQueue refactor

* changed closest to return big.Int

* test smaller queue, return nil on empty
This commit is contained in:
Bill Thorp 2018-10-24 08:24:47 -04:00 committed by GitHub
parent 2bb2c50d79
commit 842ebc9546
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 133 additions and 170 deletions

View File

@ -4,10 +4,9 @@
package kademlia
import (
"container/heap"
"bytes"
"context"
"log"
"math/big"
"time"
"storj.io/storj/pkg/dht"
@ -17,7 +16,7 @@ import (
type sequentialLookup struct {
contacted map[string]bool
queue PriorityQueue
queue *XorQueue
slowestResponse time.Duration
client node.Client
target dht.NodeID
@ -26,20 +25,8 @@ type sequentialLookup struct {
}
func newSequentialLookup(rt *RoutingTable, nodes []*pb.Node, client node.Client, target dht.NodeID, limit int, bootstrap bool) *sequentialLookup {
targetBytes := new(big.Int).SetBytes(target.Bytes())
var queue PriorityQueue
{
for i, node := range nodes {
bnode := new(big.Int).SetBytes([]byte(node.GetId()))
queue = append(queue, &Item{
value: node,
priority: new(big.Int).Xor(targetBytes, bnode),
index: i,
})
}
heap.Init(&queue)
}
queue := NewXorQueue(limit)
queue.Insert(target, nodes)
return &sequentialLookup{
contacted: map[string]bool{},
@ -53,38 +40,29 @@ func newSequentialLookup(rt *RoutingTable, nodes []*pb.Node, client node.Client,
}
func (lookup *sequentialLookup) Run(ctx context.Context) error {
zero := &big.Int{}
targetBytes := new(big.Int).SetBytes(lookup.target.Bytes())
for len(lookup.queue) > 0 {
for lookup.queue.Len() > 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
item := heap.Pop(&lookup.queue).(*Item)
if !lookup.bootstrap && item.priority.Cmp(zero) == 0 {
// found the result
return nil
next, priority := lookup.queue.Closest()
if !lookup.bootstrap && bytes.Equal(priority.Bytes(), make([]byte, len(priority.Bytes()))) {
return nil // found the result
}
next := item.value
uncontactedNeighbors := []*pb.Node{}
neighbors := lookup.FetchNeighbors(ctx, next)
for _, neighbor := range neighbors {
if lookup.contacted[neighbor.GetId()] {
continue
if !lookup.contacted[neighbor.GetId()] {
uncontactedNeighbors = append(uncontactedNeighbors, neighbor)
}
priority := new(big.Int).Xor(targetBytes, new(big.Int).SetBytes(lookup.target.Bytes()))
heap.Push(&lookup.queue, &Item{
value: neighbor,
priority: priority,
})
}
lookup.queue.Insert(lookup.target, uncontactedNeighbors)
for len(lookup.queue) > lookup.limit {
heap.Pop(&lookup.queue)
for lookup.queue.Len() > lookup.limit {
lookup.queue.Closest()
}
}
return nil

View File

@ -4,57 +4,102 @@
package kademlia
import (
"container/heap"
"math/big"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/pb"
)
// An Item is something we manage in a priority queue.
type Item struct {
//XorQueue is a priority queue where the priority is key XOR distance
type XorQueue struct {
maxLen int
items items
}
//NewXorQueue returns a items with priority based on XOR from targetBytes
func NewXorQueue(size int) *XorQueue {
return &XorQueue{items: make(items, 0, size), maxLen: size}
}
//Insert adds Node onto the queue
func (x *XorQueue) Insert(target dht.NodeID, nodes []*pb.Node) {
targetBytes := new(big.Int).SetBytes(target.Bytes())
//insert new nodes
for _, node := range nodes {
heap.Push(&x.items, &item{
value: node,
priority: new(big.Int).Xor(targetBytes, new(big.Int).SetBytes([]byte(node.GetId()))),
})
}
//resize down if we grew too big
if x.items.Len() > x.maxLen {
olditems := x.items
x.items = items{}
for i := 0; i < x.maxLen && len(olditems) > 0; i++ {
item := heap.Pop(&olditems)
heap.Push(&x.items, item)
}
heap.Init(&x.items)
}
}
//Closest removed the closest priority node from the queue
func (x *XorQueue) Closest() (*pb.Node, big.Int) {
if x.Len() == 0 {
return nil, big.Int{}
}
item := *(heap.Pop(&x.items).(*item))
return item.value, *item.priority
}
//Len returns the number of items in the queue
func (x *XorQueue) Len() int {
return x.items.Len()
}
// An item is something we manage in a priority queue.
type item struct {
value *pb.Node // The value of the item; arbitrary.
priority *big.Int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*Item
// A items implements heap.Interface and holds items.
type items []*item
// Len returns the length of the priority queue
func (pq PriorityQueue) Len() int { return len(pq) }
func (items items) Len() int { return len(items) }
// Less does what you would think
func (pq PriorityQueue) Less(i, j int) bool {
func (items items) Less(i, j int) bool {
// this sorts the nodes where the node popped has the closest location
if i := pq[i].priority.Cmp(pq[j].priority); i < 0 {
return true
}
return false
return items[i].priority.Cmp(items[j].priority) < 0
}
// Swap swaps two ints
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
func (items items) Swap(i, j int) {
items[i], items[j] = items[j], items[i]
items[i].index = i
items[j].index = j
}
// Push adds an item to the top of the queue
// must call heap.fix to resort
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
func (items *items) Push(x interface{}) {
n := len(*items)
item := x.(*item)
item.index = n
*pq = append(*pq, item)
*items = append(*items, item)
}
// Pop returns the item with the lowest priority
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
func (items *items) Pop() interface{} {
old := *items
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*pq = old[0 : n-1]
*items = old[0 : n-1]
return item
}

View File

@ -4,60 +4,46 @@
package kademlia
import (
"container/heap"
"math/big"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
)
func TestPriorityQueue(t *testing.T) {
cases := []struct {
target *big.Int
nodes map[string]*pb.Node
pq PriorityQueue
expected []int
}{
{
target: func() *big.Int {
i, ok := new(big.Int).SetString("0001", 2)
assert.True(t, ok)
return i
}(),
nodes: map[string]*pb.Node{
"1001": &pb.Node{Id: "1001"},
"0100": &pb.Node{Id: "0100"},
"1100": &pb.Node{Id: "1100"},
"0010": &pb.Node{Id: "0010"},
},
pq: make(PriorityQueue, 4),
expected: []int{3, 5, 8, 13},
},
//BinStr turns a string like '110001' into a string like 'a'
func BinStr(s string) string {
b := []byte(strings.Repeat("0", 8-len(s)%8) + s)
a := make([]byte, len(b)/8)
for i := 0; i < len(b); i++ {
a[i/8] |= ((b[i] - '0') << uint(7-i%8))
}
for _, v := range cases {
i := 0
for id, value := range v.nodes {
bn, ok := new(big.Int).SetString(id, 2)
assert.True(t, ok)
v.pq[i] = &Item{
value: value,
priority: new(big.Int).Xor(v.target, bn),
index: i,
}
i++
}
heap.Init(&v.pq)
i = 0
for v.pq.Len() > 0 {
item := heap.Pop(&v.pq).(*Item)
assert.Equal(t, big.NewInt(int64(v.expected[i])), item.priority)
i++
}
}
return string(a)
}
func TestXorQueue(t *testing.T) {
target := node.ID(BinStr("0001"))
testValues := []string{"0011", "0110", "0111", "1000"} //0011, 0110, 0111, 1000
expectedPriority := []int{2, 6, 7, 9} // 0010=>2, 0111=>7, 0110=>6, 1001=>9
expectedIds := []string{"0011", "0111", "0110", "1000"}
nodes := make([]*pb.Node, len(testValues))
for i, value := range testValues {
nodes[i] = &pb.Node{Id: BinStr(value)}
}
//populate queue
pq := NewXorQueue(3)
pq.Insert(&target, nodes)
//make sure we remove as many things as the queue should hold
assert.Equal(t, pq.Len(), 3)
for i := 0; pq.Len() > 0; i++ {
node, priority := pq.Closest()
assert.Equal(t, *big.NewInt(int64(expectedPriority[i])), priority)
assert.Equal(t, BinStr(expectedIds[i]), node.Id)
}
//test that reading beyong length returns nil
node, _ := pq.Closest()
assert.Nil(t, node)
}

View File

@ -4,15 +4,12 @@
package kademlia
import (
"container/heap"
"context"
"log"
"math/big"
"sync"
"time"
"github.com/zeebo/errs"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
@ -30,7 +27,7 @@ var (
// worker pops work off a priority queue and does lookups on the work received
type worker struct {
contacted map[string]bool
pq PriorityQueue
pq *XorQueue
mu *sync.Mutex
maxResponse time.Duration
cancel context.CancelFunc
@ -41,24 +38,8 @@ type worker struct {
}
func newWorker(ctx context.Context, rt *RoutingTable, nodes []*pb.Node, nc node.Client, target dht.NodeID, k int) *worker {
t := new(big.Int).SetBytes(target.Bytes())
pq := func(nodes []*pb.Node) PriorityQueue {
pq := make(PriorityQueue, len(nodes))
for i, node := range nodes {
bnode := new(big.Int).SetBytes([]byte(node.GetId()))
pq[i] = &Item{
value: node,
priority: new(big.Int).Xor(t, bnode),
index: i,
}
}
heap.Init(&pq)
return pq
}(nodes)
pq := NewXorQueue(k)
pq.Insert(target, nodes)
return &worker{
contacted: map[string]bool{},
pq: pq,
@ -123,7 +104,8 @@ func (w *worker) getWork(ctx context.Context, ch chan *pb.Node) {
}
w.workInProgress++
ch <- w.pq.Pop().(*Item).value
node, _ := w.pq.Closest()
ch <- node
w.mu.Unlock()
}
@ -157,44 +139,17 @@ func (w *worker) lookup(ctx context.Context, node *pb.Node) []*pb.Node {
}
func (w *worker) update(nodes []*pb.Node) {
t := new(big.Int).SetBytes(w.find.Bytes())
w.mu.Lock()
defer w.mu.Unlock()
uncontactedNodes := []*pb.Node{}
for _, v := range nodes {
// if we have already done a lookup on this node we don't want to do it again for this lookup loop
if w.contacted[v.GetId()] {
continue
}
heap.Push(&w.pq, &Item{
value: v,
priority: new(big.Int).Xor(t, new(big.Int).SetBytes(w.find.Bytes())),
})
}
// reinitialize heap
heap.Init(&w.pq)
// only keep the k closest nodes
if len(w.pq) <= w.k {
w.workInProgress--
return
}
pq := PriorityQueue{}
for i := 0; i < w.k; i++ {
if len(w.pq) > 0 {
item := heap.Pop(&w.pq)
heap.Push(&pq, item)
if !w.contacted[v.GetId()] {
uncontactedNodes = append(uncontactedNodes, v)
}
}
// reinitialize heap
heap.Init(&pq)
// set w.pq to the new pq with the k closest nodes
w.pq = pq
w.pq.Insert(w.find, uncontactedNodes)
w.workInProgress--
}

View File

@ -38,8 +38,8 @@ func TestGetWork(t *testing.T) {
worker: func() *worker {
w := newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: "foo"}}, nil, node.IDFromString("foo"), 5)
w.maxResponse = 0
w.pq.Pop()
assert.Len(t, w.pq, 0)
w.pq.Closest()
assert.Equal(t, w.pq.Len(), 0)
return w
}(),
expected: nil,
@ -158,25 +158,24 @@ func TestUpdate(t *testing.T) {
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
nc, err := node.NewNodeClient(identity, pb.Node{Id: "foo", Address: &pb.NodeAddress{Address: ":7070"}}, mockDHT)
nc, err := node.NewNodeClient(identity, pb.Node{Id: "a", Address: &pb.NodeAddress{Address: ":7070"}}, mockDHT)
assert.NoError(t, err)
return newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: "0001"}}, nc, node.IDFromString("1100"), 2)
return newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: "h"}}, nc, node.IDFromString("a"), 2)
}(),
expectedQueueLength: 2,
expected: []*pb.Node{&pb.Node{Id: "0100"}, &pb.Node{Id: "1001"}},
input: []*pb.Node{&pb.Node{Id: "1001"}, &pb.Node{Id: "0100"}},
expected: []*pb.Node{&pb.Node{Id: "g"}, &pb.Node{Id: "f"}},
input: []*pb.Node{&pb.Node{Id: "f"}, &pb.Node{Id: "g"}},
expectedErr: nil,
},
}
for _, v := range cases {
v.worker.update(v.input)
assert.Len(t, v.worker.pq, v.expectedQueueLength)
assert.Equal(t, v.expectedQueueLength, v.worker.pq.Len())
i := 0
for v.worker.pq.Len() > 0 {
assert.Equal(t, v.expected[i], v.worker.pq.Pop().(*Item).value)
node, _ := v.worker.pq.Closest()
assert.Equal(t, v.expected[i], node)
i++
}
}