Use testplanet in node tests (#841)

This commit is contained in:
Egon Elbre 2018-12-12 17:40:33 +02:00 committed by GitHub
parent c56307f5b9
commit 7a80e7bf2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 175 additions and 668 deletions

View File

@ -15,6 +15,7 @@ import (
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pointerdb/pdbclient"
@ -59,6 +60,8 @@ func (planet *Planet) newNode(name string, nodeType pb.NodeType) (*Node, error)
Listener: listener,
}
node.Log.Debug("id=" + identity.ID.String())
node.Transport = transport.NewClient(identity)
serverConfig := provider.ServerConfig{Address: node.Listener.Addr().String()}
@ -116,6 +119,12 @@ func (node *Node) Shutdown() error {
return utils.CombineErrors(errs...)
}
// NewNodeClient creates a node client for this node
func (n *Node) NewNodeClient() (node.Client, error) { //nolint renaming to node would conflict with package name; rename Node to Peer to resolve
// TODO: handle disconnect verification
return node.NewNodeClient(n.Identity, n.Info, n.Kademlia)
}
// DialPointerDB dials destination with apikey and returns pointerdb Client
func (node *Node) DialPointerDB(destination *Node, apikey string) (pdbclient.Client, error) {
// TODO: use node.Transport instead of pdbclient.NewClient

View File

@ -90,7 +90,7 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
}
for _, n := range planet.nodes {
server := node.NewServer(n.Kademlia)
server := node.NewServer(n.Log.Named("node"), n.Kademlia)
pb.RegisterNodesServer(n.Provider.GRPC(), server)
// TODO: shutdown
}
@ -190,6 +190,9 @@ func (planet *Planet) Start(ctx context.Context) {
planet.started = true
}
// Size returns number of nodes in the network
func (planet *Planet) Size() int { return len(planet.nodes) }
// Shutdown shuts down all the nodes and deletes temporary directories.
func (planet *Planet) Shutdown() error {
var errs []error

View File

@ -1,294 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: storj.io/storj/pkg/dht (interfaces: DHT,RoutingTable)
// Package mock_dht is a generated GoMock package.
package mock_dht
import (
context "context"
reflect "reflect"
time "time"
gomock "github.com/golang/mock/gomock"
dht "storj.io/storj/pkg/dht"
pb "storj.io/storj/pkg/pb"
storj "storj.io/storj/pkg/storj"
storage "storj.io/storj/storage"
)
// MockDHT is a mock of DHT interface
type MockDHT struct {
ctrl *gomock.Controller
recorder *MockDHTMockRecorder
}
// MockDHTMockRecorder is the mock recorder for MockDHT
type MockDHTMockRecorder struct {
mock *MockDHT
}
// NewMockDHT creates a new mock instance
func NewMockDHT(ctrl *gomock.Controller) *MockDHT {
mock := &MockDHT{ctrl: ctrl}
mock.recorder = &MockDHTMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockDHT) EXPECT() *MockDHTMockRecorder {
return m.recorder
}
// Bootstrap mocks base method
func (m *MockDHT) Bootstrap(arg0 context.Context) error {
ret := m.ctrl.Call(m, "Bootstrap", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Bootstrap indicates an expected call of Bootstrap
func (mr *MockDHTMockRecorder) Bootstrap(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockDHT)(nil).Bootstrap), arg0)
}
// Disconnect mocks base method
func (m *MockDHT) Disconnect() error {
ret := m.ctrl.Call(m, "Disconnect")
ret0, _ := ret[0].(error)
return ret0
}
// Disconnect indicates an expected call of Disconnect
func (mr *MockDHTMockRecorder) Disconnect() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockDHT)(nil).Disconnect))
}
// FindNode mocks base method
func (m *MockDHT) FindNode(arg0 context.Context, arg1 storj.NodeID) (pb.Node, error) {
ret := m.ctrl.Call(m, "FindNode", arg0, arg1)
ret0, _ := ret[0].(pb.Node)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FindNode indicates an expected call of FindNode
func (mr *MockDHTMockRecorder) FindNode(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindNode", reflect.TypeOf((*MockDHT)(nil).FindNode), arg0, arg1)
}
// GetNodes mocks base method
func (m *MockDHT) GetNodes(arg0 context.Context, arg1 storj.NodeID, arg2 int, arg3 ...pb.Restriction) ([]*pb.Node, error) {
varargs := []interface{}{arg0, arg1, arg2}
for _, a := range arg3 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "GetNodes", varargs...)
ret0, _ := ret[0].([]*pb.Node)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetNodes indicates an expected call of GetNodes
func (mr *MockDHTMockRecorder) GetNodes(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1, arg2}, arg3...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNodes", reflect.TypeOf((*MockDHT)(nil).GetNodes), varargs...)
}
// GetRoutingTable mocks base method
func (m *MockDHT) GetRoutingTable(arg0 context.Context) (dht.RoutingTable, error) {
ret := m.ctrl.Call(m, "GetRoutingTable", arg0)
ret0, _ := ret[0].(dht.RoutingTable)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetRoutingTable indicates an expected call of GetRoutingTable
func (mr *MockDHTMockRecorder) GetRoutingTable(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRoutingTable", reflect.TypeOf((*MockDHT)(nil).GetRoutingTable), arg0)
}
// Ping mocks base method
func (m *MockDHT) Ping(arg0 context.Context, arg1 pb.Node) (pb.Node, error) {
ret := m.ctrl.Call(m, "Ping", arg0, arg1)
ret0, _ := ret[0].(pb.Node)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Ping indicates an expected call of Ping
func (mr *MockDHTMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockDHT)(nil).Ping), arg0, arg1)
}
// Seen mocks base method
func (m *MockDHT) Seen() []*pb.Node {
ret := m.ctrl.Call(m, "Seen")
ret0, _ := ret[0].([]*pb.Node)
return ret0
}
// Seen indicates an expected call of Seen
func (mr *MockDHTMockRecorder) Seen() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Seen", reflect.TypeOf((*MockDHT)(nil).Seen))
}
// MockRoutingTable is a mock of RoutingTable interface
type MockRoutingTable struct {
ctrl *gomock.Controller
recorder *MockRoutingTableMockRecorder
}
// MockRoutingTableMockRecorder is the mock recorder for MockRoutingTable
type MockRoutingTableMockRecorder struct {
mock *MockRoutingTable
}
// NewMockRoutingTable creates a new mock instance
func NewMockRoutingTable(ctrl *gomock.Controller) *MockRoutingTable {
mock := &MockRoutingTable{ctrl: ctrl}
mock.recorder = &MockRoutingTableMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockRoutingTable) EXPECT() *MockRoutingTableMockRecorder {
return m.recorder
}
// CacheSize mocks base method
func (m *MockRoutingTable) CacheSize() int {
ret := m.ctrl.Call(m, "CacheSize")
ret0, _ := ret[0].(int)
return ret0
}
// CacheSize indicates an expected call of CacheSize
func (mr *MockRoutingTableMockRecorder) CacheSize() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CacheSize", reflect.TypeOf((*MockRoutingTable)(nil).CacheSize))
}
// ConnectionFailed mocks base method
func (m *MockRoutingTable) ConnectionFailed(arg0 *pb.Node) error {
ret := m.ctrl.Call(m, "ConnectionFailed", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// ConnectionFailed indicates an expected call of ConnectionFailed
func (mr *MockRoutingTableMockRecorder) ConnectionFailed(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectionFailed", reflect.TypeOf((*MockRoutingTable)(nil).ConnectionFailed), arg0)
}
// ConnectionSuccess mocks base method
func (m *MockRoutingTable) ConnectionSuccess(arg0 *pb.Node) error {
ret := m.ctrl.Call(m, "ConnectionSuccess", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// ConnectionSuccess indicates an expected call of ConnectionSuccess
func (mr *MockRoutingTableMockRecorder) ConnectionSuccess(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectionSuccess", reflect.TypeOf((*MockRoutingTable)(nil).ConnectionSuccess), arg0)
}
// FindNear mocks base method
func (m *MockRoutingTable) FindNear(arg0 storj.NodeID, arg1 int) ([]*pb.Node, error) {
ret := m.ctrl.Call(m, "FindNear", arg0, arg1)
ret0, _ := ret[0].([]*pb.Node)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FindNear indicates an expected call of FindNear
func (mr *MockRoutingTableMockRecorder) FindNear(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindNear", reflect.TypeOf((*MockRoutingTable)(nil).FindNear), arg0, arg1)
}
// GetBucket mocks base method
func (m *MockRoutingTable) GetBucket(arg0 storj.NodeID) (dht.Bucket, bool) {
ret := m.ctrl.Call(m, "GetBucket", arg0)
ret0, _ := ret[0].(dht.Bucket)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetBucket indicates an expected call of GetBucket
func (mr *MockRoutingTableMockRecorder) GetBucket(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBucket", reflect.TypeOf((*MockRoutingTable)(nil).GetBucket), arg0)
}
// GetBucketIds mocks base method
func (m *MockRoutingTable) GetBucketIds() (storage.Keys, error) {
ret := m.ctrl.Call(m, "GetBucketIds")
ret0, _ := ret[0].(storage.Keys)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetBucketIds indicates an expected call of GetBucketIds
func (mr *MockRoutingTableMockRecorder) GetBucketIds() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBucketIds", reflect.TypeOf((*MockRoutingTable)(nil).GetBucketIds))
}
// GetBucketTimestamp mocks base method
func (m *MockRoutingTable) GetBucketTimestamp(arg0 []byte, arg1 dht.Bucket) (time.Time, error) {
ret := m.ctrl.Call(m, "GetBucketTimestamp", arg0, arg1)
ret0, _ := ret[0].(time.Time)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetBucketTimestamp indicates an expected call of GetBucketTimestamp
func (mr *MockRoutingTableMockRecorder) GetBucketTimestamp(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBucketTimestamp", reflect.TypeOf((*MockRoutingTable)(nil).GetBucketTimestamp), arg0, arg1)
}
// GetBuckets mocks base method
func (m *MockRoutingTable) GetBuckets() ([]dht.Bucket, error) {
ret := m.ctrl.Call(m, "GetBuckets")
ret0, _ := ret[0].([]dht.Bucket)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetBuckets indicates an expected call of GetBuckets
func (mr *MockRoutingTableMockRecorder) GetBuckets() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBuckets", reflect.TypeOf((*MockRoutingTable)(nil).GetBuckets))
}
// K mocks base method
func (m *MockRoutingTable) K() int {
ret := m.ctrl.Call(m, "K")
ret0, _ := ret[0].(int)
return ret0
}
// K indicates an expected call of K
func (mr *MockRoutingTableMockRecorder) K() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "K", reflect.TypeOf((*MockRoutingTable)(nil).K))
}
// Local mocks base method
func (m *MockRoutingTable) Local() pb.Node {
ret := m.ctrl.Call(m, "Local")
ret0, _ := ret[0].(pb.Node)
return ret0
}
// Local indicates an expected call of Local
func (mr *MockRoutingTableMockRecorder) Local() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Local", reflect.TypeOf((*MockRoutingTable)(nil).Local))
}
// SetBucketTimestamp mocks base method
func (m *MockRoutingTable) SetBucketTimestamp(arg0 []byte, arg1 time.Time) error {
ret := m.ctrl.Call(m, "SetBucketTimestamp", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SetBucketTimestamp indicates an expected call of SetBucketTimestamp
func (mr *MockRoutingTableMockRecorder) SetBucketTimestamp(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetBucketTimestamp", reflect.TypeOf((*MockRoutingTable)(nil).SetBucketTimestamp), arg0, arg1)
}

View File

@ -81,7 +81,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
}
defer func() { err = utils.CombineErrors(err, kad.Disconnect()) }()
pb.RegisterNodesServer(server.GRPC(), node.NewServer(kad))
pb.RegisterNodesServer(server.GRPC(), node.NewServer(zap.L(), kad))
go func() {
if err = kad.Bootstrap(ctx); err != nil {

View File

@ -259,7 +259,7 @@ func (k *Kademlia) ListenAndServe() error {
}
grpcServer := grpc.NewServer(identOpt)
mn := node.NewServer(k)
mn := node.NewServer(zap.L(), k)
pb.RegisterNodesServer(grpcServer, mn)
lis, err := net.Listen("tcp", k.address)

View File

@ -16,6 +16,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"google.golang.org/grpc"
testidentity "storj.io/storj/internal/identity"
@ -170,7 +171,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
k, err := NewKademlia(fid.ID, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
assert.NoError(t, err)
s := node.NewServer(k)
s := node.NewServer(zap.L(), k)
// new ident opts
identOpt, err := fid.ServerOption()
assert.NoError(t, err)

View File

@ -1,119 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package node
import (
"context"
"sync"
"testing"
"unsafe"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
var fooID = teststorj.NodeIDFromString("foo")
func TestGet(t *testing.T) {
cases := []struct {
pool *ConnectionPool
nodeID storj.NodeID
expected Conn
expectedError error
}{
{
pool: func() *ConnectionPool {
p := NewConnectionPool(newTestIdentity(t))
p.Init()
p.items[fooID] = &Conn{addr: "foo"}
return p
}(),
nodeID: fooID,
expected: Conn{addr: "foo"},
expectedError: nil,
},
}
for i := range cases {
v := &cases[i]
test, err := v.pool.Get(v.nodeID)
assert.Equal(t, v.expectedError, err)
assert.Equal(t, v.expected.addr, test.(*Conn).addr)
}
}
func TestDisconnect(t *testing.T) {
conn, err := grpc.Dial("127.0.0.1:0", grpc.WithInsecure())
assert.NoError(t, err)
// gc.Close = func() error { return nil }
cases := []struct {
pool ConnectionPool
nodeID storj.NodeID
expected interface{}
expectedError error
}{
{
pool: ConnectionPool{
mu: sync.RWMutex{},
items: map[storj.NodeID]*Conn{fooID: &Conn{grpc: unsafe.Pointer(conn)}},
},
nodeID: fooID,
expected: nil,
expectedError: nil,
},
}
for i := range cases {
v := &cases[i]
err := v.pool.Disconnect(v.nodeID)
assert.Equal(t, v.expectedError, err)
test, err := v.pool.Get(v.nodeID)
assert.Equal(t, v.expectedError, err)
assert.Equal(t, v.expected, test)
}
}
func TestDial(t *testing.T) {
t.Skip()
cases := []struct {
pool *ConnectionPool
node *pb.Node
expectedError error
expected *Conn
}{
{
pool: NewConnectionPool(newTestIdentity(t)),
node: &pb.Node{Id: fooID, Address: &pb.NodeAddress{Address: "127.0.0.1:0"}},
expected: nil,
expectedError: nil,
},
}
for _, v := range cases {
wg := sync.WaitGroup{}
wg.Add(4)
go testDial(t, &wg, v.pool, v.node, v.expectedError)
go testDial(t, &wg, v.pool, v.node, v.expectedError)
go testDial(t, &wg, v.pool, v.node, v.expectedError)
go testDial(t, &wg, v.pool, v.node, v.expectedError)
wg.Wait()
}
}
func testDial(t *testing.T, wg *sync.WaitGroup, p *ConnectionPool, n *pb.Node, eerr error) {
defer wg.Done()
ctx := context.Background()
actual, err := p.Dial(ctx, n)
assert.Equal(t, eerr, err)
assert.NotNil(t, actual)
}

View File

@ -18,18 +18,24 @@ type Node struct {
}
// Lookup queries nodes looking for a particular node in the network
func (n *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) {
c, err := n.pool.Dial(ctx, &to)
func (node *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node, error) {
conn, err := node.pool.Dial(ctx, &to)
if err != nil {
return nil, NodeClientErr.Wrap(err)
}
resp, err := c.Query(ctx, &pb.QueryRequest{Limit: 20, Sender: &n.self, Target: &find, Pingback: true})
resp, err := conn.Query(ctx, &pb.QueryRequest{
Limit: 20,
Sender: &node.self,
Target: &find,
Pingback: true,
})
if err != nil {
return nil, NodeClientErr.Wrap(err)
}
rt, err := n.dht.GetRoutingTable(ctx)
rt, err := node.dht.GetRoutingTable(ctx)
if err != nil {
return nil, NodeClientErr.Wrap(err)
}
@ -42,13 +48,13 @@ func (n *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node
}
// Ping attempts to establish a connection with a node to verify it is alive
func (n *Node) Ping(ctx context.Context, to pb.Node) (bool, error) {
c, err := n.pool.Dial(ctx, &to)
func (node *Node) Ping(ctx context.Context, to pb.Node) (bool, error) {
conn, err := node.pool.Dial(ctx, &to)
if err != nil {
return false, NodeClientErr.Wrap(err)
}
_, err = c.Ping(ctx, &pb.PingRequest{})
_, err = conn.Ping(ctx, &pb.PingRequest{})
if err != nil {
return false, err
}
@ -57,6 +63,6 @@ func (n *Node) Ping(ctx context.Context, to pb.Node) (bool, error) {
}
// Disconnect closes all connections within the pool
func (n *Node) Disconnect() error {
return n.pool.DisconnectAll()
func (node *Node) Disconnect() error {
return node.pool.DisconnectAll()
}

View File

@ -1,158 +1,142 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package node
package node_test
import (
"context"
"net"
"fmt"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/identity"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/dht/mocks"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/provider"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
)
var (
ctx = context.Background()
helloID = teststorj.NodeIDFromString("hello")
)
func TestLookup(t *testing.T) {
func TestClient(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
cases := []struct {
self pb.Node
to pb.Node
find pb.Node
expectedErr error
}{
{
self: pb.Node{Id: helloID, Address: &pb.NodeAddress{Address: ":7070"}},
to: pb.Node{Id: helloID, Address: &pb.NodeAddress{Address: ":8080"}},
find: pb.Node{Id: helloID, Address: &pb.NodeAddress{Address: ":9090"}},
expectedErr: nil,
},
}
for _, v := range cases {
lis, err := net.Listen("tcp", "127.0.0.1:0")
assert.NoError(t, err)
id := newTestIdentity(t)
v.to = pb.Node{Id: id.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}
srv, mock, err := newTestServer(ctx, &mockNodeServer{queryCalled: 0}, id)
assert.NoError(t, err)
ctx.Go(func() error { return srv.Serve(lis) })
defer srv.Stop()
ctrl := gomock.NewController(t)
mdht := mock_dht.NewMockDHT(ctrl)
mrt := mock_dht.NewMockRoutingTable(ctrl)
mdht.EXPECT().GetRoutingTable(gomock.Any()).Return(mrt, nil)
mrt.EXPECT().ConnectionSuccess(gomock.Any()).Return(nil)
ca, err := testidentity.NewTestCA(ctx)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
nc, err := NewNodeClient(identity, v.self, mdht)
assert.NoError(t, err)
_, err = nc.Lookup(ctx, v.to, v.find)
assert.Equal(t, v.expectedErr, err)
assert.Equal(t, 1, mock.(*mockNodeServer).queryCalled)
}
}
func TestPing(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
cases := []struct {
self pb.Node
toID string
ident *provider.FullIdentity
expectedErr error
}{
{
self: pb.Node{Id: helloID, Address: &pb.NodeAddress{Address: ":7070"}},
toID: "",
ident: newTestIdentity(t),
expectedErr: nil,
},
}
for _, v := range cases {
lis, err := net.Listen("tcp", "127.0.0.1:0")
assert.NoError(t, err)
// new mock DHT for node client
ctrl := gomock.NewController(t)
mdht := mock_dht.NewMockDHT(ctrl)
// set up a node server
srv := NewServer(mdht)
msrv, _, err := newTestServer(ctx, srv, v.ident)
assert.NoError(t, err)
// start gRPC server
ctx.Go(func() error { return msrv.Serve(lis) })
defer msrv.Stop()
nc, err := NewNodeClient(v.ident, v.self, mdht)
assert.NoError(t, err)
ok, err := nc.Ping(ctx, pb.Node{Id: v.ident.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}})
assert.Equal(t, v.expectedErr, err)
assert.Equal(t, ok, true)
}
}
func newTestServer(ctx context.Context, ns pb.NodesServer, identity *provider.FullIdentity) (*grpc.Server, pb.NodesServer, error) {
identOpt, err := identity.ServerOption()
planet, err := testplanet.New(t, 1, 4, 0)
if err != nil {
return nil, nil, err
t.Fatal(err)
}
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
time.Sleep(2 * time.Second)
peers := []*testplanet.Node{}
peers = append(peers, planet.Satellites...)
peers = append(peers, planet.StorageNodes...)
{ // Ping
client, err := planet.StorageNodes[0].NewNodeClient()
assert.NoError(t, err)
defer ctx.Check(client.Disconnect)
var group errgroup.Group
for i := range peers {
peer := peers[i]
group.Go(func() error {
pinged, err := client.Ping(ctx, peer.Info)
var pingErr error
if !pinged {
pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID())
}
return utils.CombineErrors(pingErr, err)
})
}
defer ctx.Check(group.Wait)
}
grpcServer := grpc.NewServer(identOpt)
pb.RegisterNodesServer(grpcServer, ns)
{ // Lookup
client, err := planet.StorageNodes[1].NewNodeClient()
assert.NoError(t, err)
defer ctx.Check(client.Disconnect)
return grpcServer, ns, nil
var group errgroup.Group
for i := range peers {
peer := peers[i]
group.Go(func() error {
for _, target := range peers {
errTag := fmt.Errorf("lookup peer:%s target:%s", peer.ID(), target.ID())
results, err := client.Lookup(ctx, peer.Info, target.Info)
if err != nil {
return utils.CombineErrors(errTag, err)
}
if containsResult(results, target.ID()) {
continue
}
// with small network we expect to return everything
if len(results) != planet.Size() {
return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results)))
}
return nil
}
return nil
})
}
defer ctx.Check(group.Wait)
}
{ // Lookup
client, err := planet.StorageNodes[2].NewNodeClient()
assert.NoError(t, err)
defer ctx.Check(client.Disconnect)
targets := []storj.NodeID{
{}, // empty target
{255}, // non-empty
}
var group errgroup.Group
for i := range targets {
target := targets[i]
for i := range peers {
peer := peers[i]
group.Go(func() error {
errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target)
results, err := client.Lookup(ctx, peer.Info, pb.Node{Id: target})
if err != nil {
return utils.CombineErrors(errTag, err)
}
// with small network we expect to return everything
if len(results) != planet.Size() {
return utils.CombineErrors(errTag, fmt.Errorf("expected %d got %d: %s", planet.Size(), len(results), pb.NodesToIDs(results)))
}
return nil
})
}
}
defer ctx.Check(group.Wait)
}
}
type mockNodeServer struct {
queryCalled int
pingCalled int
}
func (mn *mockNodeServer) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
mn.queryCalled++
return &pb.QueryResponse{}, nil
}
func (mn *mockNodeServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
mn.pingCalled++
return &pb.PingResponse{}, nil
}
func newTestIdentity(t *testing.T) *provider.FullIdentity {
ca, err := testidentity.NewTestCA(ctx)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
return identity
func containsResult(nodes []*pb.Node, target storj.NodeID) bool {
for _, node := range nodes {
if node.Id == target {
return true
}
}
return false
}

View File

@ -14,55 +14,51 @@ import (
// Server implements the grpc Node Server
type Server struct {
dht dht.DHT
logger *zap.Logger
dht dht.DHT
log *zap.Logger
}
// NewServer returns a newly instantiated Node Server
func NewServer(dht dht.DHT) *Server {
func NewServer(log *zap.Logger, dht dht.DHT) *Server {
return &Server{
dht: dht,
logger: zap.L(),
dht: dht,
log: log,
}
}
// Query is a node to node communication query
func (s *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
if s.logger == nil {
s.logger = zap.L()
}
rt, err := s.dht.GetRoutingTable(ctx)
func (server *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
rt, err := server.dht.GetRoutingTable(ctx)
if err != nil {
return &pb.QueryResponse{}, NodeClientErr.New("could not get routing table %s", err)
return &pb.QueryResponse{}, NodeClientErr.New("could not get routing table %server", err)
}
if req.GetPingback() {
_, err = s.dht.Ping(ctx, *req.Sender)
_, err = server.dht.Ping(ctx, *req.Sender)
if err != nil {
err = rt.ConnectionFailed(req.Sender)
if err != nil {
s.logger.Error("could not respond to connection failed", zap.Error(err))
server.log.Error("could not respond to connection failed", zap.Error(err))
}
s.logger.Error("connection to node failed", zap.Error(err), zap.String("nodeID", req.Sender.Id.String()))
server.log.Error("connection to node failed", zap.Error(err), zap.String("nodeID", req.Sender.Id.String()))
}
err = rt.ConnectionSuccess(req.Sender)
if err != nil {
s.logger.Error("could not respond to connection success", zap.Error(err))
server.log.Error("could not respond to connection success", zap.Error(err))
}
}
nodes, err := rt.FindNear(req.Target.Id, int(req.Limit))
if err != nil {
return &pb.QueryResponse{}, NodeClientErr.New("could not find near %s", err)
return &pb.QueryResponse{}, NodeClientErr.New("could not find near %server", err)
}
return &pb.QueryResponse{Sender: req.Sender, Response: nodes}, nil
}
// Ping provides an easy way to verify a node is online and accepting requests
func (s *Server) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
func (server *Server) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
//TODO
return &pb.PingResponse{}, nil
}

View File

@ -1,90 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package node
import (
"context"
"fmt"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/dht/mocks"
"storj.io/storj/pkg/pb"
)
func TestQuery(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockDHT := mock_dht.NewMockDHT(ctrl)
mockRT := mock_dht.NewMockRoutingTable(ctrl)
s := &Server{dht: mockDHT}
sender := &pb.Node{Id: teststorj.NodeIDFromString("A")}
target := &pb.Node{Id: teststorj.NodeIDFromString("B")}
node := &pb.Node{Id: teststorj.NodeIDFromString("C")}
cases := []struct {
caseName string
rt dht.RoutingTable
getRTErr error
pingNode pb.Node
pingErr error
successErr error
failErr error
findNear []*pb.Node
limit int
nearErr error
res *pb.QueryResponse
err error
}{
{caseName: "ping success, return sender",
rt: mockRT,
getRTErr: nil,
pingNode: *sender,
pingErr: nil,
successErr: nil,
failErr: nil,
findNear: []*pb.Node{target},
limit: 2,
nearErr: nil,
res: &pb.QueryResponse{Sender: sender, Response: []*pb.Node{target}},
err: nil,
},
{caseName: "ping success, return nearest",
rt: mockRT,
getRTErr: nil,
pingNode: *sender,
pingErr: nil,
successErr: nil,
failErr: nil,
findNear: []*pb.Node{sender, node},
limit: 2,
nearErr: nil,
res: &pb.QueryResponse{Sender: sender, Response: []*pb.Node{sender, node}},
err: nil,
},
}
for i, v := range cases {
req := pb.QueryRequest{Pingback: true, Sender: sender, Target: &pb.Node{Id: teststorj.NodeIDFromString("B")}, Limit: int64(2)}
mockDHT.EXPECT().GetRoutingTable(gomock.Any()).Return(v.rt, v.getRTErr)
mockDHT.EXPECT().Ping(gomock.Any(), gomock.Any()).Return(v.pingNode, v.pingErr)
if v.pingErr != nil {
mockRT.EXPECT().ConnectionFailed(gomock.Any()).Return(v.failErr)
} else {
mockRT.EXPECT().ConnectionSuccess(gomock.Any()).Return(v.successErr)
if v.successErr == nil {
mockRT.EXPECT().FindNear(gomock.Any(), v.limit).Return(v.findNear, v.nearErr)
}
}
res, err := s.Query(context.Background(), &req)
if !assert.Equal(t, v.res, res) {
fmt.Printf("case %s (%v) failed\n", v.caseName, i)
}
if v.err == nil && !assert.NoError(t, err) {
fmt.Printf("query errored at case %v\n", i)
}
}
}

View File

@ -25,6 +25,17 @@ func LookupResponsesToNodes(responses *LookupResponses) []*Node {
return nodes
}
// NodesToIDs extracts Node-s into a list of ids
func NodesToIDs(nodes []*Node) storj.NodeIDList {
ids := make(storj.NodeIDList, len(nodes))
for i, node := range nodes {
if node != nil {
ids[i] = node.Id
}
}
return ids
}
// CopyNode returns a deep copy of a node
// It would be better to use `proto.Clone` but it is curently incompatible
// with gogo's customtype extension.