From ae790dfd9f8ae649424c302048087a17c1365a1b Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 4 Dec 2018 17:46:53 +0200 Subject: [PATCH] Fix queue implementation (#758) --- pkg/kademlia/peer_discovery.go | 141 ++++++++++++++++++++++--- pkg/kademlia/peer_discovery_test.go | 100 ++++++++++++++++++ pkg/kademlia/queue.go | 155 ---------------------------- pkg/kademlia/queue_test.go | 39 ------- pkg/storj/node.go | 33 +++--- 5 files changed, 242 insertions(+), 226 deletions(-) create mode 100644 pkg/kademlia/peer_discovery_test.go delete mode 100644 pkg/kademlia/queue.go delete mode 100644 pkg/kademlia/queue_test.go diff --git a/pkg/kademlia/peer_discovery.go b/pkg/kademlia/peer_discovery.go index 69cee1338..b89c64547 100644 --- a/pkg/kademlia/peer_discovery.go +++ b/pkg/kademlia/peer_discovery.go @@ -5,6 +5,7 @@ package kademlia import ( "context" + "sort" "sync" "github.com/zeebo/errs" @@ -21,27 +22,30 @@ type peerDiscovery struct { opts discoveryOptions cond sync.Cond - queue *XorQueue + queue discoveryQueue } // ErrMaxRetries is used when a lookup has been retried the max number of times var ErrMaxRetries = errs.Class("max retries exceeded for id:") func newPeerDiscovery(nodes []*pb.Node, client node.Client, target storj.NodeID, opts discoveryOptions) *peerDiscovery { - queue := NewXorQueue(opts.concurrency) - queue.Insert(target, nodes) - - return &peerDiscovery{ + discovery := &peerDiscovery{ client: client, target: target, opts: opts, - - cond: sync.Cond{L: &sync.Mutex{}}, - queue: queue, + cond: sync.Cond{L: &sync.Mutex{}}, + queue: *newDiscoveryQueue(opts.concurrency), } + + discovery.queue.Insert(target, nodes...) + return discovery } func (lookup *peerDiscovery) Run(ctx context.Context) error { + if lookup.queue.Len() == 0 { + return nil // TODO: should we return an error here? 
+ } + wg := sync.WaitGroup{} // protected by `lookup.cond.L` @@ -53,9 +57,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error { go func() { defer wg.Done() for { - var ( - next *pb.Node - ) + var next *pb.Node lookup.cond.L.Lock() for { @@ -65,7 +67,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error { return } - next, _ = lookup.queue.Closest() + next = lookup.queue.Closest() if !lookup.opts.bootstrap && next.Id == lookup.target { allDone = true break // closest node is the target and is already in routing table (i.e. no lookup required) @@ -83,7 +85,9 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error { neighbors, err := lookup.client.Lookup(ctx, *next, pb.Node{Id: lookup.target}) if err != nil { - ok := lookup.queue.Reinsert(lookup.target, next, lookup.opts.retries) + // TODO: reenable retry after fixing logic + // ok := lookup.queue.Reinsert(lookup.target, next, lookup.opts.retries) + ok := false if !ok { zap.S().Errorf( "Error occurred during lookup of %s :: %s :: error = %s", @@ -94,7 +98,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) error { } } - lookup.queue.Insert(lookup.target, neighbors) + lookup.queue.Insert(lookup.target, neighbors...) lookup.cond.L.Lock() working-- @@ -117,3 +121,112 @@ func isDone(ctx context.Context) bool { return false } } + +// discoveryQueue is a limited priority queue for nodes with xor distance +type discoveryQueue struct { + maxLen int + mu sync.Mutex + added map[storj.NodeID]int + items []queueItem +} + +// queueItem is node with a priority +type queueItem struct { + node *pb.Node + priority storj.NodeID +} + +// newDiscoveryQueue returns a items with priority based on XOR from targetBytes +func newDiscoveryQueue(size int) *discoveryQueue { + return &discoveryQueue{ + added: make(map[storj.NodeID]int), + maxLen: size, + } +} + +// Insert adds nodes into the queue. 
+func (queue *discoveryQueue) Insert(target storj.NodeID, nodes ...*pb.Node) { + queue.mu.Lock() + defer queue.mu.Unlock() + + unique := nodes[:0] + for _, node := range nodes { + if _, added := queue.added[node.Id]; added { + continue + } + unique = append(unique, node) + } + + queue.insert(target, unique...) + + // update counts for the new items that are in the queue + for _, item := range queue.items { + if _, added := queue.added[item.node.Id]; !added { + queue.added[item.node.Id] = 1 + } + } +} + +// Reinsert adds a node into the queue only if it has been added fewer than limit times. +func (queue *discoveryQueue) Reinsert(target storj.NodeID, node *pb.Node, limit int) bool { + queue.mu.Lock() + defer queue.mu.Unlock() + + nodeID := node.Id + if queue.added[nodeID] >= limit { + return false + } + queue.added[nodeID]++ + + queue.insert(target, node) + return true +} + +// insert adds nodes sorted by xor distance and trims to maxLen; the caller must hold the lock +func (queue *discoveryQueue) insert(target storj.NodeID, nodes ...*pb.Node) { + for _, node := range nodes { + queue.items = append(queue.items, queueItem{ + node: node, + priority: xorNodeID(target, node.Id), + }) + } + + sort.Slice(queue.items, func(i, k int) bool { + return queue.items[i].priority.Less(queue.items[k].priority) + }) + + if len(queue.items) > queue.maxLen { + queue.items = queue.items[:queue.maxLen] + } +} + +// Closest removes and returns the closest item in the queue, or nil if the queue is empty +func (queue *discoveryQueue) Closest() *pb.Node { + queue.mu.Lock() + defer queue.mu.Unlock() + + if len(queue.items) == 0 { + return nil + } + + var item queueItem + item, queue.items = queue.items[0], queue.items[1:] + return item.node +} + +// Len returns the number of items in the queue +func (queue *discoveryQueue) Len() int { + queue.mu.Lock() + defer queue.mu.Unlock() + + return len(queue.items) +} + +// xorNodeID returns the xor of each byte in NodeID +func xorNodeID(a, b storj.NodeID) storj.NodeID { + r := storj.NodeID{} + for i, av := range a { + r[i] = av ^ b[i] + } + 
return r +} diff --git a/pkg/kademlia/peer_discovery_test.go b/pkg/kademlia/peer_discovery_test.go new file mode 100644 index 000000000..73952ba90 --- /dev/null +++ b/pkg/kademlia/peer_discovery_test.go @@ -0,0 +1,100 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package kademlia + +import ( + "math/rand" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" +) + +func TestDiscoveryQueue(t *testing.T) { + target := storj.NodeID{1, 1} // 00000001 + + // // id -> id ^ target + nodeA := &pb.Node{Id: storj.NodeID{3, 2}} // 00000011:00000010 -> 00000010:00000011 + nodeB := &pb.Node{Id: storj.NodeID{6, 5}} // 00000110:00000101 -> 00000111:00000100 + nodeC := &pb.Node{Id: storj.NodeID{7, 7}} // 00000111:00000111 -> 00000110:00000110 + nodeD := &pb.Node{Id: storj.NodeID{8, 4}} // 00001000:00000100 -> 00001001:00000101 + nodeE := &pb.Node{Id: storj.NodeID{12, 1}} // 00001100:00000001 -> 00001101:00000000 + nodeF := &pb.Node{Id: storj.NodeID{15, 16}} // 00001111:00010000 -> 00001110:00010001 + nodeG := &pb.Node{Id: storj.NodeID{18, 74}} // 00010010:01001010 -> 00010011:01001011 + nodeH := &pb.Node{Id: storj.NodeID{25, 61}} // 00011001:00111101 -> 00011000:00111100 + + nodes := []*pb.Node{nodeA, nodeB, nodeC, nodeD, nodeE, nodeF, nodeG, nodeH} + + expected := []*pb.Node{ + nodeA, // 00000011:00000010 -> 00000010:00000011 + nodeC, // 00000111:00000111 -> 00000110:00000110 + nodeB, // 00000110:00000101 -> 00000111:00000100 + nodeD, // 00001000:00000100 -> 00001001:00000101 + nodeE, // 00001100:00000001 -> 00001101:00000000 + nodeF, // 00001111:00010000 -> 00001110:00010001 + // nodeG, // 00010010:01001010 -> 00010011:01001011 + // nodeH, // 00011001:00111101 -> 00011000:00111100 + } + + // // code for outputting the bits above + // for _, node := range nodes { + // xor := xorNodeID(target, node.Id) + // t.Logf("%08b,%08b -> %08b,%08b", node.Id[0], node.Id[1], xor[0], 
xor[1]) + // } + + queue := newDiscoveryQueue(6) + queue.Insert(target, nodes...) + + assert.Equal(t, queue.Len(), 6) + + for i, expect := range expected { + node := queue.Closest() + assert.Equal(t, node.Id, expect.Id, strconv.Itoa(i)) + } + + assert.Nil(t, queue.Closest()) +} + +func TestDiscoveryQueueRandom(t *testing.T) { + const maxLen = 8 + + seed := int64(rand.Uint64()) + t.Logf("seed %v", seed) + + r := rand.New(rand.NewSource(seed)) + + for i := 0; i < 100; i++ { + var target storj.NodeID + _, _ = r.Read(target[:]) + + var initial []*pb.Node + for k := 0; k < 10; k++ { + var nodeID storj.NodeID + _, _ = r.Read(nodeID[:]) + initial = append(initial, &pb.Node{Id: nodeID}) + } + + queue := newDiscoveryQueue(maxLen) + queue.Insert(target, initial...) + + for k := 0; k < 10; k++ { + var nodeID storj.NodeID + _, _ = r.Read(nodeID[:]) + queue.Insert(target, &pb.Node{Id: nodeID}) + } + + assert.Equal(t, queue.Len(), maxLen) + + previousPriority := storj.NodeID{} + for queue.Len() > 0 { + next := queue.Closest() + priority := xorNodeID(target, next.Id) + // ensure that priority is monotonically increasing + assert.False(t, priority.Less(previousPriority)) + } + } +} diff --git a/pkg/kademlia/queue.go b/pkg/kademlia/queue.go deleted file mode 100644 index 8de0f528e..000000000 --- a/pkg/kademlia/queue.go +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. 
- -package kademlia - -import ( - "container/heap" - "math/big" - "sync" - - "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/storj" -) - -// XorQueue is a priority queue where the priority is key XOR distance -type XorQueue struct { - maxLen int - - mu sync.Mutex - added map[storj.NodeID]int - items items -} - -// NewXorQueue returns a items with priority based on XOR from targetBytes -func NewXorQueue(size int) *XorQueue { - return &XorQueue{ - items: make(items, 0, size), - added: make(map[storj.NodeID]int), - maxLen: size, - } -} - -// Insert adds Nodes onto the queue -func (x *XorQueue) Insert(target storj.NodeID, nodes []*pb.Node) { - x.mu.Lock() - defer x.mu.Unlock() - - unique := nodes[:0] - for _, node := range nodes { - nodeID := node.Id - if _, added := x.added[nodeID]; !added { - x.added[nodeID]++ - unique = append(unique, node) - } - } - - x.insert(target, unique) -} - -// Reinsert adds a Nodes onto the queue if it's been added >= limit times previously -func (x *XorQueue) Reinsert(target storj.NodeID, node *pb.Node, limit int) bool { - x.mu.Lock() - defer x.mu.Unlock() - - nodeID := node.Id - if x.added[nodeID] >= limit { - return false - } - x.added[nodeID]++ - - x.insert(target, []*pb.Node{node}) - return true -} - -func reverse(b []byte) (r []byte) { - for _, v := range b { - r = append([]byte{v}, r...) 
- } - return r -} - -// insert must hold lock while adding -func (x *XorQueue) insert(target storj.NodeID, nodes []*pb.Node) { - targetBytes := new(big.Int).SetBytes(reverse(target.Bytes())) - // insert new nodes - for _, node := range nodes { - heap.Push(&x.items, &item{ - value: node, - priority: new(big.Int).Xor(targetBytes, new(big.Int).SetBytes(reverse(node.Id.Bytes()))), - }) - } - // resize down if we grew too big - if x.items.Len() > x.maxLen { - olditems := x.items - x.items = items{} - for i := 0; i < x.maxLen && len(olditems) > 0; i++ { - item := heap.Pop(&olditems) - heap.Push(&x.items, item) - } - heap.Init(&x.items) - } -} - -// Closest removes the closest priority node from the queue -func (x *XorQueue) Closest() (*pb.Node, big.Int) { - x.mu.Lock() - defer x.mu.Unlock() - - if x.Len() == 0 { - return nil, big.Int{} - } - item := *(heap.Pop(&x.items).(*item)) - return item.value, *item.priority -} - -// Len returns the number of items in the queue -func (x *XorQueue) Len() int { - return x.items.Len() -} - -// An item is something we manage in a priority queue. -type item struct { - value *pb.Node // The value of the item; arbitrary. - priority *big.Int // The priority of the item in the queue. - // The index is needed by update and is maintained by the heap.Interface methods. - index int // The index of the item in the heap. -} - -// A items implements heap.Interface and holds items. 
-type items []*item - -// Len returns the length of the priority queue -func (items items) Len() int { return len(items) } - -// Less does what you would think -func (items items) Less(i, j int) bool { - // this sorts the nodes where the node popped has the closest location - return items[i].priority.Cmp(items[j].priority) < 0 -} - -// Swap swaps two ints -func (items items) Swap(i, j int) { - items[i], items[j] = items[j], items[i] - items[i].index = i - items[j].index = j -} - -// Push adds an item to the top of the queue -// must call heap.fix to resort -func (items *items) Push(x interface{}) { - n := len(*items) - item := x.(*item) - item.index = n - *items = append(*items, item) -} - -// Pop returns the item with the lowest priority -func (items *items) Pop() interface{} { - old := *items - n := len(old) - item := old[n-1] - item.index = -1 // for safety - *items = old[0 : n-1] - return item -} diff --git a/pkg/kademlia/queue_test.go b/pkg/kademlia/queue_test.go deleted file mode 100644 index 3103c908e..000000000 --- a/pkg/kademlia/queue_test.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. 
- -package kademlia - -import ( - "math/big" - "testing" - - "github.com/stretchr/testify/assert" - - "storj.io/storj/internal/teststorj" - "storj.io/storj/pkg/pb" -) - -func TestXorQueue(t *testing.T) { - target := teststorj.NodeIDFromBytes([]byte{1}) - testValues := []byte{3, 6, 7, 8} // 0011, 0110, 0111, 1000 - expectedPriority := []int{2, 6, 7, 9} // 0010=>2, 0111=>7, 0110=>6, 1001=>9 - expectedIds := []byte{3, 7, 6, 8} - - nodes := make([]*pb.Node, len(testValues)) - for i, v := range testValues { - nodes[i] = &pb.Node{Id: teststorj.NodeIDFromBytes([]byte{v})} - } - // populate queue - pq := NewXorQueue(3) - pq.Insert(target, nodes) - // make sure we remove as many things as the queue should hold - assert.Equal(t, pq.Len(), 3) - for i := 0; pq.Len() > 0; i++ { - node, priority := pq.Closest() - assert.Equal(t, *big.NewInt(int64(expectedPriority[i])), priority) - assert.Equal(t, []byte{expectedIds[i]}, node.Id[:1]) - } - // test that reading beyong length returns nil - node, _ := pq.Closest() - assert.Nil(t, node) -} diff --git a/pkg/storj/node.go b/pkg/storj/node.go index f06b93a85..a9fa74805 100644 --- a/pkg/storj/node.go +++ b/pkg/storj/node.go @@ -74,6 +74,18 @@ func (id NodeID) String() string { // Bytes returns raw bytes of the id func (id NodeID) Bytes() []byte { return id[:] } +// Less returns whether id is smaller than b in lexicographic order +func (id NodeID) Less(b NodeID) bool { + for k, v := range id { + if v < b[k] { + return true + } else if v > b[k] { + return false + } + } + return false +} + // Difficulty returns the number of trailing zero bits in a node ID func (id NodeID) Difficulty() (uint16, error) { idLen := len(id) @@ -140,25 +152,10 @@ func (n NodeIDList) Bytes() (idsBytes [][]byte) { } // Len implements sort.Interface.Len() -func (n NodeIDList) Len() int { - return len(n) -} +func (n NodeIDList) Len() int { return len(n) } // Swap implements sort.Interface.Swap() -func (n NodeIDList) Swap(i, j int) { - n[i], n[j] = n[j], n[i] -} +func 
(n NodeIDList) Swap(i, j int) { n[i], n[j] = n[j], n[i] } // Less implements sort.Interface.Less() -func (n NodeIDList) Less(i, j int) bool { - for k, v := range n[i] { - if v < n[j][k] { - return true - } else if v > n[j][k] { - return false - } - // compare next index - } - // identical nodeIDs - return false -} +func (n NodeIDList) Less(i, j int) bool { return n[i].Less(n[j]) }