pkg/server,pkg/quic: accept an existing conn to create quic listener and

allow disabling tcp/quic

In order to have more control of a server so that we can
simulate connection failures in `testplanet`, this PR changes
quic.Listener to accept an existing UDPConn instead of relying on the
quic-go library to create the UDPConn.
This PR also adds two flags on the `server.Config` struct to allow
enabling/disabling tcp/tls listener and quic listener. By default, they
are both set to true.
    - `DisableTCPTLS`: internal flag, disables tcp/tls listener.
    - `DisableQUIC`: hidden flag, disables quic listener
By making the `DisableQUIC` a hidden flag, it allows storagenode operators to
have the ability to disable quic traffic in case their set up can't work
with udp traffic.

Change-Id: I853b12435d988b9c41ad9b873fd57480d792e378
This commit is contained in:
Yingrong Zhao 2021-01-28 18:24:35 -05:00
parent a754c3984b
commit 7e80badaf9
8 changed files with 155 additions and 70 deletions

View File

@ -76,7 +76,7 @@ func New(log *zap.Logger, ident *identity.FullIdentity, ca *identity.FullCertifi
return nil, Error.Wrap(errs.Combine(err, peer.Close()))
}
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc)
if err != nil {
return nil, Error.Wrap(err)
}

View File

@ -9,6 +9,7 @@ import (
"net"
"github.com/lucas-clemente/quic-go"
"github.com/zeebo/errs"
"storj.io/common/peertls/tlsopts"
)
@ -16,25 +17,30 @@ import (
// Listener implements listener for QUIC.
type Listener struct {
listener quic.Listener
conn *net.UDPConn
}
// NewListener returns a new listener instance for QUIC.
// The quic.Config may be nil, in that case the default values will be used.
// if the provided context is closed, all existing or following Accept calls will return an error.
func NewListener(tlsConfig *tls.Config, address string, quicConfig *quic.Config) (net.Listener, error) {
func NewListener(conn *net.UDPConn, tlsConfig *tls.Config, quicConfig *quic.Config) (net.Listener, error) {
if conn == nil {
return nil, Error.New("underlying udp connection can't be nil")
}
if tlsConfig == nil {
return nil, Error.New("tls config is not set")
}
tlsConfigCopy := tlsConfig.Clone()
tlsConfigCopy.NextProtos = []string{tlsopts.StorjApplicationProtocol}
listener, err := quic.ListenAddr(address, tlsConfigCopy, quicConfig)
listener, err := quic.Listen(conn, tlsConfigCopy, quicConfig)
if err != nil {
return nil, err
}
return &Listener{
listener: listener,
conn: conn,
}, nil
}
@ -52,8 +58,8 @@ func (l *Listener) Accept() (net.Conn, error) {
}
// Close closes the QUIC listener.
func (l *Listener) Close() error {
return l.listener.Close()
func (l *Listener) Close() (err error) {
return errs.Combine(l.listener.Close(), l.conn.Close())
}
// Addr returns the local network addr that the server is listening on.

View File

@ -34,15 +34,22 @@ type Config struct {
tlsopts.Config
Address string `user:"true" help:"public address to listen on" default:":7777"`
PrivateAddress string `user:"true" help:"private address to listen on" default:"127.0.0.1:7778"`
DisableQUIC bool `help:"disable QUIC listener on a server" hidden:"true" default:"false"`
DisableTCPTLS bool `help:"disable TCP/TLS listener on a server" internal:"true"`
DebugLogTraffic bool `hidden:"true" default:"false"` // Deprecated
}
type public struct {
tcpListener net.Listener
quicListener net.Listener
drpc *drpcserver.Server
mux *drpcmux.Mux
tcpListener net.Listener
udpConn *net.UDPConn
quicListener net.Listener
addr net.Addr
disableTCPTLS bool
disableQUIC bool
drpc *drpcserver.Server
mux *drpcmux.Mux
}
type private struct {
@ -67,55 +74,24 @@ type Server struct {
// New creates a Server out of an Identity, a net.Listener,
// and interceptors.
func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr string) (*Server, error) {
func New(log *zap.Logger, tlsOptions *tlsopts.Options, config Config) (_ *Server, err error) {
server := &Server{
log: log,
tlsOptions: tlsOptions,
done: make(chan struct{}),
}
server.public, err = newPublic(config.Address, config.DisableTCPTLS, config.DisableQUIC)
if err != nil {
return nil, Error.Wrap(err)
}
serverOptions := drpcserver.Options{
Manager: rpc.NewDefaultManagerOptions(),
}
var err error
var publicTCPListener, publicQUICListener net.Listener
for retry := 0; ; retry++ {
publicTCPListener, err = net.Listen("tcp", publicAddr)
if err != nil {
return nil, err
}
publicQUICListener, err = quic.NewListener(tlsOptions.ServerTLSConfig(), publicTCPListener.Addr().String(), &quicgo.Config{MaxIdleTimeout: defaultUserTimeout})
if err != nil {
_, port, _ := net.SplitHostPort(publicAddr)
if port == "0" && retry < 10 && isErrorAddressAlreadyInUse(err) {
// from here, we know for sure that the tcp port chosen by the
// os is available, but we don't know if the same port number
// for udp is also available.
// if a udp port is already in use, we will close the tcp port and retry
// to find one that is available for both udp and tcp.
_ = publicTCPListener.Close()
continue
}
return nil, errs.Combine(err, publicTCPListener.Close())
}
break
}
publicMux := drpcmux.New()
publicTracingHandler := rpctracing.NewHandler(publicMux, jaeger.RemoteTraceHandler)
server.public = public{
tcpListener: wrapListener(publicTCPListener),
quicListener: wrapListener(publicQUICListener),
drpc: drpcserver.NewWithOptions(publicTracingHandler, serverOptions),
mux: publicMux,
}
privateListener, err := net.Listen("tcp", privateAddr)
privateListener, err := net.Listen("tcp", config.PrivateAddress)
if err != nil {
return nil, errs.Combine(err, publicTCPListener.Close(), publicQUICListener.Close())
return nil, errs.Combine(err, server.public.Close())
}
privateMux := drpcmux.New()
privateTracingHandler := rpctracing.NewHandler(privateMux, jaeger.RemoteTraceHandler)
@ -132,7 +108,7 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s
func (p *Server) Identity() *identity.FullIdentity { return p.tlsOptions.Ident }
// Addr returns the server's public listener address.
func (p *Server) Addr() net.Addr { return p.public.tcpListener.Addr() }
func (p *Server) Addr() net.Addr { return p.public.addr }
// PrivateAddr returns the server's private listener address.
func (p *Server) PrivateAddr() net.Addr { return p.private.listener.Addr() }
@ -156,8 +132,7 @@ func (p *Server) Close() error {
// We ignore these errors because there's not really anything to do
// even if they happen, and they'll just be errors due to duplicate
// closes anyway.
_ = p.public.quicListener.Close()
_ = p.public.tcpListener.Close()
_ = p.public.Close()
_ = p.private.listener.Close()
return nil
}
@ -186,8 +161,21 @@ func (p *Server) Run(ctx context.Context) (err error) {
// a chance to be notified that they're done running.
const drpcHeader = "DRPC!!!1"
publicMux := listenmux.New(p.public.tcpListener, len(drpcHeader))
publicDRPCListener := tls.NewListener(publicMux.Route(drpcHeader), p.tlsOptions.ServerTLSConfig())
var (
publicMux *listenmux.Mux
publicDRPCListener net.Listener
)
if p.public.tcpListener != nil {
publicMux = listenmux.New(p.public.tcpListener, len(drpcHeader))
publicDRPCListener = tls.NewListener(publicMux.Route(drpcHeader), p.tlsOptions.ServerTLSConfig())
}
if p.public.udpConn != nil {
p.public.quicListener, err = quic.NewListener(p.public.udpConn, p.tlsOptions.ServerTLSConfig(), &quicgo.Config{MaxIdleTimeout: defaultUserTimeout})
if err != nil {
return err
}
}
privateMux := listenmux.New(p.private.listener, len(drpcHeader))
privateDRPCListener := privateMux.Route(drpcHeader)
@ -201,9 +189,11 @@ func (p *Server) Run(ctx context.Context) (err error) {
defer muxCancel()
var muxGroup errgroup.Group
muxGroup.Go(func() error {
return publicMux.Run(muxCtx)
})
if publicMux != nil {
muxGroup.Go(func() error {
return publicMux.Run(muxCtx)
})
}
muxGroup.Go(func() error {
return privateMux.Run(muxCtx)
})
@ -223,14 +213,20 @@ func (p *Server) Run(ctx context.Context) (err error) {
return nil
})
group.Go(func() error {
defer cancel()
return p.public.drpc.Serve(ctx, publicDRPCListener)
})
group.Go(func() error {
defer cancel()
return p.public.drpc.Serve(ctx, p.public.quicListener)
})
if publicDRPCListener != nil {
group.Go(func() error {
defer cancel()
return p.public.drpc.Serve(ctx, publicDRPCListener)
})
}
if p.public.quicListener != nil {
group.Go(func() error {
defer cancel()
return p.public.drpc.Serve(ctx, wrapListener(p.public.quicListener))
})
}
group.Go(func() error {
defer cancel()
return p.private.drpc.Serve(ctx, privateDRPCListener)
@ -244,6 +240,89 @@ func (p *Server) Run(ctx context.Context) (err error) {
return errs.Combine(err, muxGroup.Wait())
}
func newPublic(publicAddr string, disableTCPTLS, disableQUIC bool) (public, error) {
var (
err error
publicTCPListener net.Listener
publicUDPConn *net.UDPConn
)
for retry := 0; ; retry++ {
addr := publicAddr
if !disableTCPTLS {
publicTCPListener, err = net.Listen("tcp", addr)
if err != nil {
return public{}, err
}
addr = publicTCPListener.Addr().String()
}
if !disableQUIC {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return public{}, err
}
publicUDPConn, err = net.ListenUDP("udp", udpAddr)
if err != nil {
_, port, _ := net.SplitHostPort(publicAddr)
if port == "0" && retry < 10 && isErrorAddressAlreadyInUse(err) {
// from here, we know for sure that the tcp port chosen by the
// os is available, but we don't know if the same port number
// for udp is also available.
// if a udp port is already in use, we will close the tcp port and retry
// to find one that is available for both udp and tcp.
_ = publicTCPListener.Close()
continue
}
return public{}, errs.Combine(err, publicTCPListener.Close())
}
}
break
}
publicMux := drpcmux.New()
publicTracingHandler := rpctracing.NewHandler(publicMux, jaeger.RemoteTraceHandler)
serverOptions := drpcserver.Options{
Manager: rpc.NewDefaultManagerOptions(),
}
var netAddr net.Addr
if publicTCPListener != nil {
netAddr = publicTCPListener.Addr()
}
if publicUDPConn != nil && netAddr == nil {
netAddr = publicUDPConn.LocalAddr()
}
return public{
tcpListener: wrapListener(publicTCPListener),
udpConn: publicUDPConn,
addr: netAddr,
drpc: drpcserver.NewWithOptions(publicTracingHandler, serverOptions),
mux: publicMux,
disableTCPTLS: disableTCPTLS,
disableQUIC: disableQUIC,
}, nil
}
func (p public) Close() (err error) {
if p.quicListener != nil {
err = p.quicListener.Close()
}
if p.udpConn != nil {
err = errs.Combine(err, p.udpConn.Close())
}
if p.tcpListener != nil {
err = errs.Combine(err, p.tcpListener.Close())
}
return err
}
// isErrorAddressAlreadyInUse checks whether the error is corresponding to
// EADDRINUSE. Taken from https://stackoverflow.com/a/65865898.
func isErrorAddressAlreadyInUse(err error) bool {

View File

@ -62,7 +62,7 @@ func (planet *Planet) newReferralManager() (*server.Server, error) {
return nil, err
}
referralmanager, err := server.New(log, tlsOptions, config.Address, config.PrivateAddress)
referralmanager, err := server.New(log, tlsOptions, config)
if err != nil {
return nil, err
}

View File

@ -215,7 +215,7 @@ func TestDownloadFromUnresponsiveNode(t *testing.T) {
tlsOptions, err := tlsopts.NewOptions(storageNode.Identity, tlscfg, revocationDB)
require.NoError(t, err)
server, err := server.New(storageNode.Log.Named("mock-server"), tlsOptions, storageNode.Addr(), storageNode.PrivateAddr())
server, err := server.New(storageNode.Log.Named("mock-server"), tlsOptions, storageNode.Config.Server)
require.NoError(t, err)
err = pb.DRPCRegisterPiecestore(server.DRPC(), &piecestoreMock{})

View File

@ -230,7 +230,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}

View File

@ -356,7 +356,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}

View File

@ -76,7 +76,7 @@ func TestLocalTime_OutOfSync(t *testing.T) {
var group errgroup.Group
defer ctx.Check(group.Wait)
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress)
contactServer, err := server.New(log, mockSatTLSOptions, config)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)
@ -132,7 +132,7 @@ func TestLocalTime_OutOfSync(t *testing.T) {
var group errgroup.Group
defer ctx.Check(group.Wait)
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress)
contactServer, err := server.New(log, mockSatTLSOptions, config)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)