From b5447c66086c5c4a298e9ba511b88b56c33c3934 Mon Sep 17 00:00:00 2001 From: Jennifer Li Johnson Date: Fri, 22 Feb 2019 13:39:29 -0500 Subject: [PATCH] Routing table tests (#1270) This PR includes a new package called testrouting, which implements a very algorithmically slow but hopefully easy-to-keep-operationally-correct in-memory routing table. The routing table also supports writing out its current structure as a DOT graph for visualization. testrouting is primarily meant to help in coming up with generic routing table integration tests. This PR also includes a new routing table integration test suite that runs against all current routing table implementations. Our existing routing table passes a lot of the tests, but not all of them; I am still debugging why. I have, however, confirmed with the visualization graphs that the tests should pass. --- pkg/dht/dht.go | 19 + pkg/kademlia/dialer_test.go | 2 - pkg/kademlia/kademlia_test.go | 7 +- pkg/kademlia/replacement_cache.go | 16 + pkg/kademlia/replacement_cache_test.go | 29 +- pkg/kademlia/routing.go | 7 +- pkg/kademlia/routing_helpers.go | 26 +- pkg/kademlia/routing_helpers_test.go | 144 ++-- .../routing_integration_helpers_test.go | 86 +++ pkg/kademlia/routing_integration_test.go | 615 ++++++++++++++++++ pkg/kademlia/routing_test.go | 67 +- pkg/kademlia/testrouting/testrouting.go | 308 +++++++++ pkg/kademlia/testrouting/utils.go | 65 ++ pkg/kademlia/testrouting/viz.go | 48 ++ pkg/pb/utils.go | 12 + 15 files changed, 1368 insertions(+), 83 deletions(-) create mode 100644 pkg/kademlia/routing_integration_helpers_test.go create mode 100644 pkg/kademlia/routing_integration_test.go create mode 100644 pkg/kademlia/testrouting/testrouting.go create mode 100644 pkg/kademlia/testrouting/utils.go create mode 100644 pkg/kademlia/testrouting/viz.go diff --git a/pkg/dht/dht.go b/pkg/dht/dht.go index c65c6f2eb..ba57b2cb3 100644 --- a/pkg/dht/dht.go +++ b/pkg/dht/dht.go @@ -5,9 +5,11 @@ package dht import ( "context" + "time" 
"storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" + "storj.io/storj/storage" ) // DHT is the interface for the DHT in the Storj network @@ -19,6 +21,23 @@ type DHT interface { Seen() []*pb.Node } +// RoutingTable contains information on nodes we have locally +type RoutingTable interface { + // local params + Local() pb.Node + K() int + CacheSize() int + GetBucketIds() (storage.Keys, error) + FindNear(id storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) + ConnectionSuccess(node *pb.Node) error + ConnectionFailed(node *pb.Node) error + // these are for refreshing + SetBucketTimestamp(id []byte, now time.Time) error + GetBucketTimestamp(id []byte) (time.Time, error) + + Close() error +} + // Bucket is a set of methods to act on kademlia k buckets type Bucket interface { Routing() []pb.Node diff --git a/pkg/kademlia/dialer_test.go b/pkg/kademlia/dialer_test.go index 4c7258157..dea50cddb 100644 --- a/pkg/kademlia/dialer_test.go +++ b/pkg/kademlia/dialer_test.go @@ -77,7 +77,6 @@ func TestDialer(t *testing.T) { if len(results) != expectedKademliaEntries { return errs.Combine(errTag, fmt.Errorf("expected %d got %d: %s", expectedKademliaEntries, len(results), pb.NodesToIDs(results))) } - return nil } return nil @@ -115,7 +114,6 @@ func TestDialer(t *testing.T) { if len(results) != expectedKademliaEntries { return errs.Combine(errTag, fmt.Errorf("expected %d got %d: %s", expectedKademliaEntries, len(results), pb.NodesToIDs(results))) } - return nil }) } diff --git a/pkg/kademlia/kademlia_test.go b/pkg/kademlia/kademlia_test.go index a4cff1a59..5dc7b17d1 100644 --- a/pkg/kademlia/kademlia_test.go +++ b/pkg/kademlia/kademlia_test.go @@ -130,18 +130,18 @@ func TestBootstrap(t *testing.T) { bn, s, clean := testNode(ctx, "1", t, []pb.Node{}) defer clean() - defer s.Stop() + defer s.GracefulStop() n1, s1, clean1 := testNode(ctx, "2", t, []pb.Node{bn.routingTable.self}) defer clean1() - defer s1.Stop() + defer s1.GracefulStop() err := n1.Bootstrap(ctx) 
assert.NoError(t, err) n2, s2, clean2 := testNode(ctx, "3", t, []pb.Node{bn.routingTable.self}) defer clean2() - defer s2.Stop() + defer s2.GracefulStop() err = n2.Bootstrap(ctx) assert.NoError(t, err) @@ -213,6 +213,7 @@ func TestRefresh(t *testing.T) { ts2, err := rt.GetBucketTimestamp(bID[:]) assert.NoError(t, err) assert.True(t, ts1.Equal(ts2)) + s.GracefulStop() } func TestFindNear(t *testing.T) { diff --git a/pkg/kademlia/replacement_cache.go b/pkg/kademlia/replacement_cache.go index 54e2fe85a..3c088aeb6 100644 --- a/pkg/kademlia/replacement_cache.go +++ b/pkg/kademlia/replacement_cache.go @@ -8,11 +8,27 @@ import ( ) func (rt *RoutingTable) addToReplacementCache(kadBucketID bucketID, node *pb.Node) { + rt.rcMutex.Lock() + defer rt.rcMutex.Unlock() nodes := rt.replacementCache[kadBucketID] nodes = append(nodes, node) + if len(nodes) > rt.rcBucketSize { copy(nodes, nodes[1:]) nodes = nodes[:len(nodes)-1] } rt.replacementCache[kadBucketID] = nodes } + +func (rt *RoutingTable) removeFromReplacementCache(kadBucketID bucketID, node *pb.Node) { + rt.rcMutex.Lock() + defer rt.rcMutex.Unlock() + nodes := rt.replacementCache[kadBucketID] + for i, n := range nodes { + if n.Id == node.Id && n.Address.GetAddress() == node.Address.GetAddress() { + nodes = append(nodes[:i], nodes[i+1:]...) 
+ break + } + } + rt.replacementCache[kadBucketID] = nodes +} diff --git a/pkg/kademlia/replacement_cache_test.go b/pkg/kademlia/replacement_cache_test.go index 9b51c496e..fc44afc30 100644 --- a/pkg/kademlia/replacement_cache_test.go +++ b/pkg/kademlia/replacement_cache_test.go @@ -8,14 +8,17 @@ import ( "github.com/stretchr/testify/assert" + "storj.io/storj/internal/testcontext" "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" ) func TestAddToReplacementCache(t *testing.T) { - rt, cleanup := createRoutingTable(t, storj.NodeID{244, 255}) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(storj.NodeID{244, 255}) + defer ctx.Check(rt.Close) kadBucketID := bucketID{255, 255} node1 := teststorj.MockNode(string([]byte{233, 255})) @@ -33,3 +36,25 @@ func TestAddToReplacementCache(t *testing.T) { rt.addToReplacementCache(kadBucketID2, node4) assert.Equal(t, []*pb.Node{node3, node4}, rt.replacementCache[kadBucketID2]) } + +func TestRemoveFromReplacementCache(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTableWith(storj.NodeID{244, 255}, routingTableOpts{cacheSize: 3}) + defer ctx.Check(rt.Close) + + kadBucketID2 := bucketID{127, 255} + node2 := teststorj.MockNode(string([]byte{100, 255})) + node3 := teststorj.MockNode(string([]byte{90, 255})) + node4 := teststorj.MockNode(string([]byte{80, 255})) + rt.addToReplacementCache(kadBucketID2, node2) + rt.addToReplacementCache(kadBucketID2, node3) + rt.addToReplacementCache(kadBucketID2, node4) + assert.Equal(t, []*pb.Node{node2, node3, node4}, rt.replacementCache[kadBucketID2]) + rt.removeFromReplacementCache(kadBucketID2, node3) + assert.Equal(t, []*pb.Node{node2, node4}, rt.replacementCache[kadBucketID2]) + rt.removeFromReplacementCache(kadBucketID2, node2) + assert.Equal(t, []*pb.Node{node4}, rt.replacementCache[kadBucketID2]) + rt.removeFromReplacementCache(kadBucketID2, node4) + assert.Equal(t, 
[]*pb.Node{}, rt.replacementCache[kadBucketID2]) +} diff --git a/pkg/kademlia/routing.go b/pkg/kademlia/routing.go index 09c5cbb96..ae3daaead 100644 --- a/pkg/kademlia/routing.go +++ b/pkg/kademlia/routing.go @@ -60,6 +60,7 @@ type RoutingTable struct { nodeBucketDB storage.KeyValueStore transport *pb.NodeTransport mutex *sync.Mutex + rcMutex *sync.Mutex seen map[storj.NodeID]*pb.Node replacementCache map[bucketID][]*pb.Node bucketSize int // max number of nodes stored in a kbucket = 20 (k) @@ -86,6 +87,7 @@ func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.Key transport: &defaultTransport, mutex: &sync.Mutex{}, + rcMutex: &sync.Mutex{}, seen: make(map[storj.NodeID]*pb.Node), replacementCache: make(map[bucketID][]*pb.Node), @@ -99,7 +101,7 @@ func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.Key return rt, nil } -// Close close without closing dependencies +// Close closes without closing dependencies func (rt *RoutingTable) Close() error { return nil } @@ -224,6 +226,7 @@ func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error { if err != nil { return RoutingErr.New("could not add node %s", err) } + return nil } @@ -231,7 +234,7 @@ func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error { // a connection fails for the node on the network func (rt *RoutingTable) ConnectionFailed(node *pb.Node) error { node.Type.DPanicOnInvalid("connection failed") - err := rt.removeNode(node.Id) + err := rt.removeNode(node) if err != nil { return RoutingErr.New("could not remove node %s", err) } diff --git a/pkg/kademlia/routing_helpers.go b/pkg/kademlia/routing_helpers.go index cb8378ea8..796d2b16f 100644 --- a/pkg/kademlia/routing_helpers.go +++ b/pkg/kademlia/routing_helpers.go @@ -100,18 +100,35 @@ func (rt *RoutingTable) updateNode(node *pb.Node) error { } // removeNode will remove churned nodes and replace those entries with nodes from the replacement cache. 
-func (rt *RoutingTable) removeNode(nodeID storj.NodeID) error { - kadBucketID, err := rt.getKBucketID(nodeID) +func (rt *RoutingTable) removeNode(node *pb.Node) error { + rt.mutex.Lock() + defer rt.mutex.Unlock() + kadBucketID, err := rt.getKBucketID(node.Id) + if err != nil { return RoutingErr.New("could not get k bucket %s", err) } - _, err = rt.nodeBucketDB.Get(nodeID.Bytes()) + + existingMarshalled, err := rt.nodeBucketDB.Get(node.Id.Bytes()) if storage.ErrKeyNotFound.Has(err) { + //check replacement cache + rt.removeFromReplacementCache(kadBucketID, node) return nil } else if err != nil { return RoutingErr.New("could not get node %s", err) } - err = rt.nodeBucketDB.Delete(nodeID.Bytes()) + + var existing pb.Node + err = proto.Unmarshal(existingMarshalled, &existing) + if err != nil { + return RoutingErr.New("could not unmarshal node %s", err) + } + + if !pb.AddressEqual(existing.Address, node.Address) { + // don't remove a node if the address is different + return nil + } + err = rt.nodeBucketDB.Delete(node.Id.Bytes()) if err != nil { return RoutingErr.New("could not delete node %s", err) } @@ -125,6 +142,7 @@ func (rt *RoutingTable) removeNode(nodeID storj.NodeID) error { } rt.replacementCache[kadBucketID] = nodes[:len(nodes)-1] return nil + } // putNode: helper, adds or updates Node and ID to nodeBucketDB diff --git a/pkg/kademlia/routing_helpers_test.go b/pkg/kademlia/routing_helpers_test.go index 00232a094..9a5fc93d5 100644 --- a/pkg/kademlia/routing_helpers_test.go +++ b/pkg/kademlia/routing_helpers_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "storj.io/storj/internal/testcontext" "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" @@ -21,8 +22,19 @@ import ( "storj.io/storj/storage/teststore" ) +type routingTableOpts struct { + bucketSize int + cacheSize int +} + // newTestRoutingTable returns a newly configured instance of a RoutingTable -func 
newTestRoutingTable(localNode pb.Node) (*RoutingTable, error) { +func newTestRoutingTable(localNode pb.Node, opts routingTableOpts) (*RoutingTable, error) { + if opts.bucketSize == 0 { + opts.bucketSize = 6 + } + if opts.cacheSize == 0 { + opts.cacheSize = 2 + } rt := &RoutingTable{ self: localNode, kadBucketDB: storelogger.New(zap.L().Named("rt.kad"), teststore.New()), @@ -30,11 +42,12 @@ func newTestRoutingTable(localNode pb.Node) (*RoutingTable, error) { transport: &defaultTransport, mutex: &sync.Mutex{}, + rcMutex: &sync.Mutex{}, seen: make(map[storj.NodeID]*pb.Node), replacementCache: make(map[bucketID][]*pb.Node), - bucketSize: 6, - rcBucketSize: 2, + bucketSize: opts.bucketSize, + rcBucketSize: opts.cacheSize, } ok, err := rt.addNode(&localNode) if !ok || err != nil { @@ -43,28 +56,28 @@ func newTestRoutingTable(localNode pb.Node) (*RoutingTable, error) { return rt, nil } -func createRoutingTable(t *testing.T, localNodeID storj.NodeID) (*RoutingTable, func()) { +func createRoutingTableWith(localNodeID storj.NodeID, opts routingTableOpts) *RoutingTable { if localNodeID == (storj.NodeID{}) { - localNodeID = teststorj.NodeIDFromString("AA") + panic("empty local node id") } localNode := pb.Node{Id: localNodeID} - rt, err := newTestRoutingTable(localNode) + rt, err := newTestRoutingTable(localNode, opts) if err != nil { - t.Fatal(err) + panic(err) } + return rt +} - return rt, func() { - err := rt.Close() - if err != nil { - t.Fatal(err) - } - } +func createRoutingTable(localNodeID storj.NodeID) *RoutingTable { + return createRoutingTableWith(localNodeID, routingTableOpts{}) } func TestAddNode(t *testing.T) { - rt, cleanup := createRoutingTable(t, teststorj.NodeIDFromString("OO")) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(teststorj.NodeIDFromString("OO")) + defer ctx.Check(rt.Close) // bucket, err := rt.kadBucketDB.Get(storage.Key([]byte{255, 255})) // assert.NoError(t, err) // assert.NotNil(t, bucket) @@ 
-216,8 +229,10 @@ func TestAddNode(t *testing.T) { } func TestUpdateNode(t *testing.T) { - rt, cleanup := createRoutingTable(t, teststorj.NodeIDFromString("AA")) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) node := teststorj.MockNode("BB") ok, err := rt.addNode(node) assert.True(t, ok) @@ -241,8 +256,10 @@ func TestUpdateNode(t *testing.T) { } func TestRemoveNode(t *testing.T) { - rt, cleanup := createRoutingTable(t, teststorj.NodeIDFromString("AA")) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) kadBucketID := firstBucketID node := teststorj.MockNode("BB") ok, err := rt.addNode(node) @@ -253,7 +270,7 @@ func TestRemoveNode(t *testing.T) { assert.NotNil(t, val) node2 := teststorj.MockNode("CC") rt.addToReplacementCache(kadBucketID, node2) - err = rt.removeNode(node.Id) + err = rt.removeNode(node) assert.NoError(t, err) val, err = rt.nodeBucketDB.Get(node.Id.Bytes()) assert.Nil(t, val) @@ -264,14 +281,19 @@ func TestRemoveNode(t *testing.T) { assert.Equal(t, 0, len(rt.replacementCache[kadBucketID])) //try to remove node not in rt - err = rt.removeNode(teststorj.NodeIDFromString("DD")) + err = rt.removeNode(&pb.Node{ + Id: teststorj.NodeIDFromString("DD"), + Address: &pb.NodeAddress{Address: "address:1"}, + }) assert.NoError(t, err) } func TestCreateOrUpdateKBucket(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() id := bucketID{255, 255} - rt, cleanup := createRoutingTable(t, storj.NodeID{}) - defer cleanup() + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) err := rt.createOrUpdateKBucket(id, time.Now()) assert.NoError(t, err) val, e := rt.kadBucketDB.Get(id[:]) @@ -281,18 +303,22 @@ func TestCreateOrUpdateKBucket(t *testing.T) { } func TestGetKBucketID(t *testing.T) { + ctx := 
testcontext.New(t) + defer ctx.Cleanup() kadIDA := bucketID{255, 255} nodeIDA := teststorj.NodeIDFromString("AA") - rt, cleanup := createRoutingTable(t, nodeIDA) - defer cleanup() + rt := createRoutingTable(nodeIDA) + defer ctx.Check(rt.Close) keyA, err := rt.getKBucketID(nodeIDA) assert.NoError(t, err) assert.Equal(t, kadIDA[:2], keyA[:2]) } func TestDetermineFurthestIDWithinK(t *testing.T) { - rt, cleanup := createRoutingTable(t, storj.NodeID{127, 255}) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(storj.NodeID{127, 255}) + defer ctx.Check(rt.Close) cases := []struct { testID string nodeID []byte @@ -331,8 +357,10 @@ func TestDetermineFurthestIDWithinK(t *testing.T) { } func TestNodeIsWithinNearestK(t *testing.T) { - rt, cleanup := createRoutingTable(t, storj.NodeID{127, 255}) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(storj.NodeID{127, 255}) + defer ctx.Check(rt.Close) rt.bucketSize = 2 cases := []struct { @@ -373,9 +401,11 @@ func TestNodeIsWithinNearestK(t *testing.T) { } func TestKadBucketContainsLocalNode(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() nodeIDA := storj.NodeID{183, 255} //[10110111, 1111111] - rt, cleanup := createRoutingTable(t, nodeIDA) - defer cleanup() + rt := createRoutingTable(nodeIDA) + defer ctx.Check(rt.Close) kadIDA := firstBucketID var kadIDB bucketID copy(kadIDB[:], kadIDA[:]) @@ -392,9 +422,11 @@ func TestKadBucketContainsLocalNode(t *testing.T) { } func TestKadBucketHasRoom(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() node1 := storj.NodeID{255, 255} - rt, cleanup := createRoutingTable(t, node1) - defer cleanup() + rt := createRoutingTable(node1) + defer ctx.Check(rt.Close) kadIDA := firstBucketID node2 := storj.NodeID{191, 255} node3 := storj.NodeID{127, 255} @@ -415,9 +447,11 @@ func TestKadBucketHasRoom(t *testing.T) { } func TestGetNodeIDsWithinKBucket(t *testing.T) { + ctx := 
testcontext.New(t) + defer ctx.Cleanup() nodeIDA := storj.NodeID{183, 255} //[10110111, 1111111] - rt, cleanup := createRoutingTable(t, nodeIDA) - defer cleanup() + rt := createRoutingTable(nodeIDA) + defer ctx.Check(rt.Close) kadIDA := firstBucketID var kadIDB bucketID copy(kadIDB[:], kadIDA[:]) @@ -457,6 +491,8 @@ func TestGetNodeIDsWithinKBucket(t *testing.T) { } func TestGetNodesFromIDs(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() nodeA := teststorj.MockNode("AA") nodeB := teststorj.MockNode("BB") nodeC := teststorj.MockNode("CC") @@ -466,8 +502,8 @@ func TestGetNodesFromIDs(t *testing.T) { assert.NoError(t, err) c, err := proto.Marshal(nodeC) assert.NoError(t, err) - rt, cleanup := createRoutingTable(t, nodeA.Id) - defer cleanup() + rt := createRoutingTable(nodeA.Id) + defer ctx.Check(rt.Close) assert.NoError(t, rt.nodeBucketDB.Put(nodeA.Id.Bytes(), a)) assert.NoError(t, rt.nodeBucketDB.Put(nodeB.Id.Bytes(), b)) @@ -484,6 +520,8 @@ func TestGetNodesFromIDs(t *testing.T) { } func TestUnmarshalNodes(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() nodeA := teststorj.MockNode("AA") nodeB := teststorj.MockNode("BB") nodeC := teststorj.MockNode("CC") @@ -494,8 +532,8 @@ func TestUnmarshalNodes(t *testing.T) { assert.NoError(t, err) c, err := proto.Marshal(nodeC) assert.NoError(t, err) - rt, cleanup := createRoutingTable(t, nodeA.Id) - defer cleanup() + rt := createRoutingTable(nodeA.Id) + defer ctx.Check(rt.Close) assert.NoError(t, rt.nodeBucketDB.Put(nodeA.Id.Bytes(), a)) assert.NoError(t, rt.nodeBucketDB.Put(nodeB.Id.Bytes(), b)) assert.NoError(t, rt.nodeBucketDB.Put(nodeC.Id.Bytes(), c)) @@ -510,10 +548,12 @@ func TestUnmarshalNodes(t *testing.T) { } func TestGetUnmarshaledNodesFromBucket(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() nodeA := teststorj.MockNode("AA") - rt, cleanup := createRoutingTable(t, nodeA.Id) + rt := createRoutingTable(nodeA.Id) + defer ctx.Check(rt.Close) bucketID := firstBucketID 
- defer cleanup() nodeB := teststorj.MockNode("BB") nodeC := teststorj.MockNode("CC") var err error @@ -530,8 +570,10 @@ func TestGetUnmarshaledNodesFromBucket(t *testing.T) { } func TestGetKBucketRange(t *testing.T) { - rt, cleanup := createRoutingTable(t, storj.NodeID{}) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) idA := storj.NodeID{255, 255} idB := storj.NodeID{127, 255} idC := storj.NodeID{63, 255} @@ -568,16 +610,16 @@ func TestGetKBucketRange(t *testing.T) { } func TestBucketIDZeroValue(t *testing.T) { - // rt, cleanup := createRoutingTable(t, storj.NodeID{}) - // defer cleanup() - zero := bucketID{} //rt.createZeroAsBucketID() + zero := bucketID{} expected := []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} assert.True(t, bytes.Equal(zero[:], expected)) } func TestDetermineLeafDepth(t *testing.T) { - rt, cleanup := createRoutingTable(t, storj.NodeID{}) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) idA, idB, idC := storj.NodeID(firstBucketID), storj.NodeID(firstBucketID), storj.NodeID(firstBucketID) idA[0] = 255 idB[0] = 127 @@ -635,8 +677,10 @@ func TestDetermineLeafDepth(t *testing.T) { } func TestSplitBucket(t *testing.T) { - rt, cleanup := createRoutingTable(t, storj.NodeID{}) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) cases := []struct { testID string idA []byte diff --git a/pkg/kademlia/routing_integration_helpers_test.go b/pkg/kademlia/routing_integration_helpers_test.go new file mode 100644 index 000000000..56a119183 --- /dev/null +++ b/pkg/kademlia/routing_integration_helpers_test.go @@ -0,0 +1,86 @@ +// Copyright (C) 2019 Storj Labs, Inc. 
+// See LICENSE for copying information. + +package kademlia + +import ( + "encoding/hex" + "fmt" + "io" + "os" + "strings" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + "storj.io/storj/pkg/dht" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" +) + +func id(hexID string) (rv storj.NodeID) { + bytes, err := hex.DecodeString(hexID) + if err != nil { + panic(err) + } + if len(bytes) != len(storj.NodeID{}) { + panic(fmt.Sprintf("invalid length for %q", hexID)) + } + copy(rv[:], bytes) + if rv == (storj.NodeID{}) { + panic("to allow routing table implementations to use a node id zero value (unlikely to have a collision), tests shouldn't use it") + } + return rv +} + +func PadID(hexPrefix, hexPad string) storj.NodeID { + repeats := (len(storj.NodeID{})*2 - len(hexPrefix)) / len(hexPad) + return id(hexPrefix + strings.Repeat(hexPad, repeats)) +} + +func Node(id storj.NodeID, address string) *pb.Node { + return &pb.Node{ + Id: id, + Address: &pb.NodeAddress{ + Address: address, + }, + } +} + +var graphCounter = new(int64) + +type Grapher interface { + Graph(io.Writer) error +} + +func SaveGraph(table dht.RoutingTable) { + if table, ok := table.(Grapher); ok { + fh, err := os.Create(fmt.Sprintf("routing-graph-%003d.dot", atomic.AddInt64(graphCounter, 1))) + if err != nil { + panic(err) + } + defer func() { + err := fh.Close() + if err != nil { + panic(err) + } + }() + err = table.Graph(fh) + if err != nil { + panic(err) + } + } +} + +func requireNodesEqual(t testing.TB, expected []*pb.Node, actual []*pb.Node) { + require.Equal(t, len(expected), len(actual)) + for i, node := range expected { + require.Equal(t, node.Id, actual[i].Id) + require.Equal(t, node.Address.Transport, actual[i].Address.Transport) + require.Equal(t, node.Address.Address, actual[i].Address.Address) + } +} + +func NodeFromPrefix(prefix string, pad string) *pb.Node { + return Node(PadID(prefix, pad), fmt.Sprintf("address-%s:1", prefix)) +} diff --git 
a/pkg/kademlia/routing_integration_test.go b/pkg/kademlia/routing_integration_test.go new file mode 100644 index 000000000..1ee882ddd --- /dev/null +++ b/pkg/kademlia/routing_integration_test.go @@ -0,0 +1,615 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package kademlia + +import ( + "testing" + + "github.com/stretchr/testify/require" + "storj.io/storj/internal/testcontext" + "storj.io/storj/pkg/dht" + "storj.io/storj/pkg/kademlia/testrouting" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" +) + +type routingCtor func(storj.NodeID, int, int, int) dht.RoutingTable + +func newRouting(self storj.NodeID, bucketSize, cacheSize, allowedFailures int) dht.RoutingTable { + if allowedFailures != 0 { + panic("failure counting currently unsupported") + } + return createRoutingTableWith(self, routingTableOpts{ + bucketSize: bucketSize, + cacheSize: cacheSize, + }) +} + +func newTestRouting(self storj.NodeID, bucketSize, cacheSize, allowedFailures int) dht.RoutingTable { + return testrouting.New(self, bucketSize, cacheSize, allowedFailures) +} + +func TestTableInit_Routing(t *testing.T) { testTableInit(t, newRouting) } +func TestTableInit_TestRouting(t *testing.T) { testTableInit(t, newTestRouting) } +func testTableInit(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + bucketSize := 5 + cacheSize := 3 + table := routingCtor(PadID("55", "5"), bucketSize, cacheSize, 0) + defer ctx.Check(table.Close) + require.Equal(t, bucketSize, table.K()) + require.Equal(t, cacheSize, table.CacheSize()) + + nodes, err := table.FindNear(PadID("21", "0"), 3) + require.NoError(t, err) + require.Equal(t, 0, len(nodes)) +} + +func TestTableBasic_Routing(t *testing.T) { testTableBasic(t, newRouting) } +func TestTableBasic_TestRouting(t *testing.T) { testTableBasic(t, newTestRouting) } +func testTableBasic(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := 
routingCtor(PadID("5555", "5"), 5, 3, 0) + defer ctx.Check(table.Close) + + err := table.ConnectionSuccess(Node(PadID("5556", "5"), "address:1")) + require.NoError(t, err) + + nodes, err := table.FindNear(PadID("21", "0"), 3) + require.NoError(t, err) + require.Equal(t, 1, len(nodes)) + require.Equal(t, PadID("5556", "5"), nodes[0].Id) + require.Equal(t, "address:1", nodes[0].Address.Address) +} + +func TestNoSelf_Routing(t *testing.T) { testNoSelf(t, newRouting) } +func TestNoSelf_TestRouting(t *testing.T) { testNoSelf(t, newTestRouting) } +func testNoSelf(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("55", "5"), 5, 3, 0) + defer ctx.Check(table.Close) + err := table.ConnectionSuccess(Node(PadID("55", "5"), "address:2")) + require.NoError(t, err) + + nodes, err := table.FindNear(PadID("21", "0"), 3) + require.NoError(t, err) + require.Equal(t, 0, len(nodes)) +} + +func TestSplits_Routing(t *testing.T) { testSplits(t, newRouting) } +func TestSplits_TestRouting(t *testing.T) { testSplits(t, newTestRouting) } +func testSplits(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("55", "5"), 5, 2, 0) + defer ctx.Check(table.Close) + + for _, prefix2 := range "18" { + for _, prefix1 := range "a69c23f1d7eb5408" { + require.NoError(t, table.ConnectionSuccess( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "0"))) + } + } + + // we just put 32 nodes into the table. the bucket with a differing first + // bit should be full with 5 nodes. the bucket with the same first bit and + // differing second bit should be full with 5 nodes. the bucket with the + // same first two bits and differing third bit should not be full and have + // 4 nodes (61..., 68..., 71..., 78...). the bucket with the same first + // three bits should also not be full and have 4 nodes + // (41..., 48..., 51..., 58...). 
So we should be able to get no more than + // 18 nodes back + nodes, err := table.FindNear(PadID("55", "5"), 19) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + // bucket 010 (same first three bits) + NodeFromPrefix("51", "0"), NodeFromPrefix("58", "0"), + NodeFromPrefix("41", "0"), NodeFromPrefix("48", "0"), + + // bucket 011 (same first two bits) + NodeFromPrefix("71", "0"), NodeFromPrefix("78", "0"), + NodeFromPrefix("61", "0"), NodeFromPrefix("68", "0"), + + // bucket 00 (same first bit) + NodeFromPrefix("11", "0"), + NodeFromPrefix("01", "0"), + NodeFromPrefix("31", "0"), + // 21 is added first of this group, so it's the only one where there's + // room for the 28, before this bucket is full + NodeFromPrefix("21", "0"), NodeFromPrefix("28", "0"), + + // bucket 1 (differing first bit) + NodeFromPrefix("d1", "0"), + NodeFromPrefix("c1", "0"), + NodeFromPrefix("f1", "0"), + NodeFromPrefix("91", "0"), + NodeFromPrefix("a1", "0"), + // e1, b1 and 81 were added last so that bucket should have been full by then + }, nodes) + + // let's cause some failures and make sure the replacement cache fills in + // the gaps + + // bucket 010 shouldn't have anything in its replacement cache + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("41", "0"))) + // bucket 011 shouldn't have anything in its replacement cache + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("68", "0"))) + + // bucket 00 should have two things in its replacement cache, 18... 
is one of them + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("18", "0"))) + + // now just one thing in its replacement cache + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("31", "0"))) + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("28", "0"))) + + // bucket 1 should have two things in its replacement cache + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("a1", "0"))) + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("d1", "0"))) + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("91", "0"))) + + nodes, err = table.FindNear(PadID("55", "5"), 19) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + // bucket 010 + NodeFromPrefix("51", "0"), NodeFromPrefix("58", "0"), + NodeFromPrefix("48", "0"), + + // bucket 011 + NodeFromPrefix("71", "0"), NodeFromPrefix("78", "0"), + NodeFromPrefix("61", "0"), + + // bucket 00 + NodeFromPrefix("11", "0"), + NodeFromPrefix("01", "0"), + NodeFromPrefix("08", "0"), // replacement cache + NodeFromPrefix("21", "0"), + + // bucket 1 + NodeFromPrefix("c1", "0"), + NodeFromPrefix("f1", "0"), + NodeFromPrefix("88", "0"), // replacement cache + NodeFromPrefix("b8", "0"), // replacement cache + }, nodes) +} + +func TestUnbalanced_Routing(t *testing.T) { testUnbalanced(t, newRouting) } +func TestUnbalanced_TestRouting(t *testing.T) { testUnbalanced(t, newTestRouting) } +func testUnbalanced(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("ff", "f"), 5, 2, 0) + defer ctx.Check(table.Close) + + for _, prefix1 := range "0123456789abcdef" { + for _, prefix2 := range "18" { + require.NoError(t, table.ConnectionSuccess( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "0"))) + } + } + + // in this case, we've blown out the routing table with a paradoxical + // case. 
every node we added should have been the closest node, so this + // would have forced every bucket to split, and we should have stored all + // possible nodes. + + nodes, err := table.FindNear(PadID("ff", "f"), 33) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("f8", "0"), NodeFromPrefix("f1", "0"), + NodeFromPrefix("e8", "0"), NodeFromPrefix("e1", "0"), + NodeFromPrefix("d8", "0"), NodeFromPrefix("d1", "0"), + NodeFromPrefix("c8", "0"), NodeFromPrefix("c1", "0"), + NodeFromPrefix("b8", "0"), NodeFromPrefix("b1", "0"), + NodeFromPrefix("a8", "0"), NodeFromPrefix("a1", "0"), + NodeFromPrefix("98", "0"), NodeFromPrefix("91", "0"), + NodeFromPrefix("88", "0"), NodeFromPrefix("81", "0"), + NodeFromPrefix("78", "0"), NodeFromPrefix("71", "0"), + NodeFromPrefix("68", "0"), NodeFromPrefix("61", "0"), + NodeFromPrefix("58", "0"), NodeFromPrefix("51", "0"), + NodeFromPrefix("48", "0"), NodeFromPrefix("41", "0"), + NodeFromPrefix("38", "0"), NodeFromPrefix("31", "0"), + NodeFromPrefix("28", "0"), NodeFromPrefix("21", "0"), + NodeFromPrefix("18", "0"), NodeFromPrefix("11", "0"), + NodeFromPrefix("08", "0"), NodeFromPrefix("01", "0"), + }, nodes) +} + +func TestQuery_Routing(t *testing.T) { testQuery(t, newRouting) } +func TestQuery_TestRouting(t *testing.T) { testQuery(t, newTestRouting) } +func testQuery(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("a3", "3"), 5, 2, 0) + defer ctx.Check(table.Close) + + for _, prefix2 := range "18" { + for _, prefix1 := range "b4f25c896de03a71" { + require.NoError(t, table.ConnectionSuccess( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "f"))) + } + } + + nodes, err := table.FindNear(PadID("c7139", "1"), 2) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("c1", "f"), + NodeFromPrefix("d1", "f"), + }, nodes) + + nodes, err = table.FindNear(PadID("c7139", "1"), 7) + require.NoError(t, err) + 
requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("c1", "f"), + NodeFromPrefix("d1", "f"), + NodeFromPrefix("e1", "f"), + NodeFromPrefix("f1", "f"), + NodeFromPrefix("f8", "f"), + NodeFromPrefix("81", "f"), + NodeFromPrefix("88", "f"), + }, nodes) + + nodes, err = table.FindNear(PadID("c7139", "1"), 10) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("c1", "f"), + NodeFromPrefix("d1", "f"), + NodeFromPrefix("e1", "f"), + NodeFromPrefix("f1", "f"), + NodeFromPrefix("f8", "f"), + NodeFromPrefix("81", "f"), + NodeFromPrefix("88", "f"), + NodeFromPrefix("91", "f"), + NodeFromPrefix("98", "f"), + NodeFromPrefix("a1", "f"), + }, nodes) +} + +func TestFailureCounting_Routing(t *testing.T) { t.Skip() } +func TestFailureCounting_TestRouting(t *testing.T) { testFailureCounting(t, newTestRouting) } +func testFailureCounting(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("a3", "3"), 5, 2, 2) + defer ctx.Check(table.Close) + + for _, prefix2 := range "18" { + for _, prefix1 := range "b4f25c896de03a71" { + require.NoError(t, table.ConnectionSuccess( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "f"))) + } + } + + nochange := func() { + nodes, err := table.FindNear(PadID("c7139", "1"), 7) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("c1", "f"), + NodeFromPrefix("d1", "f"), + NodeFromPrefix("e1", "f"), + NodeFromPrefix("f1", "f"), + NodeFromPrefix("f8", "f"), + NodeFromPrefix("81", "f"), + NodeFromPrefix("88", "f"), + }, nodes) + } + + nochange() + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("d1", "f"))) + nochange() + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("d1", "f"))) + nochange() + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("d1", "f"))) + + nodes, err := table.FindNear(PadID("c7139", "1"), 7) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("c1", "f"), + 
NodeFromPrefix("e1", "f"), + NodeFromPrefix("e8", "f"), + NodeFromPrefix("f1", "f"), + NodeFromPrefix("f8", "f"), + NodeFromPrefix("81", "f"), + NodeFromPrefix("88", "f"), + }, nodes) +} + +func TestUpdateBucket_Routing(t *testing.T) { testUpdateBucket(t, newRouting) } +func TestUpdateBucket_TestRouting(t *testing.T) { testUpdateBucket(t, newTestRouting) } +func testUpdateBucket(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("a3", "3"), 5, 2, 0) + defer ctx.Check(table.Close) + + for _, prefix2 := range "18" { + for _, prefix1 := range "b4f25c896de03a71" { + require.NoError(t, table.ConnectionSuccess( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "f"))) + } + } + + nodes, err := table.FindNear(PadID("c7139", "1"), 1) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("c1", "f"), + }, nodes) + + require.NoError(t, table.ConnectionSuccess( + Node(PadID("c1", "f"), "new-address:3"))) + + nodes, err = table.FindNear(PadID("c7139", "1"), 1) + require.NoError(t, err) + require.Equal(t, 1, len(nodes)) + require.Equal(t, PadID("c1", "f"), nodes[0].Id) + require.Equal(t, "new-address:3", nodes[0].Address.Address) +} + +func TestUpdateCache_Routing(t *testing.T) { testUpdateCache(t, newRouting) } +func TestUpdateCache_TestRouting(t *testing.T) { testUpdateCache(t, newTestRouting) } +func testUpdateCache(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("a3", "3"), 1, 1, 0) + defer ctx.Check(table.Close) + + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("81", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("c1", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("41", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("01", "0"))) + + require.NoError(t, table.ConnectionSuccess(Node(PadID("01", "0"), "new-address:6"))) + require.NoError(t, 
table.ConnectionFailed(NodeFromPrefix("41", "0"))) + + nodes, err := table.FindNear(PadID("01", "0"), 4) + require.NoError(t, err) + + requireNodesEqual(t, []*pb.Node{ + Node(PadID("01", "0"), "new-address:6"), + NodeFromPrefix("81", "0"), + NodeFromPrefix("c1", "0"), + }, nodes) +} + +func TestFailureUnknownAddress_Routing(t *testing.T) { testFailureUnknownAddress(t, newRouting) } +func TestFailureUnknownAddress_TestRouting(t *testing.T) { testFailureUnknownAddress(t, newTestRouting) } +func testFailureUnknownAddress(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("a3", "3"), 1, 1, 0) + defer ctx.Check(table.Close) + + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("81", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("c1", "0"))) + require.NoError(t, table.ConnectionSuccess(Node(PadID("41", "0"), "address:2"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("01", "0"))) + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("41", "0"))) + + nodes, err := table.FindNear(PadID("01", "0"), 4) + require.NoError(t, err) + + requireNodesEqual(t, []*pb.Node{ + Node(PadID("41", "0"), "address:2"), + NodeFromPrefix("81", "0"), + NodeFromPrefix("c1", "0"), + }, nodes) +} + +func TestShrink_Routing(t *testing.T) { testShrink(t, newRouting) } +func TestShrink_TestRouting(t *testing.T) { testShrink(t, newTestRouting) } +func testShrink(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("ff", "f"), 2, 2, 0) + defer ctx.Check(table.Close) + + // blow out the routing table + for _, prefix1 := range "0123456789abcdef" { + for _, prefix2 := range "18" { + require.NoError(t, table.ConnectionSuccess( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "0"))) + } + } + + // delete some of the bad ones + for _, prefix1 := range "0123456789abcd" { + for _, prefix2 := range "18" { + 
require.NoError(t, table.ConnectionFailed( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "0"))) + } + } + + // add back some nodes more balanced + for _, prefix1 := range "3a50" { + for _, prefix2 := range "19" { + require.NoError(t, table.ConnectionSuccess( + NodeFromPrefix(string([]rune{prefix1, prefix2}), "0"))) + } + } + + // make sure table filled in alright + nodes, err := table.FindNear(PadID("ff", "f"), 13) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("f8", "0"), + NodeFromPrefix("f1", "0"), + NodeFromPrefix("e8", "0"), + NodeFromPrefix("e1", "0"), + NodeFromPrefix("a9", "0"), + NodeFromPrefix("a1", "0"), + NodeFromPrefix("59", "0"), + NodeFromPrefix("51", "0"), + NodeFromPrefix("39", "0"), + NodeFromPrefix("31", "0"), + NodeFromPrefix("09", "0"), + NodeFromPrefix("01", "0"), + }, nodes) +} + +func TestReplacementCacheOrder_Routing(t *testing.T) { testReplacementCacheOrder(t, newRouting) } +func TestReplacementCacheOrder_TestRouting(t *testing.T) { testReplacementCacheOrder(t, newTestRouting) } +func testReplacementCacheOrder(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("a3", "3"), 1, 2, 0) + defer ctx.Check(table.Close) + + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("81", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("21", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("c1", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("41", "0"))) + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix("01", "0"))) + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("21", "0"))) + + nodes, err := table.FindNear(PadID("55", "5"), 4) + require.NoError(t, err) + + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("01", "0"), + NodeFromPrefix("c1", "0"), + NodeFromPrefix("81", "0"), + }, nodes) +} + +func TestHealSplit_Routing(t *testing.T) { testHealSplit(t, newRouting) } 
+func TestHealSplit_TestRouting(t *testing.T) { testHealSplit(t, newTestRouting) } +func testHealSplit(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("55", "55"), 2, 2, 0) + defer ctx.Check(table.Close) + + for _, pad := range []string{"0", "1"} { + for _, prefix := range []string{"ff", "e1", "c1", "54", "56", "57"} { + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix(prefix, pad))) + } + } + + nodes, err := table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("54", "1"), + NodeFromPrefix("54", "0"), + NodeFromPrefix("57", "0"), + NodeFromPrefix("56", "0"), + NodeFromPrefix("c1", "1"), + NodeFromPrefix("c1", "0"), + NodeFromPrefix("ff", "0"), + NodeFromPrefix("e1", "0"), + }, nodes) + + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("c1", "0"))) + + nodes, err = table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("54", "1"), + NodeFromPrefix("54", "0"), + NodeFromPrefix("57", "0"), + NodeFromPrefix("56", "0"), + NodeFromPrefix("c1", "1"), + NodeFromPrefix("ff", "0"), + NodeFromPrefix("e1", "0"), + }, nodes) + + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("ff", "0"))) + nodes, err = table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("54", "1"), + NodeFromPrefix("54", "0"), + NodeFromPrefix("57", "0"), + NodeFromPrefix("56", "0"), + NodeFromPrefix("c1", "1"), + NodeFromPrefix("e1", "1"), + NodeFromPrefix("e1", "0"), + }, nodes) + + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("e1", "0"))) + nodes, err = table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("54", "1"), + NodeFromPrefix("54", "0"), + NodeFromPrefix("57", "0"), + NodeFromPrefix("56", "0"), + NodeFromPrefix("c1", "1"), + NodeFromPrefix("ff", 
"1"), + NodeFromPrefix("e1", "1"), + }, nodes) + + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("e1", "1"))) + nodes, err = table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("54", "1"), + NodeFromPrefix("54", "0"), + NodeFromPrefix("57", "0"), + NodeFromPrefix("56", "0"), + NodeFromPrefix("c1", "1"), + NodeFromPrefix("ff", "1"), + }, nodes) + + for _, prefix := range []string{"ff", "e1", "c1", "54", "56", "57"} { + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix(prefix, "2"))) + } + + nodes, err = table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("54", "1"), + NodeFromPrefix("54", "0"), + NodeFromPrefix("57", "0"), + NodeFromPrefix("56", "0"), + NodeFromPrefix("c1", "1"), + NodeFromPrefix("c1", "2"), + NodeFromPrefix("ff", "1"), + NodeFromPrefix("ff", "2"), + }, nodes) +} + +func TestFullDissimilarBucket_Routing(t *testing.T) { testFullDissimilarBucket(t, newRouting) } +func TestFullDissimilarBucket_TestRouting(t *testing.T) { testFullDissimilarBucket(t, newTestRouting) } +func testFullDissimilarBucket(t *testing.T, routingCtor routingCtor) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + table := routingCtor(PadID("55", "55"), 2, 2, 0) + defer ctx.Check(table.Close) + + for _, prefix := range []string{"d1", "c1", "f1", "e1"} { + require.NoError(t, table.ConnectionSuccess(NodeFromPrefix(prefix, "0"))) + } + + nodes, err := table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("d1", "0"), + NodeFromPrefix("c1", "0"), + }, nodes) + + require.NoError(t, table.ConnectionFailed(NodeFromPrefix("c1", "0"))) + + nodes, err = table.FindNear(PadID("55", "55"), 9) + require.NoError(t, err) + requireNodesEqual(t, []*pb.Node{ + NodeFromPrefix("d1", "0"), + NodeFromPrefix("e1", "0"), + }, nodes) +} diff --git a/pkg/kademlia/routing_test.go 
b/pkg/kademlia/routing_test.go index 1705c45ce..61f4a734d 100644 --- a/pkg/kademlia/routing_test.go +++ b/pkg/kademlia/routing_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "storj.io/storj/internal/testcontext" "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" @@ -21,30 +22,42 @@ import ( ) func TestLocal(t *testing.T) { - rt, cleanup := createRoutingTable(t, teststorj.NodeIDFromString("AA")) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) assert.Equal(t, rt.Local().Id.Bytes()[:2], []byte("AA")) } func TestK(t *testing.T) { - rt, cleanup := createRoutingTable(t, teststorj.NodeIDFromString("AA")) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) k := rt.K() assert.Equal(t, rt.bucketSize, k) } func TestCacheSize(t *testing.T) { - rt, cleanup := createRoutingTable(t, teststorj.NodeIDFromString("AA")) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) expected := rt.rcBucketSize result := rt.CacheSize() assert.Equal(t, expected, result) } func TestGetBucket(t *testing.T) { - rt, cleanup := createRoutingTable(t, teststorj.NodeIDFromString("AA")) - defer cleanup() + ctx := testcontext.New(t) + defer ctx.Cleanup() + + rt := createRoutingTable(teststorj.NodeIDFromString("AA")) + defer ctx.Check(rt.Close) node := teststorj.MockNode("AA") node2 := teststorj.MockNode("BB") ok, err := rt.addNode(node2) @@ -86,8 +99,7 @@ func RandomNode() pb.Node { func TestKademliaFindNear(t *testing.T) { testFunc := func(t *testing.T, testNodeCount, limit int) { selfNode := RandomNode() - rt, cleanup := createRoutingTable(t, selfNode.Id) - defer cleanup() + rt := 
createRoutingTable(selfNode.Id) expectedIDs := make([]storj.NodeID, 0) for x := 0; x < testNodeCount; x++ { @@ -124,9 +136,12 @@ func TestKademliaFindNear(t *testing.T) { } func TestConnectionSuccess(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + id := teststorj.NodeIDFromString("AA") - rt, cleanup := createRoutingTable(t, id) - defer cleanup() + rt := createRoutingTable(id) + defer ctx.Check(rt.Close) id2 := teststorj.NodeIDFromString("BB") address1 := &pb.NodeAddress{Address: "a"} address2 := &pb.NodeAddress{Address: "b"} @@ -163,9 +178,12 @@ func TestConnectionSuccess(t *testing.T) { } func TestUpdateSelf(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + id := teststorj.NodeIDFromString("AA") - rt, cleanup := createRoutingTable(t, id) - defer cleanup() + rt := createRoutingTable(id) + defer ctx.Check(rt.Close) address := &pb.NodeAddress{Address: "a"} node := &pb.Node{Id: id, Address: address, Type: pb.NodeType_STORAGE} cases := []struct { @@ -200,10 +218,13 @@ func TestUpdateSelf(t *testing.T) { } func TestConnectionFailed(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + id := teststorj.NodeIDFromString("AA") node := &pb.Node{Id: id, Type: pb.NodeType_STORAGE} - rt, cleanup := createRoutingTable(t, id) - defer cleanup() + rt := createRoutingTable(id) + defer ctx.Check(rt.Close) err := rt.ConnectionFailed(node) assert.NoError(t, err) v, err := rt.nodeBucketDB.Get(id.Bytes()) @@ -212,9 +233,12 @@ func TestConnectionFailed(t *testing.T) { } func TestSetBucketTimestamp(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + id := teststorj.NodeIDFromString("AA") - rt, cleanup := createRoutingTable(t, id) - defer cleanup() + rt := createRoutingTable(id) + defer ctx.Check(rt.Close) now := time.Now().UTC() err := rt.createOrUpdateKBucket(keyToBucketID(id.Bytes()), now) @@ -231,9 +255,12 @@ func TestSetBucketTimestamp(t *testing.T) { } func TestGetBucketTimestamp(t *testing.T) { + ctx := 
testcontext.New(t) + defer ctx.Cleanup() + id := teststorj.NodeIDFromString("AA") - rt, cleanup := createRoutingTable(t, id) - defer cleanup() + rt := createRoutingTable(id) + defer ctx.Check(rt.Close) now := time.Now().UTC() err := rt.createOrUpdateKBucket(keyToBucketID(id.Bytes()), now) assert.NoError(t, err) diff --git a/pkg/kademlia/testrouting/testrouting.go b/pkg/kademlia/testrouting/testrouting.go new file mode 100644 index 000000000..9ea02b7a4 --- /dev/null +++ b/pkg/kademlia/testrouting/testrouting.go @@ -0,0 +1,308 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package testrouting + +import ( + "sort" + "sync" + "time" + + "storj.io/storj/pkg/dht" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" + "storj.io/storj/storage" +) + +type nodeData struct { + node *pb.Node + ordering int64 + lastUpdated time.Time + fails int + inCache bool +} + +// Table is a routing table that tries to be as correct as possible at +// the expense of performance. +type Table struct { + self storj.NodeID + bucketSize int + cacheSize int + allowedFailures int + + mu sync.Mutex + counter int64 + nodes map[storj.NodeID]*nodeData + splits map[string]bool +} + +// New creates a new Table. self is the owning node's node id, bucketSize is +// the kademlia k value, cacheSize is the size of each bucket's replacement +// cache, and allowedFailures is the number of failures on a given node before +// the node is removed from the table. 
+func New(self storj.NodeID, bucketSize, cacheSize, allowedFailures int) *Table { + return &Table{ + self: self, + bucketSize: bucketSize, + cacheSize: cacheSize, + allowedFailures: allowedFailures, + nodes: map[storj.NodeID]*nodeData{}, + splits: map[string]bool{}, + } +} + +// make sure the Table implements the right interface +var _ dht.RoutingTable = (*Table)(nil) + +// K returns the Table's routing depth, or Kademlia k value +func (t *Table) K() int { return t.bucketSize } + +// CacheSize returns the size of replacement cache +func (t *Table) CacheSize() int { return t.cacheSize } + +// ConnectionSuccess should be called whenever a node is successfully connected +// to. It will add or update the node's entry in the routing table. +func (t *Table) ConnectionSuccess(node *pb.Node) error { + t.mu.Lock() + defer t.mu.Unlock() + + // don't add ourselves + if node.Id == t.self { + return nil + } + + // if the node is already here, update it + if cell, exists := t.nodes[node.Id]; exists { + cell.node = node + cell.lastUpdated = time.Now() + cell.fails = 0 + // skip placement order and cache status + return nil + } + + // add unconditionally (it might be going into a replacement cache) + t.nodes[node.Id] = &nodeData{ + node: node, + ordering: t.counter, + lastUpdated: time.Now(), + fails: 0, + + // makeTree within preserveInvariants might promote this to true + inCache: false, + } + t.counter++ + + t.preserveInvariants() + return nil +} + +// ConnectionFailed should be called whenever a node can't be contacted. +// If a node fails more than allowedFailures times, it will be removed from +// the routing table. The failure count is reset every successful connection. 
+func (t *Table) ConnectionFailed(node *pb.Node) error { + t.mu.Lock() + defer t.mu.Unlock() + + // if the node exists and the failure is with the address we have, record + // a failure + + if data, exists := t.nodes[node.Id]; exists && + pb.AddressEqual(data.node.Address, node.Address) { + data.fails++ //TODO: we may not need this + // if we've failed too many times, remove the node + if data.fails > t.allowedFailures { + delete(t.nodes, node.Id) + + t.preserveInvariants() + } + } + return nil +} + +// FindNear will return up to limit nodes in the routing table ordered by +// kademlia xor distance from the given id. +func (t *Table) FindNear(id storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) { + t.mu.Lock() + defer t.mu.Unlock() + + // find all non-cache nodes + nodes := make([]*nodeData, 0, len(t.nodes)) + for _, node := range t.nodes { + if !node.inCache { + nodes = append(nodes, node) + } + } + + // sort by distance + sort.Sort(nodeDataDistanceSorter{self: id, nodes: nodes}) + + // return up to limit nodes + if limit > len(nodes) { + limit = len(nodes) + } + rv := make([]*pb.Node, 0, limit) + for _, data := range nodes[:limit] { + rv = append(rv, data.node) + } + return rv, nil +} + +// Local returns the local nodes ID +func (t *Table) Local() pb.Node { + // the routing table has no idea what the right address of ourself is, + // so this is the wrong place to get this information. we could return + // our own id only? + panic("Unimplementable") +} + +// Self returns the node's configured node id. +func (t *Table) Self() storj.NodeID { return t.self } + +// MaxBucketDepth returns the largest depth of the routing table tree. This +// is useful for determining which buckets should be refreshed. 
+func (t *Table) MaxBucketDepth() (int, error) { + t.mu.Lock() + defer t.mu.Unlock() + + var maxDepth int + t.walkLeaves(t.makeTree(), func(b *bucket) { + if b.depth > maxDepth { + maxDepth = b.depth + } + }) + return maxDepth, nil +} + +// GetNodes retrieves nodes within the same kbucket as the given node id +func (t *Table) GetNodes(id storj.NodeID) (nodes []*pb.Node, ok bool) { + panic("TODO") +} + +// GetBucketIds returns a storage.Keys type of bucket ID's in the Kademlia instance +func (t *Table) GetBucketIds() (storage.Keys, error) { + panic("TODO") +} + +// SetBucketTimestamp records the time of the last node lookup for a bucket +func (t *Table) SetBucketTimestamp(id []byte, now time.Time) error { + panic("TODO") +} + +// GetBucketTimestamp retrieves time of the last node lookup for a bucket +func (t *Table) GetBucketTimestamp(id []byte) (time.Time, error) { + panic("TODO") +} + +func (t *Table) preserveInvariants() { + t.walkLeaves(t.makeTree(), func(b *bucket) { + // pull the latest nodes out of the replacement caches for incomplete + // buckets + for len(b.cache) > 0 && len(b.nodes) < t.bucketSize { + recentNode := b.cache[len(b.cache)-1] + recentNode.inCache = false + b.cache = b.cache[:len(b.cache)-1] + b.nodes = append(b.nodes, recentNode) + } + + // prune remaining replacement cache entries + if len(b.cache) > t.cacheSize { + for _, node := range b.cache[:len(b.cache)-t.cacheSize] { + delete(t.nodes, node.node.Id) + } + } + }) +} + +type bucket struct { + prefix string + depth int + + similar *bucket + dissimilar *bucket + + nodes []*nodeData + cache []*nodeData +} + +func (t *Table) walkLeaves(b *bucket, fn func(b *bucket)) { + if !t.splits[b.prefix] { + fn(b) + } else if b.similar != nil { + t.walkLeaves(b.similar, fn) + t.walkLeaves(b.dissimilar, fn) + } +} + +func (t *Table) makeTree() *bucket { + // to make sure we get the logic right, we're going to reconstruct the + // routing table binary tree data structure every time. 
+ nodes := make([]*nodeData, 0, len(t.nodes)) + for _, node := range t.nodes { + nodes = append(nodes, node) + } + var root bucket + + // we'll replay the nodes in original placement order + sort.Slice(nodes, func(i, j int) bool { + return nodes[i].ordering < nodes[j].ordering + }) + nearest := make([]*nodeData, 0, t.bucketSize+1) + for _, node := range nodes { + // keep track of the nearest k nodes + nearest = append(nearest, node) + sort.Sort(nodeDataDistanceSorter{self: t.self, nodes: nearest}) + if len(nearest) > t.bucketSize { + nearest = nearest[:t.bucketSize] + } + + t.add(&root, node, false, nearest) + } + return &root +} + +func (t *Table) add(b *bucket, node *nodeData, dissimilar bool, nearest []*nodeData) { + if t.splits[b.prefix] { + if b.similar == nil { + similarBit := bitAtDepth(t.self, b.depth) + b.similar = &bucket{depth: b.depth + 1, prefix: extendPrefix(b.prefix, similarBit)} + b.dissimilar = &bucket{depth: b.depth + 1, prefix: extendPrefix(b.prefix, !similarBit)} + } + if bitAtDepth(node.node.Id, b.depth) == bitAtDepth(t.self, b.depth) { + t.add(b.similar, node, dissimilar, nearest) + } else { + t.add(b.dissimilar, node, true, nearest) + } + return + } + + if node.inCache { + b.cache = append(b.cache, node) + return + } + + if len(b.nodes) < t.bucketSize { + node.inCache = false + b.nodes = append(b.nodes, node) + return + } + + if dissimilar && !isNearest(node.node.Id, nearest) { + node.inCache = true + b.cache = append(b.cache, node) + return + } + + t.splits[b.prefix] = true + if len(b.cache) > 0 { + panic("unreachable codepath") + } + nodes := b.nodes + b.nodes = nil + for _, existingNode := range nodes { + t.add(b, existingNode, dissimilar, nearest) + } + t.add(b, node, dissimilar, nearest) +} + +// Close closes without closing dependencies +func (t *Table) Close() error { return nil } diff --git a/pkg/kademlia/testrouting/utils.go b/pkg/kademlia/testrouting/utils.go new file mode 100644 index 000000000..1b5b8d9d6 --- /dev/null +++ 
b/pkg/kademlia/testrouting/utils.go @@ -0,0 +1,65 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package testrouting + +import ( + "storj.io/storj/pkg/storj" +) + +type nodeDataDistanceSorter struct { + self storj.NodeID + nodes []*nodeData +} + +func (s nodeDataDistanceSorter) Len() int { return len(s.nodes) } + +func (s nodeDataDistanceSorter) Swap(i, j int) { + s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i] +} + +func (s nodeDataDistanceSorter) Less(i, j int) bool { + return compareByXor(s.nodes[i].node.Id, s.nodes[j].node.Id, s.self) < 0 +} + +func compareByXor(left, right, reference storj.NodeID) int { + for i, r := range reference { + a, b := left[i]^r, right[i]^r + if a != b { + if a < b { + return -1 + } + return 1 + } + } + return 0 +} + +func bitAtDepth(id storj.NodeID, bitDepth int) bool { + // we could make this a fun one-liner but this is more understandable + byteDepth := bitDepth / 8 + bitOffset := bitDepth % 8 + power := uint(7 - bitOffset) + bitMask := byte(1 << power) + b := id[byteDepth] + if b&bitMask > 0 { + return true + } + return false +} + +func extendPrefix(prefix string, bit bool) string { + if bit { + return prefix + "1" + } + return prefix + "0" +} + +func isNearest(id storj.NodeID, nearest []*nodeData) bool { + for _, near := range nearest { + if near.node.Id == id { + return true + } + } + return false +} diff --git a/pkg/kademlia/testrouting/viz.go b/pkg/kademlia/testrouting/viz.go new file mode 100644 index 000000000..e65b807b5 --- /dev/null +++ b/pkg/kademlia/testrouting/viz.go @@ -0,0 +1,48 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. 
+ +package testrouting + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" +) + +// Graph writes a DOT format visual graph description of the routing table to w +func (t *Table) Graph(w io.Writer) error { + t.mu.Lock() + defer t.mu.Unlock() + + var buf bytes.Buffer + buf.Write([]byte("digraph{node [shape=box];")) + t.graph(&buf, t.makeTree()) + buf.Write([]byte("}\n")) + + _, err := buf.WriteTo(w) + return err +} + +func (t *Table) graph(buf *bytes.Buffer, b *bucket) { + if t.splits[b.prefix] { + fmt.Fprintf(buf, "b%s [label=%q];", b.prefix, b.prefix) + if b.similar != nil { + t.graph(buf, b.similar) + t.graph(buf, b.dissimilar) + fmt.Fprintf(buf, "b%s -> {b%s, b%s};", + b.prefix, b.similar.prefix, b.dissimilar.prefix) + } + return + } + // b.prefix is only ever 0s or 1s, so we don't need escaping below. + fmt.Fprintf(buf, "b%s [label=\"%s\nrouting:\\l", b.prefix, b.prefix) + for _, node := range b.nodes { + fmt.Fprintf(buf, " %s\\l", hex.EncodeToString(node.node.Id[:])) + } + fmt.Fprintf(buf, "cache:\\l") + for _, node := range b.cache { + fmt.Fprintf(buf, " %s\\l", hex.EncodeToString(node.node.Id[:])) + } + fmt.Fprintf(buf, "\"];") +} diff --git a/pkg/pb/utils.go b/pkg/pb/utils.go index 0f6746cd4..1df47ddbe 100644 --- a/pkg/pb/utils.go +++ b/pkg/pb/utils.go @@ -80,3 +80,15 @@ func (nt NodeType) DPanicOnInvalid(from string) { zap.L().DPanic("INVALID NODE TYPE: " + from) } } + +// AddressEqual compares two node addresses +func AddressEqual(a1, a2 *NodeAddress) bool { + if a1 == nil && a2 == nil { + return true + } + if a1 == nil || a2 == nil { + return false + } + return a1.Transport == a2.Transport && + a1.Address == a2.Address +}