From ea4a61f0c050ee6296fd50d7a19792f0515aad18 Mon Sep 17 00:00:00 2001
From: Natalie Villasana
Date: Fri, 22 Mar 2019 13:09:37 -0400
Subject: [PATCH] adds slow kad dialer tests, adds timeout interceptor to transport (#1545)

---
 pkg/kademlia/dialer_test.go          | 134 +++++++++++++++++++++++++++
 pkg/kademlia/kademlia_planet_test.go |  57 ++++++++++++
 pkg/transport/common.go              |   9 +-
 pkg/transport/insecure.go            |   2 +-
 pkg/transport/timeout.go             |  24 +++++
 pkg/transport/transport.go           |  26 ++++--
 6 files changed, 241 insertions(+), 11 deletions(-)
 create mode 100644 pkg/transport/timeout.go

diff --git a/pkg/kademlia/dialer_test.go b/pkg/kademlia/dialer_test.go
index e9ca6a8b0..681ff902d 100644
--- a/pkg/kademlia/dialer_test.go
+++ b/pkg/kademlia/dialer_test.go
@@ -4,18 +4,24 @@
 package kademlia_test
 
 import (
+	"context"
 	"fmt"
 	"testing"
+	"time"
 
+	"github.com/stretchr/testify/require"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap/zaptest"
 	"golang.org/x/sync/errgroup"
 
+	"storj.io/storj/internal/memory"
 	"storj.io/storj/internal/testcontext"
 	"storj.io/storj/internal/testplanet"
 	"storj.io/storj/pkg/kademlia"
 	"storj.io/storj/pkg/pb"
+	"storj.io/storj/pkg/peertls/tlsopts"
 	"storj.io/storj/pkg/storj"
+	"storj.io/storj/pkg/transport"
 )
 
 func TestDialer(t *testing.T) {
@@ -152,6 +158,134 @@ func TestDialer(t *testing.T) {
 	})
 }
 
