storj/pkg/kademlia/kademlia_test.go
2019-04-09 13:01:45 -04:00

549 lines
14 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"bytes"
"context"
"math/rand"
"net"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/storage/teststore"
)
const (
defaultAlpha = 5
)
func TestNewKademlia(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
cases := []struct {
id *identity.FullIdentity
bn []pb.Node
addr string
expectedErr error
}{
{
id: func() *identity.FullIdentity {
id, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
return id
}(),
bn: []pb.Node{{Id: teststorj.NodeIDFromString("foo")}},
addr: "127.0.0.1:8080",
},
{
id: func() *identity.FullIdentity {
id, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
return id
}(),
bn: []pb.Node{{Id: teststorj.NodeIDFromString("foo")}},
addr: "127.0.0.1:8080",
},
}
for i, v := range cases {
kad, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, v.bn, v.addr, nil, v.id, ctx.Dir(strconv.Itoa(i)), defaultAlpha)
require.NoError(t, err)
assert.Equal(t, v.expectedErr, err)
assert.Equal(t, kad.bootstrapNodes, v.bn)
assert.NotNil(t, kad.dialer)
assert.NotNil(t, kad.routingTable)
assert.NoError(t, kad.Close())
}
}
func TestPeerDiscovery(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// make new identity
bootServer, mockBootServer, bootID, bootAddress := startTestNodeServer(ctx)
defer bootServer.GracefulStop()
testServer, _, testID, testAddress := startTestNodeServer(ctx)
defer testServer.GracefulStop()
targetServer, _, targetID, targetAddress := startTestNodeServer(ctx)
defer targetServer.GracefulStop()
bootstrapNodes := []pb.Node{{Id: bootID.ID, Address: &pb.NodeAddress{Address: bootAddress}, Type: pb.NodeType_STORAGE}}
metadata := &pb.NodeMetadata{
Wallet: "OperatorWallet",
}
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, ctx.Dir("test"), defaultAlpha)
require.NoError(t, err)
rt := k.routingTable
assert.Equal(t, rt.Local().Metadata.Wallet, "OperatorWallet")
defer ctx.Check(k.Close)
cases := []struct {
target storj.NodeID
expected *pb.Node
expectedErr error
}{
{target: func() storj.NodeID {
mockBootServer.returnValue = []*pb.Node{{Id: targetID.ID, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: targetAddress}}}
return targetID.ID
}(),
expected: &pb.Node{},
expectedErr: nil,
},
{target: bootID.ID,
expected: nil,
expectedErr: nil,
},
}
for _, v := range cases {
_, err := k.lookup(ctx, v.target, true)
assert.Equal(t, v.expectedErr, err)
}
}
func TestBootstrap(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
bn, s, clean := testNode(ctx, "1", t, []pb.Node{})
defer clean()
defer s.GracefulStop()
n1, s1, clean1 := testNode(ctx, "2", t, []pb.Node{bn.routingTable.self})
defer clean1()
defer s1.GracefulStop()
err := n1.Bootstrap(ctx)
require.NoError(t, err)
n2, s2, clean2 := testNode(ctx, "3", t, []pb.Node{bn.routingTable.self})
defer clean2()
defer s2.GracefulStop()
err = n2.Bootstrap(ctx)
require.NoError(t, err)
nodeIDs, err := n2.routingTable.nodeBucketDB.List(nil, 0)
require.NoError(t, err)
assert.Len(t, nodeIDs, 3)
}
func testNode(ctx *testcontext.Context, name string, t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
// new address
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
// new config
// new identity
fid, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
// new kademlia
logger := zaptest.NewLogger(t)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, ctx.Dir(name), defaultAlpha)
require.NoError(t, err)
s := NewEndpoint(logger, k, k.routingTable)
// new ident opts
serverOptions, err := tlsopts.NewOptions(fid, tlsopts.Config{
PeerIDVersions: "*",
})
require.NoError(t, err)
identOpt := serverOptions.ServerOption()
grpcServer := grpc.NewServer(identOpt)
pb.RegisterNodesServer(grpcServer, s)
ctx.Go(func() error {
err := grpcServer.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
return k, grpcServer, func() {
assert.NoError(t, k.Close())
}
}
func TestRefresh(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
k, s, clean := testNode(ctx, "refresh", t, []pb.Node{})
defer clean()
defer s.GracefulStop()
//turn back time for only bucket
rt := k.routingTable
now := time.Now().UTC()
bID := firstBucketID //always exists
err := rt.SetBucketTimestamp(bID[:], now.Add(-2*time.Hour))
require.NoError(t, err)
//refresh should call FindNode, updating the time
err = k.refresh(ctx, time.Minute)
require.NoError(t, err)
ts1, err := rt.GetBucketTimestamp(bID[:])
require.NoError(t, err)
assert.True(t, now.Add(-5*time.Minute).Before(ts1))
//refresh should not call FindNode, leaving the previous time
err = k.refresh(ctx, time.Minute)
require.NoError(t, err)
ts2, err := rt.GetBucketTimestamp(bID[:])
require.NoError(t, err)
assert.True(t, ts1.Equal(ts2))
s.GracefulStop()
}
func TestFindNear(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// make new identity
fid, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
fid2, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
assert.NotEqual(t, fid.ID, fid2.ID)
//start kademlia
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
srv, _ := newTestServer(ctx, []*pb.Node{{Id: teststorj.NodeIDFromString("foo")}})
defer srv.Stop()
ctx.Go(func() error {
err := srv.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
bootstrap := []pb.Node{{Id: fid2.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}}
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrap,
lis.Addr().String(), nil, fid, ctx.Dir("kademlia"), defaultAlpha)
require.NoError(t, err)
defer ctx.Check(k.Close)
// add nodes
nodes := []*pb.Node{}
newNode := func(id string, bw, disk int64) pb.Node {
nodeID := teststorj.NodeIDFromString(id)
restriction := &pb.NodeRestrictions{FreeBandwidth: bw, FreeDisk: disk}
n := &pb.Node{Id: nodeID, Restrictions: restriction, Type: pb.NodeType_STORAGE}
nodes = append(nodes, n)
err = k.routingTable.ConnectionSuccess(n)
require.NoError(t, err)
return *n
}
nodeIDA := newNode("AAAAA", 1, 4)
nodeIDB := newNode("BBBBB", 2, 3)
newNode("CCCCC", 3, 2)
newNode("DDDDD", 4, 1)
require.Len(t, nodes, 4)
cases := []struct {
testID string
target storj.NodeID
limit int
restrictions []pb.Restriction
expected []*pb.Node
}{
{testID: "one", target: nodeIDB.Id, limit: 2, expected: nodes[2:],
restrictions: []pb.Restriction{
{Operator: pb.Restriction_GT, Operand: pb.Restriction_FREE_BANDWIDTH, Value: int64(2)},
},
},
{testID: "two", target: nodeIDA.Id, limit: 3, expected: nodes[3:],
restrictions: []pb.Restriction{
{Operator: pb.Restriction_GT, Operand: pb.Restriction_FREE_BANDWIDTH, Value: int64(2)},
{Operator: pb.Restriction_LT, Operand: pb.Restriction_FREE_DISK, Value: int64(2)},
},
},
{testID: "three", target: nodeIDA.Id, limit: 4, expected: nodes, restrictions: []pb.Restriction{}},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
ns, err := k.FindNear(ctx, c.target, c.limit, c.restrictions...)
require.NoError(t, err)
assert.Equal(t, len(c.expected), len(ns))
for _, e := range c.expected {
found := false
for _, n := range ns {
if e.Id == n.Id {
found = true
}
}
assert.True(t, found, e.String())
}
})
}
}
func TestMeetsRestrictions(t *testing.T) {
cases := []struct {
testID string
r []pb.Restriction
n pb.Node
expect bool
}{
{testID: "pass one",
r: []pb.Restriction{
{
Operator: pb.Restriction_EQ,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(1),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(1),
},
},
expect: true,
},
{testID: "pass multiple",
r: []pb.Restriction{
{
Operator: pb.Restriction_LTE,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(2),
},
{
Operator: pb.Restriction_GTE,
Operand: pb.Restriction_FREE_DISK,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(1),
FreeDisk: int64(3),
},
},
expect: true,
},
{testID: "fail one",
r: []pb.Restriction{
{
Operator: pb.Restriction_LT,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(2),
},
{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_FREE_DISK,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(2),
FreeDisk: int64(3),
},
},
expect: false,
},
{testID: "fail multiple",
r: []pb.Restriction{
{
Operator: pb.Restriction_LT,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(2),
},
{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_FREE_DISK,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(2),
FreeDisk: int64(2),
},
},
expect: false,
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
result := meetsRestrictions(c.r, c.n)
assert.Equal(t, c.expect, result)
})
}
}
func startTestNodeServer(ctx *testcontext.Context) (*grpc.Server, *mockNodesServer, *identity.FullIdentity, string) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, nil, nil, ""
}
ca, err := testidentity.NewTestCA(ctx)
if err != nil {
return nil, nil, nil, ""
}
identity, err := ca.NewIdentity()
if err != nil {
return nil, nil, nil, ""
}
serverOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{})
if err != nil {
return nil, nil, nil, ""
}
identOpt := serverOptions.ServerOption()
grpcServer := grpc.NewServer(identOpt)
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
ctx.Go(func() error {
err := grpcServer.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
return grpcServer, mn, identity, lis.Addr().String()
}
func newTestServer(ctx *testcontext.Context, nn []*pb.Node) (*grpc.Server, *mockNodesServer) {
ca, err := testidentity.NewTestCA(ctx)
if err != nil {
return nil, nil
}
identity, err := ca.NewIdentity()
if err != nil {
return nil, nil
}
serverOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{})
if err != nil {
return nil, nil
}
identOpt := serverOptions.ServerOption()
grpcServer := grpc.NewServer(identOpt)
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
return grpcServer, mn
}
// TestRandomIds makes sure finds a random node ID is within a range (start..end]
func TestRandomIds(t *testing.T) {
for x := 0; x < 1000; x++ {
var start, end bucketID
// many valid options
rand.Read(start[:])
rand.Read(end[:])
if bytes.Compare(start[:], end[:]) > 0 {
start, end = end, start
}
id, err := randomIDInRange(start, end)
require.NoError(t, err, "Unexpected err in randomIDInRange")
assert.True(t, bytes.Compare(id[:], start[:]) > 0, "Random id was less than starting id")
assert.True(t, bytes.Compare(id[:], end[:]) <= 0, "Random id was greater than end id")
//invalid range
_, err = randomIDInRange(end, start)
assert.Error(t, err, "Missing expected err in invalid randomIDInRange")
//no valid options
end = start
_, err = randomIDInRange(start, end)
assert.Error(t, err, "Missing expected err in empty randomIDInRange")
// one valid option
if start[31] == 255 {
start[31] = 254
} else {
end[31] = start[31] + 1
}
id, err = randomIDInRange(start, end)
require.NoError(t, err, "Unexpected err in randomIDInRange")
assert.True(t, bytes.Equal(id[:], end[:]), "Not-so-random id was incorrect")
}
}
type mockNodesServer struct {
queryCalled int32
pingCalled int32
infoCalled int32
returnValue []*pb.Node
}
func (mn *mockNodesServer) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
atomic.AddInt32(&mn.queryCalled, 1)
return &pb.QueryResponse{Response: mn.returnValue}, nil
}
func (mn *mockNodesServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
atomic.AddInt32(&mn.pingCalled, 1)
return &pb.PingResponse{}, nil
}
func (mn *mockNodesServer) RequestInfo(ctx context.Context, req *pb.InfoRequest) (*pb.InfoResponse, error) {
atomic.AddInt32(&mn.infoCalled, 1)
return &pb.InfoResponse{}, nil
}
// newKademlia returns a newly configured Kademlia instance
func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *identity.FullIdentity, path string, alpha int) (*Kademlia, error) {
self := pb.Node{
Id: identity.ID,
Type: nodeType,
Address: &pb.NodeAddress{Address: address},
Metadata: metadata,
}
rt, err := NewRoutingTable(log, self, teststore.New(), teststore.New(), nil)
if err != nil {
return nil, err
}
tlsOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{
PeerIDVersions: "*",
})
if err != nil {
return nil, err
}
transportClient := transport.NewClient(tlsOptions, rt)
kadConfig := Config{
Alpha: alpha,
}
kad, err := NewService(log, self, transportClient, rt, kadConfig)
if err != nil {
return nil, err
}
kad.bootstrapNodes = bootstrapNodes
return kad, nil
}