Kademlia Sequential Lookup (#464)
parent f87d295e5a
commit e39f9e42b4
@@ -34,7 +34,8 @@ var defaultTransport = pb.NodeTransport_TCP
 var NodeNotFound = NodeErr.New("node not found")

 type lookupOpts struct {
-	amount int
+	amount    int
+	bootstrap bool
 }

 // Kademlia is an implementation of kademlia adhering to the DHT interface.
@@ -124,7 +125,7 @@ func (k *Kademlia) Bootstrap(ctx context.Context) error {
 		return BootstrapErr.New("no bootstrap nodes provided")
 	}

-	return k.lookup(ctx, node.IDFromString(k.routingTable.self.GetId()), lookupOpts{amount: 5})
+	return k.lookup(ctx, node.IDFromString(k.routingTable.self.GetId()), lookupOpts{amount: 5, bootstrap: true})
 }

 func (k *Kademlia) lookup(ctx context.Context, target dht.NodeID, opts lookupOpts) error {
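
Bootstrap runs the self-lookup with bootstrap: true because the target of that lookup is the node's own ID: the XOR distance between identical IDs is zero, and the zero-distance early return added in sequentialLookup.Run (pkg/kademlia/lookup.go, below) would otherwise treat the first zero-distance candidate as the final result and stop before walking the network. A minimal, self-contained sketch of that distance calculation, using made-up 4-byte IDs rather than real node IDs:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Hypothetical 4-byte IDs for illustration; real Kademlia node IDs are much longer.
	target := []byte{0xde, 0xad, 0xbe, 0xef}
	self := []byte{0xde, 0xad, 0xbe, 0xef} // bootstrap looks up its own ID
	other := []byte{0xde, 0xad, 0xbe, 0x00}

	targetInt := new(big.Int).SetBytes(target)

	selfDist := new(big.Int).Xor(targetInt, new(big.Int).SetBytes(self))
	otherDist := new(big.Int).Xor(targetInt, new(big.Int).SetBytes(other))

	// selfDist is 0, which is exactly what the bootstrap flag tells Run to ignore;
	// otherDist is the nonzero XOR metric used to order candidates.
	fmt.Println(selfDist.Sign() == 0, otherDist) // true 239
}
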
@@ -135,20 +136,12 @@ func (k *Kademlia) lookup(ctx context.Context, target dht.NodeID, opts lookupOpts) error {
 		return err
 	}

-	ctx, cf := context.WithCancel(ctx)
-	w := newWorker(ctx, k.routingTable, nodes, k.nodeClient, target, opts.amount)
-	w.SetCancellation(cf)
-
-	wch := make(chan *pb.Node, k.alpha)
-	// kick off go routine to fetch work and send on work channel
-	go w.getWork(ctx, wch)
-	// kick off alpha works to consume from work channel
-	for i := 0; i < k.alpha; i++ {
-		go w.work(ctx, wch)
-	}
-
-	<-ctx.Done()
+	lookup := newSequentialLookup(k.routingTable, nodes, k.nodeClient, target, opts.amount, opts.bootstrap)
+	err = lookup.Run(ctx)
+	if err != nil {
+		zap.L().Warn("lookup failed", zap.Error(err))
+	}

 	return nil
 }
@@ -1,5 +1,6 @@
 // Copyright (C) 2018 Storj Labs, Inc.
 // See LICENSE for copying information.
+// See LICENSE for copying information.

 package kademlia
@@ -31,7 +32,7 @@ func kadconfig() KadConfig {
 // helper function to generate new node identities with
 // correct difficulty and concurrency
 func newTestIdentity() (*provider.FullIdentity, error) {
-	fid, err := node.NewFullIdentity(ctx, 12, 4)
+	fid, err := node.NewFullIdentity(context.Background(), 12, 4)
 	return fid, err
 }

@@ -70,7 +71,7 @@ func TestNewKademlia(t *testing.T) {
 	for _, v := range cases {
 		assert.NoError(t, v.setup())
 		kc := kadconfig()
-		ca, err := provider.NewCA(ctx, 12, 4)
+		ca, err := provider.NewCA(context.Background(), 12, 4)
 		assert.NoError(t, err)
 		identity, err := ca.NewIdentity()
 		assert.NoError(t, err)
pkg/kademlia/lookup.go (new file, 111 lines)
@@ -0,0 +1,111 @@
package kademlia

import (
	"container/heap"
	"context"
	"log"
	"math/big"
	"time"

	"storj.io/storj/pkg/dht"
	"storj.io/storj/pkg/node"
	"storj.io/storj/pkg/pb"
)

type sequentialLookup struct {
	contacted       map[string]bool
	queue           PriorityQueue
	slowestResponse time.Duration
	client          node.Client
	target          dht.NodeID
	limit           int
	bootstrap       bool
}

func newSequentialLookup(rt *RoutingTable, nodes []*pb.Node, client node.Client, target dht.NodeID, limit int, bootstrap bool) *sequentialLookup {
	targetBytes := new(big.Int).SetBytes(target.Bytes())

	var queue PriorityQueue
	{
		for i, node := range nodes {
			bnode := new(big.Int).SetBytes([]byte(node.GetId()))
			queue = append(queue, &Item{
				value:    node,
				priority: new(big.Int).Xor(targetBytes, bnode),
				index:    i,
			})
		}
		heap.Init(&queue)
	}

	return &sequentialLookup{
		contacted:       map[string]bool{},
		queue:           queue,
		slowestResponse: 0,
		client:          client,
		target:          target,
		limit:           limit,
		bootstrap:       bootstrap,
	}
}
func (lookup *sequentialLookup) Run(ctx context.Context) error {
	zero := &big.Int{}
	targetBytes := new(big.Int).SetBytes(lookup.target.Bytes())

	for len(lookup.queue) > 0 {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		item := heap.Pop(&lookup.queue).(*Item)
		if !lookup.bootstrap && item.priority.Cmp(zero) == 0 {
			// found the result
			return nil
		}
		next := item.value

		neighbors := lookup.FetchNeighbors(ctx, next)
		for _, neighbor := range neighbors {
			if lookup.contacted[neighbor.GetId()] {
				continue
			}

			// priority is the XOR distance between the target and this neighbor
			priority := new(big.Int).Xor(targetBytes, new(big.Int).SetBytes([]byte(neighbor.GetId())))
			heap.Push(&lookup.queue, &Item{
				value:    neighbor,
				priority: priority,
			})
		}

		// trim the queue back down to the lookup limit
		for len(lookup.queue) > lookup.limit {
			heap.Pop(&lookup.queue)
		}
	}
	return nil
}
func (lookup *sequentialLookup) FetchNeighbors(ctx context.Context, node *pb.Node) []*pb.Node {
	if node.GetAddress() == nil {
		return nil
	}
	lookup.contacted[node.GetId()] = true

	start := time.Now()
	neighbors, err := lookup.client.Lookup(ctx, *node, pb.Node{Id: lookup.target.String()})
	if err != nil {
		// TODO(coyle): I think we might want to do another lookup on this node or update something
		// but for now let's just log and ignore.
		log.Printf("Error occurred during lookup for %s on %s :: error = %s", lookup.target.String(), node.GetId(), err.Error())
		return []*pb.Node{}
	}

	latency := time.Since(start)
	if latency > lookup.slowestResponse {
		lookup.slowestResponse = latency
	}

	return neighbors
}
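
The new lookup relies on a PriorityQueue of Items keyed by big.Int XOR distance; those types live elsewhere in the kademlia package and are not part of this diff. A rough, self-contained sketch of the ordering Run appears to assume — a container/heap min-heap where Pop yields the candidate closest to the target — using hypothetical xorQueue/xorItem types rather than the package's actual PriorityQueue/Item:

package main

import (
	"container/heap"
	"fmt"
	"math/big"
)

// xorItem pairs a node ID with its XOR distance to the lookup target.
type xorItem struct {
	id       string
	priority *big.Int
	index    int
}

// xorQueue is a min-heap ordered by priority (smallest XOR distance first).
type xorQueue []*xorItem

func (q xorQueue) Len() int            { return len(q) }
func (q xorQueue) Less(i, j int) bool  { return q[i].priority.Cmp(q[j].priority) < 0 }
func (q xorQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i]; q[i].index, q[j].index = i, j }
func (q *xorQueue) Push(x interface{}) { it := x.(*xorItem); it.index = len(*q); *q = append(*q, it) }
func (q *xorQueue) Pop() interface{} {
	old := *q
	it := old[len(old)-1]
	*q = old[:len(old)-1]
	return it
}

func main() {
	target := new(big.Int).SetBytes([]byte("1000"))

	q := xorQueue{}
	for _, id := range []string{"1003", "1001", "1007"} {
		dist := new(big.Int).Xor(target, new(big.Int).SetBytes([]byte(id)))
		heap.Push(&q, &xorItem{id: id, priority: dist})
	}

	// Nodes come off the heap nearest-first, which is the order
	// a sequential lookup would contact them in.
	for q.Len() > 0 {
		fmt.Println(heap.Pop(&q).(*xorItem).id) // 1001, 1003, 1007
	}
}

The real PriorityQueue may differ in details (field names, ordering direction); this sketch only illustrates how the XOR metric turns node IDs into comparable priorities.
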
@@ -20,10 +20,6 @@ import (
 	"storj.io/storj/pkg/provider"
 )

-var (
-	ctx = context.Background()
-)
-
 func TestGetWork(t *testing.T) {
 	cases := []struct {
 		name string
@@ -64,10 +60,17 @@ func TestGetWork(t *testing.T) {
 		} else {
 			assert.Len(t, v.ch, 0)
 		}

 	}
 }

+func TestWorkCancel(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	worker := newWorker(ctx, nil, []*pb.Node{&pb.Node{Id: "1001"}}, nil, node.IDFromString("1000"), 5)
+	// TODO: ensure this also works when running
+	cancel()
+	worker.work(ctx, make(chan *pb.Node))
+}
+
 func TestWorkerLookup(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
@@ -89,7 +92,7 @@ func TestWorkerLookup(t *testing.T) {
 		{
 			name:   "test valid chore returned",
 			worker: func() *worker {
-				ca, err := provider.NewCA(ctx, 12, 4)
+				ca, err := provider.NewCA(context.Background(), 12, 4)
 				assert.NoError(t, err)
 				identity, err := ca.NewIdentity()
 				assert.NoError(t, err)
@@ -135,7 +138,7 @@ func TestUpdate(t *testing.T) {
 		{
 			name:   "test nil nodes",
 			worker: func() *worker {
-				ca, err := provider.NewCA(ctx, 12, 4)
+				ca, err := provider.NewCA(context.Background(), 12, 4)
 				assert.NoError(t, err)
 				identity, err := ca.NewIdentity()
 				assert.NoError(t, err)
@@ -151,7 +154,7 @@
 		{
 			name:   "test combined less than k",
 			worker: func() *worker {
-				ca, err := provider.NewCA(ctx, 12, 4)
+				ca, err := provider.NewCA(context.Background(), 12, 4)
 				assert.NoError(t, err)
 				identity, err := ca.NewIdentity()
 				assert.NoError(t, err)
@@ -180,7 +183,7 @@ func TestUpdate(t *testing.T) {
 }

 func newTestServer(nn []*pb.Node) (*grpc.Server, *mockNodeServer) {
-	ca, err := provider.NewCA(ctx, 12, 4)
+	ca, err := provider.NewCA(context.Background(), 12, 4)
 	if err != nil {
 		return nil, nil
 	}
@@ -4,9 +4,9 @@
 package pdbclient

 import (
-	"strings"
-	"encoding/base64"
 	"context"
+	"encoding/base64"
+	"strings"

 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -153,6 +153,7 @@ func (pdb *PointerDB) Delete(ctx context.Context, path p.Path) (err error) {

 	return err
 }

+// Auth gets signature auth data from last request
 func (pdb *PointerDB) Auth() (*pb.SignatureAuth, error) {
 	signature := pdb.signatureHeader.Get("signature")
@@ -4,10 +4,10 @@
 package pdbclient

 import (
-	"crypto/ecdsa"
-	"crypto/x509"
 	"context"
+	"crypto/ecdsa"
 	"crypto/tls"
+	"crypto/x509"
 	"encoding/base64"
 	"errors"
 	"fmt"
@@ -15,10 +15,10 @@ import (
 	"strings"
 	"testing"

-	"github.com/gtank/cryptopasta"
 	"github.com/golang/mock/gomock"
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
+	"github.com/gtank/cryptopasta"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/metadata"
@@ -308,8 +308,8 @@ func TestAuth(t *testing.T) {
 	assert.NoError(t, err)
 	identity, err := ca.NewIdentity()
 	assert.NoError(t, err)

-	peerCertificates := make([]*x509.Certificate, 2);
+	peerCertificates := make([]*x509.Certificate, 2)
 	peerCertificates[0] = identity.Leaf
 	peerCertificates[1] = identity.CA

@@ -321,7 +321,7 @@
 	peer := &peer.Peer{AuthInfo: info}
 	pointerdb := &PointerDB{
 		signatureHeader: &header,
-		peer: peer,
+		peer:            peer,
 	}

 	auth, _ := pointerdb.Auth()
@@ -4,8 +4,8 @@
 package pointerdb

 import (
-	"encoding/base64"
 	"context"
+	"encoding/base64"

 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"