+// TestSlowDialerHasTimeout checks that dialer calls made over a simulated slow
+// network return a transport error.
+func TestSlowDialerHasTimeout(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+
+		// TODO: also use satellites
+		peers := planet.StorageNodes
+
+		{ // PingNode
+			self := planet.StorageNodes[0]
+
+			tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{})
+			require.NoError(t, err)
+
+			self.Transport = transport.NewClientWithTimeout(tlsOpts, 20*time.Millisecond)
+
+			network := &transport.SimulatedNetwork{
+				DialLatency:    200 * time.Second,
+				BytesPerSecond: 1 * memory.KB,
+			}
+
+			slowClient := network.NewClient(self.Transport)
+			require.NotNil(t, slowClient)
+
+			dialer := kademlia.NewDialer(zaptest.NewLogger(t), slowClient)
+			defer ctx.Check(dialer.Close)
+
+			var group errgroup.Group
+			defer ctx.Check(group.Wait)
+
+			for _, peer := range peers {
+				peer := peer
+				group.Go(func() error {
+					_, err := dialer.PingNode(ctx, peer.Local())
+					return expectTransportTimeout(fmt.Sprintf("ping peer:%s", peer.ID()), err)
+				})
+			}
+		}
+
+		{ // FetchPeerIdentity
+			self := planet.StorageNodes[1]
+
+			tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{})
+			require.NoError(t, err)
+
+			self.Transport = transport.NewClientWithTimeout(tlsOpts, 20*time.Millisecond)
+
+			network := &transport.SimulatedNetwork{
+				DialLatency:    200 * time.Second,
+				BytesPerSecond: 1 * memory.KB,
+			}
+
+			slowClient := network.NewClient(self.Transport)
+			require.NotNil(t, slowClient)
+
+			dialer := kademlia.NewDialer(zaptest.NewLogger(t), slowClient)
+			defer ctx.Check(dialer.Close)
+
+			var group errgroup.Group
+			defer ctx.Check(group.Wait)
+
+			group.Go(func() error {
+				_, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local())
+				if checkErr := expectTransportTimeout("fetch peer identity", err); checkErr != nil {
+					return checkErr
+				}
+
+				_, err = dialer.FetchPeerIdentityUnverified(ctx, planet.Satellites[0].Addr())
+				return expectTransportTimeout("fetch peer identity unverified", err)
+			})
+		}
+
+		{ // Lookup
+			self := planet.StorageNodes[2]
+
+			tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{})
+			require.NoError(t, err)
+
+			self.Transport = transport.NewClientWithTimeout(tlsOpts, 20*time.Millisecond)
+
+			network := &transport.SimulatedNetwork{
+				DialLatency:    200 * time.Second,
+				BytesPerSecond: 1 * memory.KB,
+			}
+
+			slowClient := network.NewClient(self.Transport)
+			require.NotNil(t, slowClient)
+
+			dialer := kademlia.NewDialer(zaptest.NewLogger(t), slowClient)
+			defer ctx.Check(dialer.Close)
+
+			var group errgroup.Group
+			defer ctx.Check(group.Wait)
+
+			for _, peer := range peers {
+				peer := peer
+				group.Go(func() error {
+					for _, target := range peers {
+						peer.Local().Type.DPanicOnInvalid("test client peer")
+						target.Local().Type.DPanicOnInvalid("test client target")
+
+						_, err := dialer.Lookup(ctx, self.Local(), peer.Local(), target.Local())
+						op := fmt.Sprintf("lookup peer:%s target:%s", peer.ID(), target.ID())
+						if checkErr := expectTransportTimeout(op, err); checkErr != nil {
+							return checkErr
+						}
+					}
+					return nil
+				})
+			}
+		}
+
+	})
+}
+
+// expectTransportTimeout returns nil when err is a transport.Error, which is the
+// error the slow-dialer tests expect from a timed-out call, and a descriptive
+// error otherwise. It returns an error rather than calling require so it is
+// safe to use from goroutines other than the one running the test.
+func expectTransportTimeout(op string, err error) error {
+	if !transport.Error.Has(err) {
+		return errs.New("%s: expected transport error (%v), got: %v", op, context.DeadlineExceeded, err)
+	}
+	return nil
+}
+
 func containsResult(nodes []*pb.Node, target storj.NodeID) bool {
 	for _, node := range nodes {
 		if node.Id == target {
diff --git a/pkg/kademlia/kademlia_planet_test.go b/pkg/kademlia/kademlia_planet_test.go
index 4a0e3fdfc..d28ca0fc9 100644
--- a/pkg/kademlia/kademlia_planet_test.go
+++ b/pkg/kademlia/kademlia_planet_test.go
@@ -4,12 +4,20 @@
 package kademlia_test
 
 import (
+	"context"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zaptest"
 
+	"storj.io/storj/internal/memory"
 	"storj.io/storj/internal/testcontext"
 	"storj.io/storj/internal/testplanet"
+	"storj.io/storj/pkg/kademlia"
+	"storj.io/storj/pkg/pb"
+	"storj.io/storj/pkg/peertls/tlsopts"
+	"storj.io/storj/pkg/transport"
 )
 
 func TestFetchPeerIdentity(t *testing.T) {
@@ -39,3 +47,52 @@ func TestRequestInfo(t *testing.T) {
 		require.Equal(t, node.Local().Restrictions.GetFreeBandwidth(), info.GetCapacity().GetFreeBandwidth())
 	})
 }
+
+// TestPingTimeout checks that Ping over a simulated slow network fails with an
+// error that is both a kademlia.NodeErr and a transport.Error.
+func TestPingTimeout(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+
+		self := planet.StorageNodes[0]
+		routingTable := self.Kademlia.RoutingTable
+
+		tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{})
+		require.NoError(t, err)
+
+		self.Transport = transport.NewClientWithTimeout(tlsOpts, 1*time.Millisecond)
+
+		network := &transport.SimulatedNetwork{
+			DialLatency:    300 * time.Second,
+			BytesPerSecond: 1 * memory.KB,
+		}
+
+		slowClient := network.NewClient(self.Transport)
+		require.NotNil(t, slowClient)
+
+		node := pb.Node{
+			Id: self.ID(),
+			Address: &pb.NodeAddress{
+				Transport: pb.NodeTransport_TCP_TLS_GRPC,
+			},
+		}
+
+		newService, err := kademlia.NewService(zaptest.NewLogger(t), node, slowClient, routingTable, kademlia.Config{})
+		require.NoError(t, err)
+
+		target := pb.Node{
+			Id: planet.StorageNodes[2].ID(),
+			Address: &pb.NodeAddress{
+				Transport: pb.NodeTransport_TCP_TLS_GRPC,
+				Address:   planet.StorageNodes[2].Addr(),
+			},
+		}
+
+		_, err = newService.Ping(ctx, target)
+		require.Error(t, err, "expected a timeout (%v)", context.DeadlineExceeded)
+		require.True(t, kademlia.NodeErr.Has(err), "expected kademlia.NodeErr, got: %v", err)
+		require.True(t, transport.Error.Has(err), "expected transport.Error, got: %v", err)
+
+	})
+}
diff --git a/pkg/transport/common.go b/pkg/transport/common.go
index bcbae0040..4c8bcf289 100644
--- a/pkg/transport/common.go
+++ b/pkg/transport/common.go
@@ -14,6 +14,11 @@ var (
 	mon = monkit.Package()
 	//Error is the errs class of standard Transport Client errors
 	Error = errs.Class("transport error")
-	// default time to wait for a connection to be established
-	connWaitTimeout = 20 * time.Second
+)
+
+const (
+	// defaultDialTimeout is the default time to wait for a connection to be established.
+	defaultDialTimeout = 20 * time.Second
+	// defaultRequestTimeout is the default time to wait for a response.
+	defaultRequestTimeout = 20 * time.Second
 )
