pkg/kademlia: make tests run/work with drpc

Change-Id: I69372fd8f0d52913e1ad2cf7d01115460ba8eeda
This commit is contained in:
Jeff Wendling 2019-10-03 15:31:47 -06:00
parent b2e328f118
commit c9e0aa7c70
4 changed files with 237 additions and 133 deletions

View File

@ -143,7 +143,7 @@ func (k *Kademlia) Bootstrap(ctx context.Context) (err error) {
ident, err := k.dialer.FetchPeerIdentityUnverified(ctx, node.Address.Address)
if err != nil {
errGroup.Add(BootstrapErr.Wrap(BootstrapErr.New("%s : %s unable to fetch unverified peer identity node address %s: %s", k.routingTable.self.Type.String(), k.routingTable.self.Id.String(), node.Address.Address, err)))
errGroup.Add(BootstrapErr.New("%s : %s unable to fetch unverified peer identity node address %s: %s", k.routingTable.self.Type.String(), k.routingTable.self.Id.String(), node.Address.Address, err))
continue
}

View File

@ -0,0 +1,107 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
// +build drpc
package kademlia
import (
"context"
"crypto/tls"
"net"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/drpc/drpcserver"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/listenmux"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
)
func newListener(t *testing.T, ctx *testcontext.Context, addr string) (net.Listener, func()) {
lis, err := net.Listen("tcp", addr)
require.NoError(t, err)
listenCtx, cancel := context.WithCancel(ctx)
mux := listenmux.New(lis, 8)
ctx.Go(func() error { return mux.Run(listenCtx) })
return mux.Route("DRPC!!!1"), cancel
}
func testNode(t *testing.T, ctx *testcontext.Context, name string, bn []pb.Node) (*Kademlia, func()) {
lis, lisCancel := newListener(t, ctx, "127.0.0.1:0")
fid, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
logger := zaptest.NewLogger(t)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), pb.NodeOperator{}, fid, defaultAlpha)
require.NoError(t, err)
s := NewEndpoint(logger, k, nil, k.routingTable, nil)
tlsOptions, err := tlsopts.NewOptions(fid, tlsopts.Config{PeerIDVersions: "*"}, nil)
require.NoError(t, err)
tlsLis := tls.NewListener(lis, tlsOptions.ServerTLSConfig())
drpcServer := drpcserver.New()
pb.DRPCRegisterNodes(drpcServer, s)
serveCtx, cancel := context.WithCancel(ctx)
ctx.Go(func() error { return drpcServer.Serve(serveCtx, tlsLis) })
return k, func() {
cancel()
lisCancel()
assert.NoError(t, k.Close())
}
}
func startTestNodeServer(t *testing.T, ctx *testcontext.Context) (*mockNodesServer, *identity.FullIdentity, string, func()) {
lis, lisCancel := newListener(t, ctx, "127.0.0.1:0")
ca, err := testidentity.NewTestCA(ctx)
require.NoError(t, err)
fullIdentity, err := ca.NewIdentity()
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(fullIdentity, tlsopts.Config{}, nil)
require.NoError(t, err)
tlsLis := tls.NewListener(lis, tlsOptions.ServerTLSConfig())
drpcServer := drpcserver.New()
mn := &mockNodesServer{queryCalled: 0}
pb.DRPCRegisterNodes(drpcServer, mn)
serveCtx, cancel := context.WithCancel(context.Background())
ctx.Go(func() error { return drpcServer.Serve(serveCtx, tlsLis) })
return mn, fullIdentity, lis.Addr().String(), func() {
cancel()
lisCancel()
}
}
func newTestServer(t *testing.T, ctx *testcontext.Context, lis net.Listener) (*mockNodesServer, func()) {
ca, err := testidentity.NewTestCA(ctx)
require.NoError(t, err)
fullIdentity, err := ca.NewIdentity()
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(fullIdentity, tlsopts.Config{}, nil)
require.NoError(t, err)
tlsLis := tls.NewListener(lis, tlsOptions.ServerTLSConfig())
drpcServer := drpcserver.New()
mn := &mockNodesServer{queryCalled: 0}
pb.DRPCRegisterNodes(drpcServer, mn)
serveCtx, cancel := context.WithCancel(context.Background())
ctx.Go(func() error { return drpcServer.Serve(serveCtx, tlsLis) })
return mn, cancel
}

