diff --git a/go.mod b/go.mod index 5d760dac8..eb1e9400d 100644 --- a/go.mod +++ b/go.mod @@ -115,5 +115,5 @@ require ( gopkg.in/olivere/elastic.v5 v5.0.76 // indirect gopkg.in/spacemonkeygo/monkit.v2 v2.0.0-20190612171030-cf5a9e6f8fd2 gopkg.in/yaml.v2 v2.2.2 - storj.io/drpc v0.0.7-0.20191105232401-03e121f6d8e4 + storj.io/drpc v0.0.7-0.20191115031725-2171c57838d2 ) diff --git a/go.sum b/go.sum index 3b5957d59..75fbf72bd 100644 --- a/go.sum +++ b/go.sum @@ -576,5 +576,5 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -storj.io/drpc v0.0.7-0.20191105232401-03e121f6d8e4 h1:ekHWFW5RgWp7DvUVpHSCXeuMikaZQZSBZDSUB0RfPp0= -storj.io/drpc v0.0.7-0.20191105232401-03e121f6d8e4/go.mod h1:/ascUDbzNAv0A3Jj7wUIKFBH2JdJ2uJIBO/b9+2yHgQ= +storj.io/drpc v0.0.7-0.20191115031725-2171c57838d2 h1:8SgLYEhe99R8QlAD1EAOBPRyIR+cn2hqkXtWlAUPf/c= +storj.io/drpc v0.0.7-0.20191115031725-2171c57838d2/go.mod h1:/ascUDbzNAv0A3Jj7wUIKFBH2JdJ2uJIBO/b9+2yHgQ= diff --git a/pkg/peertls/tlsopts/tls.go b/pkg/peertls/tlsopts/tls.go index 187823786..60ee8c796 100644 --- a/pkg/peertls/tlsopts/tls.go +++ b/pkg/peertls/tlsopts/tls.go @@ -83,9 +83,10 @@ func (opts *Options) tlsConfig(isServer bool, verificationFuncs ...peertls.PeerC } config := &tls.Config{ - Certificates: []tls.Certificate{*opts.Cert}, - InsecureSkipVerify: true, - MinVersion: tls.VersionTLS12, + Certificates: []tls.Certificate{*opts.Cert}, + InsecureSkipVerify: true, + MinVersion: tls.VersionTLS12, + DynamicRecordSizingDisabled: true, // always start with big records VerifyPeerCertificate: peertls.VerifyPeerFunc( verificationFuncs..., ), diff --git a/pkg/rpc/dial.go b/pkg/rpc/dial.go index 06e6f6d2f..2c9d4122d 100644 --- a/pkg/rpc/dial.go +++ b/pkg/rpc/dial.go @@ -11,12 +11,25 @@ import ( "go.uber.org/zap" + "storj.io/drpc/drpcconn" + "storj.io/drpc/drpcmanager" + "storj.io/drpc/drpcstream" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls/tlsopts" "storj.io/storj/pkg/storj" "storj.io/storj/private/memory" ) +// NewDefaultManagerOptions returns the default options we use for drpc managers. +func NewDefaultManagerOptions() drpcmanager.Options { + return drpcmanager.Options{ + WriterBufferSize: 1024, + Stream: drpcstream.Options{ + SplitSize: (4096 * 2) - 256, + }, + } +} + // Dialer holds configuration for dialing. type Dialer struct { // TLSOptions controls the tls options for dialing. If it is nil, only @@ -37,6 +50,9 @@ type Dialer struct { // PoolCapacity is the maximum number of cached connections to hold. PoolCapacity int + + // ConnectionOptions controls the options that we pass to drpc connections. + ConnectionOptions drpcconn.Options } // NewDefaultDialer returns a Dialer with default timeouts set. @@ -45,6 +61,9 @@ func NewDefaultDialer(tlsOptions *tlsopts.Options) Dialer { TLSOptions: tlsOptions, DialTimeout: 20 * time.Second, PoolCapacity: 5, + ConnectionOptions: drpcconn.Options{ + Manager: NewDefaultManagerOptions(), + }, } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 046d51ea4..2a522e193 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -18,6 +18,7 @@ import ( "storj.io/storj/pkg/identity" "storj.io/storj/pkg/listenmux" "storj.io/storj/pkg/peertls/tlsopts" + "storj.io/storj/pkg/rpc" ) // Service represents a specific gRPC method collection to be registered @@ -64,6 +65,10 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s done: make(chan struct{}), } + serverOptions := drpcserver.Options{ + Manager: rpc.NewDefaultManagerOptions(), + } + unaryInterceptor := server.logOnErrorUnaryInterceptor if interceptor != nil { unaryInterceptor = CombineInterceptors(unaryInterceptor, interceptor) @@ -75,7 +80,7 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s } server.public = public{ listener: publicListener, - drpc: drpcserver.New(), + drpc: drpcserver.NewWithOptions(serverOptions), grpc: grpc.NewServer( grpc.StreamInterceptor(server.logOnErrorStreamInterceptor), grpc.UnaryInterceptor(unaryInterceptor), @@ -89,7 +94,7 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s } server.private = private{ listener: privateListener, - drpc: drpcserver.New(), + drpc: drpcserver.NewWithOptions(serverOptions), grpc: grpc.NewServer(), }