diff --git a/pkg/transport/insecure.go b/pkg/transport/insecure.go
index 8b589c319..156143b98 100644
--- a/pkg/transport/insecure.go
+++ b/pkg/transport/insecure.go
@@ -22,7 +22,7 @@ func DialAddressInsecure(ctx context.Context, address string, opts ...grpc.DialO
 		grpc.FailOnNonTempDialError(true),
 	}, opts...)
 
-	timedCtx, cf := context.WithTimeout(ctx, connWaitTimeout)
+	timedCtx, cf := context.WithTimeout(ctx, defaultDialTimeout)
 	defer cf()
 
 	conn, err = grpc.DialContext(timedCtx, address, options...)
diff --git a/pkg/transport/timeout.go b/pkg/transport/timeout.go
new file mode 100644
index 000000000..a93e5702b
--- /dev/null
+++ b/pkg/transport/timeout.go
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package transport
+
+import (
+	"context"
+	"time"
+
+	"google.golang.org/grpc"
+)
+
+// InvokeTimeout is a gRPC unary client interceptor that limits each request to Timeout.
+type InvokeTimeout struct {
+	Timeout time.Duration
+}
+
+// Intercept calls invoker with a context derived from ctx that is bounded by Timeout.
+func (it InvokeTimeout) Intercept(ctx context.Context, method string, req interface{}, reply interface{},
+	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+	timedCtx, cancel := context.WithTimeout(ctx, it.Timeout)
+	defer cancel()
+	return invoker(timedCtx, method, req, reply, cc, opts...)
+}
diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go
index c1a2c5897..d02a80689 100644
--- a/pkg/transport/transport.go
+++ b/pkg/transport/transport.go
@@ -5,6 +5,7 @@ package transport
 
 import (
 	"context"
+	"time"
 
 	"google.golang.org/grpc"
 
@@ -30,15 +31,22 @@ type Client interface {
 
 // Transport interface structure
 type Transport struct {
-	tlsOpts   *tlsopts.Options
-	observers []Observer
+	tlsOpts        *tlsopts.Options
+	observers      []Observer
+	requestTimeout time.Duration
 }
 
-// NewClient returns a newly instantiated Transport Client
+// NewClient returns a transport client that uses defaultRequestTimeout for requests.
 func NewClient(tlsOpts *tlsopts.Options, obs ...Observer) Client {
+	return NewClientWithTimeout(tlsOpts, defaultRequestTimeout, obs...)
+}
+
+// NewClientWithTimeout returns a transport client that applies requestTimeout to each unary request.
+func NewClientWithTimeout(tlsOpts *tlsopts.Options, requestTimeout time.Duration, obs ...Observer) Client {
 	return &Transport{
-		tlsOpts:   tlsOpts,
-		observers: obs,
+		tlsOpts:        tlsOpts,
+		observers:      obs,
+		requestTimeout: requestTimeout,
 	}
 }
 
@@ -65,9 +73,10 @@ func (transport *Transport) DialNode(ctx context.Context, node *pb.Node, opts ..
 		dialOption,
 		grpc.WithBlock(),
 		grpc.FailOnNonTempDialError(true),
+		grpc.WithUnaryInterceptor(InvokeTimeout{Timeout: transport.requestTimeout}.Intercept),
 	}, opts...)
 
-	timedCtx, cancel := context.WithTimeout(ctx, connWaitTimeout)
+	timedCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
 	defer cancel()
 
 	conn, err = grpc.DialContext(timedCtx, node.GetAddress().Address, options...)
@@ -96,9 +105,10 @@ func (transport *Transport) DialAddress(ctx context.Context, address string, opt
 		transport.tlsOpts.DialUnverifiedIDOption(),
 		grpc.WithBlock(),
 		grpc.FailOnNonTempDialError(true),
+		grpc.WithUnaryInterceptor(InvokeTimeout{Timeout: transport.requestTimeout}.Intercept),
 	}, opts...)
 
-	timedCtx, cancel := context.WithTimeout(ctx, connWaitTimeout)
+	timedCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
 	defer cancel()
 
 	conn, err = grpc.DialContext(timedCtx, address, options...)
@@ -115,7 +125,7 @@ func (transport *Transport) Identity() *identity.FullIdentity {
 
 // WithObservers returns a new transport including the listed observers.
 func (transport *Transport) WithObservers(obs ...Observer) *Transport {
-	tr := &Transport{tlsOpts: transport.tlsOpts}
+	tr := &Transport{tlsOpts: transport.tlsOpts, requestTimeout: transport.requestTimeout}
 	tr.observers = append(tr.observers, transport.observers...)
 	tr.observers = append(tr.observers, obs...)
 	return tr