View File

@ -0,0 +1,112 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
// +build !drpc
package kademlia
import (
"net"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
)
func newListener(t *testing.T, ctx *testcontext.Context, addr string) (net.Listener, func()) {
lis, err := net.Listen("tcp", addr)
require.NoError(t, err)
return lis, func() { _ = lis.Close() }
}
func testNode(t *testing.T, ctx *testcontext.Context, name string, bn []pb.Node) (*Kademlia, func()) {
lis, lisCancel := newListener(t, ctx, "127.0.0.1:0")
fid, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(fid, tlsopts.Config{PeerIDVersions: "*"}, nil)
require.NoError(t, err)
logger := zaptest.NewLogger(t)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), pb.NodeOperator{}, fid, defaultAlpha)
require.NoError(t, err)
s := NewEndpoint(logger, k, nil, k.routingTable, nil)
grpcServer := grpc.NewServer(tlsOptions.ServerOption())
pb.RegisterNodesServer(grpcServer, s)
ctx.Go(func() error {
err := grpcServer.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
return k, func() {
grpcServer.GracefulStop()
lisCancel()
assert.NoError(t, k.Close())
}
}
func startTestNodeServer(t *testing.T, ctx *testcontext.Context) (*mockNodesServer, *identity.FullIdentity, string, func()) {
lis, lisCancel := newListener(t, ctx, "127.0.0.1:0")
ca, err := testidentity.NewTestCA(ctx)
require.NoError(t, err)
fullIdentity, err := ca.NewIdentity()
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(fullIdentity, tlsopts.Config{}, nil)
require.NoError(t, err)
grpcServer := grpc.NewServer(tlsOptions.ServerOption())
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
ctx.Go(func() error {
err := grpcServer.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
return mn, fullIdentity, lis.Addr().String(), func() {
grpcServer.GracefulStop()
lisCancel()
}
}
func newTestServer(t *testing.T, ctx *testcontext.Context, lis net.Listener) (*mockNodesServer, func()) {
ca, err := testidentity.NewTestCA(ctx)
require.NoError(t, err)
fullIdentity, err := ca.NewIdentity()
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(fullIdentity, tlsopts.Config{}, nil)
require.NoError(t, err)
grpcServer := grpc.NewServer(tlsOptions.ServerOption())
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
ctx.Go(func() error {
err := grpcServer.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
return mn, grpcServer.Stop
}

View File

@ -6,7 +6,6 @@ package kademlia
import (
"bytes"
"context"
"net"
"sync/atomic"
"testing"
"time"
@ -15,7 +14,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
@ -81,12 +79,12 @@ func TestPeerDiscovery(t *testing.T) {
defer ctx.Cleanup()
// make new identity
bootServer, mockBootServer, bootID, bootAddress := startTestNodeServer(ctx)
defer bootServer.GracefulStop()
testServer, _, testID, testAddress := startTestNodeServer(ctx)
defer testServer.GracefulStop()
targetServer, _, targetID, targetAddress := startTestNodeServer(ctx)
defer targetServer.GracefulStop()
mockBootServer, bootID, bootAddress, cancel := startTestNodeServer(t, ctx)
defer cancel()
_, testID, testAddress, cancel := startTestNodeServer(t, ctx)
defer cancel()
_, targetID, targetAddress, cancel := startTestNodeServer(t, ctx)
defer cancel()
bootstrapNodes := []pb.Node{{Id: bootID.ID, Address: &pb.NodeAddress{Address: bootAddress}}}
operator := pb.NodeOperator{
@ -126,20 +124,17 @@ func TestBootstrap(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
bn, s, clean := testNode(ctx, "1", t, []pb.Node{})
bn, clean := testNode(t, ctx, "1", []pb.Node{})
defer clean()
defer s.GracefulStop()
n1, s1, clean1 := testNode(ctx, "2", t, []pb.Node{bn.routingTable.self.Node})
defer clean1()
defer s1.GracefulStop()
n1, clean := testNode(t, ctx, "2", []pb.Node{bn.routingTable.self.Node})
defer clean()
err := n1.Bootstrap(ctx)
require.NoError(t, err)
n2, s2, clean2 := testNode(ctx, "3", t, []pb.Node{bn.routingTable.self.Node})
defer clean2()
defer s2.GracefulStop()
n2, clean := testNode(t, ctx, "3", []pb.Node{bn.routingTable.self.Node})
defer clean()
err = n2.Bootstrap(ctx)
require.NoError(t, err)
@ -149,52 +144,12 @@ func TestBootstrap(t *testing.T) {
assert.Len(t, nodeIDs, 3)
}
func testNode(ctx *testcontext.Context, name string, t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
// new address
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoErrorf(t, err, "node: %s", name)
// new config
// new identity
fid, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
// new kademlia
logger := zaptest.NewLogger(t)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), pb.NodeOperator{}, fid, defaultAlpha)
require.NoError(t, err)
s := NewEndpoint(logger, k, nil, k.routingTable, nil)
// new ident opts
serverOptions, err := tlsopts.NewOptions(fid, tlsopts.Config{
PeerIDVersions: "*",
}, nil)
require.NoError(t, err)
identOpt := serverOptions.ServerOption()
grpcServer := grpc.NewServer(identOpt)
pb.RegisterNodesServer(grpcServer, s)
ctx.Go(func() error {
err := grpcServer.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
return k, grpcServer, func() {
assert.NoError(t, k.Close())
}
}
func TestRefresh(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
k, s, clean := testNode(ctx, "refresh", t, []pb.Node{})
k, clean := testNode(t, ctx, "refresh", []pb.Node{})
defer clean()
defer s.GracefulStop()
//turn back time for only bucket
rt := k.routingTable
now := time.Now().UTC()
@ -213,7 +168,6 @@ func TestRefresh(t *testing.T) {
ts2, err := rt.GetBucketTimestamp(ctx, bID[:])
require.NoError(t, err)
assert.True(t, ts1.Equal(ts2))
s.GracefulStop()
}
func TestFindNear(t *testing.T) {
@ -228,18 +182,10 @@ func TestFindNear(t *testing.T) {
assert.NotEqual(t, fid.ID, fid2.ID)
//start kademlia
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
srv, _ := newTestServer(ctx)
defer srv.Stop()
ctx.Go(func() error {
err := srv.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
lis, lisCancel := newListener(t, ctx, "127.0.0.1:0")
defer lisCancel()
_, cancel := newTestServer(t, ctx, lis)
defer cancel()
bootstrap := []pb.Node{{Id: fid2.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}}
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrap,
@ -292,65 +238,6 @@ func TestFindNear(t *testing.T) {
}
}
func startTestNodeServer(ctx *testcontext.Context) (*grpc.Server, *mockNodesServer, *identity.FullIdentity, string) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, nil, nil, ""
}
ca, err := testidentity.NewTestCA(ctx)
if err != nil {
return nil, nil, nil, ""
}
fullIdentity, err := ca.NewIdentity()
if err != nil {
return nil, nil, nil, ""
}
serverOptions, err := tlsopts.NewOptions(fullIdentity, tlsopts.Config{}, nil)
if err != nil {
return nil, nil, nil, ""
}
identOpt := serverOptions.ServerOption()
grpcServer := grpc.NewServer(identOpt)
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
ctx.Go(func() error {
err := grpcServer.Serve(lis)
if err == grpc.ErrServerStopped {
err = nil
}
return err
})
return grpcServer, mn, fullIdentity, lis.Addr().String()
}
func newTestServer(ctx *testcontext.Context) (*grpc.Server, *mockNodesServer) {
ca, err := testidentity.NewTestCA(ctx)
if err != nil {
return nil, nil
}
fullIdentity, err := ca.NewIdentity()
if err != nil {
return nil, nil
}
serverOptions, err := tlsopts.NewOptions(fullIdentity, tlsopts.Config{}, nil)
if err != nil {
return nil, nil
}
identOpt := serverOptions.ServerOption()
grpcServer := grpc.NewServer(identOpt)
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
return grpcServer, mn
}
// TestRandomIds makes sure finds a random node ID is within a range (start..end]
func TestRandomIds(t *testing.T) {
for x := 0; x < 1000; x++ {
@ -422,9 +309,7 @@ func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node
return nil, err
}
tlsOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{
PeerIDVersions: "*",
}, nil)
tlsOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{PeerIDVersions: "*"}, nil)
if err != nil {
return nil, err
}