pkg/rpc: remove read/write deadlines as a mechanism for request timeouts (#3335)
libuplink was incorrectly setting timeouts to 10 seconds still, but should have been at least 10 minutes. the order sender was setting them to 1 hour. we don't want timeouts in uplink-side logic as it establishes a minimum rate on tcp streams. instead of all of this, just use tcp keep alive. tcp keep alive packets are sent every 15 seconds and if the peer stops responding the connection dies. this is enabled by default with go. this will kill tcp connections when they stop working. Change-Id: I3d7ad49f71950b3eb43044eedf4b17993116045b
This commit is contained in:
parent
3e0d12354a
commit
2c6fa3c5f8
@ -251,7 +251,6 @@ func (flags *GatewayFlags) newUplink(ctx context.Context) (*libuplink.Uplink, er
|
|||||||
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !flags.TLS.UsePeerCAWhitelist
|
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !flags.TLS.UsePeerCAWhitelist
|
||||||
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = flags.TLS.PeerCAWhitelistPath
|
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = flags.TLS.PeerCAWhitelistPath
|
||||||
libuplinkCfg.Volatile.DialTimeout = flags.Client.DialTimeout
|
libuplinkCfg.Volatile.DialTimeout = flags.Client.DialTimeout
|
||||||
libuplinkCfg.Volatile.RequestTimeout = flags.Client.RequestTimeout
|
|
||||||
|
|
||||||
return libuplink.NewUplink(ctx, libuplinkCfg)
|
return libuplink.NewUplink(ctx, libuplinkCfg)
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,6 @@ func (cliCfg *UplinkFlags) NewUplink(ctx context.Context) (*libuplink.Uplink, er
|
|||||||
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !cliCfg.TLS.UsePeerCAWhitelist
|
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !cliCfg.TLS.UsePeerCAWhitelist
|
||||||
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = cliCfg.TLS.PeerCAWhitelistPath
|
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = cliCfg.TLS.PeerCAWhitelistPath
|
||||||
libuplinkCfg.Volatile.DialTimeout = cliCfg.Client.DialTimeout
|
libuplinkCfg.Volatile.DialTimeout = cliCfg.Client.DialTimeout
|
||||||
libuplinkCfg.Volatile.RequestTimeout = cliCfg.Client.RequestTimeout
|
|
||||||
|
|
||||||
return libuplink.NewUplink(ctx, libuplinkCfg)
|
return libuplink.NewUplink(ctx, libuplinkCfg)
|
||||||
}
|
}
|
||||||
|
@ -363,7 +363,6 @@ func (client *Uplink) GetConfig(satellite *SatelliteSystem) uplink.Config {
|
|||||||
config.Legacy.Client.APIKey = apiKey.Serialize()
|
config.Legacy.Client.APIKey = apiKey.Serialize()
|
||||||
config.Legacy.Client.SatelliteAddr = satellite.Addr()
|
config.Legacy.Client.SatelliteAddr = satellite.Addr()
|
||||||
|
|
||||||
config.Client.RequestTimeout = 10 * time.Second
|
|
||||||
config.Client.DialTimeout = 10 * time.Second
|
config.Client.DialTimeout = 10 * time.Second
|
||||||
|
|
||||||
config.RS.MinThreshold = atLeastOne(client.StorageNodeCount * 1 / 5) // 20% of storage nodes
|
config.RS.MinThreshold = atLeastOne(client.StorageNodeCount * 1 / 5) // 20% of storage nodes
|
||||||
@ -403,7 +402,6 @@ func (client *Uplink) NewLibuplink(ctx context.Context) (*libuplink.Uplink, erro
|
|||||||
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !config.TLS.UsePeerCAWhitelist
|
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !config.TLS.UsePeerCAWhitelist
|
||||||
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = config.TLS.PeerCAWhitelistPath
|
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = config.TLS.PeerCAWhitelistPath
|
||||||
libuplinkCfg.Volatile.DialTimeout = config.Client.DialTimeout
|
libuplinkCfg.Volatile.DialTimeout = config.Client.DialTimeout
|
||||||
libuplinkCfg.Volatile.RequestTimeout = config.Client.RequestTimeout
|
|
||||||
|
|
||||||
return libuplink.NewUplink(ctx, libuplinkCfg)
|
return libuplink.NewUplink(ctx, libuplinkCfg)
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const defaultUplinkDialTimeout = 20 * time.Second
|
const defaultUplinkDialTimeout = 20 * time.Second
|
||||||
const defaultUplinkRequestTimeout = 20 * time.Second
|
|
||||||
|
|
||||||
// Config represents configuration options for an Uplink
|
// Config represents configuration options for an Uplink
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@ -70,10 +69,6 @@ type Config struct {
|
|||||||
// DialTimeout is the maximum time to wait connecting to another node.
|
// DialTimeout is the maximum time to wait connecting to another node.
|
||||||
// If not set, the library default (20 seconds) will be used.
|
// If not set, the library default (20 seconds) will be used.
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
|
|
||||||
// RequestTimeout is the maximum time to wait for a request response from another node.
|
|
||||||
// If not set, the library default (20 seconds) will be used.
|
|
||||||
RequestTimeout time.Duration
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,9 +92,6 @@ func (cfg *Config) setDefaults(ctx context.Context) error {
|
|||||||
if cfg.Volatile.DialTimeout.Seconds() == 0 {
|
if cfg.Volatile.DialTimeout.Seconds() == 0 {
|
||||||
cfg.Volatile.DialTimeout = defaultUplinkDialTimeout
|
cfg.Volatile.DialTimeout = defaultUplinkDialTimeout
|
||||||
}
|
}
|
||||||
if cfg.Volatile.RequestTimeout.Seconds() == 0 {
|
|
||||||
cfg.Volatile.RequestTimeout = defaultUplinkRequestTimeout
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,7 +137,6 @@ func NewUplink(ctx context.Context, cfg *Config) (_ *Uplink, err error) {
|
|||||||
|
|
||||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||||
dialer.DialTimeout = cfg.Volatile.DialTimeout
|
dialer.DialTimeout = cfg.Volatile.DialTimeout
|
||||||
dialer.RequestTimeout = cfg.Volatile.RequestTimeout
|
|
||||||
|
|
||||||
return &Uplink{
|
return &Uplink{
|
||||||
ident: ident,
|
ident: ident,
|
||||||
|
@ -24,11 +24,9 @@ func TestUplinkConfigDefaultTimeouts(t *testing.T) {
|
|||||||
|
|
||||||
// Assert the lib uplink configuration gets the correct defaults applied.
|
// Assert the lib uplink configuration gets the correct defaults applied.
|
||||||
assert.Equal(t, 20*time.Second, client.cfg.Volatile.DialTimeout)
|
assert.Equal(t, 20*time.Second, client.cfg.Volatile.DialTimeout)
|
||||||
assert.Equal(t, 20*time.Second, client.cfg.Volatile.RequestTimeout)
|
|
||||||
|
|
||||||
// Assert the values propagate correctly all the way down to the transport layer.
|
// Assert the values propagate correctly all the way down to the transport layer.
|
||||||
assert.Equal(t, 20*time.Second, client.dialer.DialTimeout)
|
assert.Equal(t, 20*time.Second, client.dialer.DialTimeout)
|
||||||
assert.Equal(t, 20*time.Second, client.dialer.RequestTimeout)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestUplinkConfigSetTimeouts tests that the uplink configuration settings properly override
|
// TestUplinkConfigSetTimeouts tests that the uplink configuration settings properly override
|
||||||
@ -38,7 +36,6 @@ func TestUplinkConfigSetTimeouts(t *testing.T) {
|
|||||||
|
|
||||||
cfg := &Config{}
|
cfg := &Config{}
|
||||||
cfg.Volatile.DialTimeout = 120 * time.Second
|
cfg.Volatile.DialTimeout = 120 * time.Second
|
||||||
cfg.Volatile.RequestTimeout = 120 * time.Second
|
|
||||||
cfg.Volatile.TLS = struct {
|
cfg.Volatile.TLS = struct {
|
||||||
SkipPeerCAWhitelist bool
|
SkipPeerCAWhitelist bool
|
||||||
PeerCAWhitelistPath string
|
PeerCAWhitelistPath string
|
||||||
@ -54,9 +51,7 @@ func TestUplinkConfigSetTimeouts(t *testing.T) {
|
|||||||
|
|
||||||
// Assert the lib uplink configuration gets the correct values applied.
|
// Assert the lib uplink configuration gets the correct values applied.
|
||||||
assert.Equal(t, 120*time.Second, client.cfg.Volatile.DialTimeout)
|
assert.Equal(t, 120*time.Second, client.cfg.Volatile.DialTimeout)
|
||||||
assert.Equal(t, 120*time.Second, client.cfg.Volatile.RequestTimeout)
|
|
||||||
|
|
||||||
// Assert the values propagate correctly all the way down to the transport layer.
|
// Assert the values propagate correctly all the way down to the transport layer.
|
||||||
assert.Equal(t, 120*time.Second, client.dialer.DialTimeout)
|
assert.Equal(t, 120*time.Second, client.dialer.DialTimeout)
|
||||||
assert.Equal(t, 120*time.Second, client.dialer.RequestTimeout)
|
|
||||||
}
|
}
|
||||||
|
@ -26,8 +26,7 @@ var Error = errs.Class("rpccompat")
|
|||||||
// ignored.
|
// ignored.
|
||||||
type timedConn struct {
|
type timedConn struct {
|
||||||
net.Conn
|
net.Conn
|
||||||
timeout time.Duration
|
rate memory.Size
|
||||||
rate memory.Size
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// now returns time.Now if there's a nonzero rate.
|
// now returns time.Now if there's a nonzero rate.
|
||||||
@ -49,28 +48,16 @@ func (t *timedConn) delay(start time.Time, n int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read wraps the connection read setting the timeout and sleeping to ensure the rate.
|
// Read wraps the connection read and adds sleeping to ensure the rate.
|
||||||
func (t *timedConn) Read(p []byte) (int, error) {
|
func (t *timedConn) Read(p []byte) (int, error) {
|
||||||
if t.timeout > 0 {
|
|
||||||
if err := t.SetReadDeadline(time.Now().Add(t.timeout)); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
start := t.now()
|
start := t.now()
|
||||||
n, err := t.Conn.Read(p)
|
n, err := t.Conn.Read(p)
|
||||||
t.delay(start, n)
|
t.delay(start, n)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write wraps the connection write setting the timeout and sleeping to ensure the rate.
|
// Write wraps the connection write and adds sleeping to ensure the rate.
|
||||||
func (t *timedConn) Write(p []byte) (int, error) {
|
func (t *timedConn) Write(p []byte) (int, error) {
|
||||||
if t.timeout > 0 {
|
|
||||||
if err := t.SetWriteDeadline(time.Now().Add(t.timeout)); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
start := t.now()
|
start := t.now()
|
||||||
n, err := t.Conn.Write(p)
|
n, err := t.Conn.Write(p)
|
||||||
t.delay(start, n)
|
t.delay(start, n)
|
||||||
|
@ -23,10 +23,6 @@ type Dialer struct {
|
|||||||
// insecure connections can be made.
|
// insecure connections can be made.
|
||||||
TLSOptions *tlsopts.Options
|
TLSOptions *tlsopts.Options
|
||||||
|
|
||||||
// RequestTimeout causes any read/write operations on the raw socket
|
|
||||||
// to error if they take longer than it if it is non-zero.
|
|
||||||
RequestTimeout time.Duration
|
|
||||||
|
|
||||||
// DialTimeout causes all the tcp dials to error if they take longer
|
// DialTimeout causes all the tcp dials to error if they take longer
|
||||||
// than it if it is non-zero.
|
// than it if it is non-zero.
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
@ -43,9 +39,8 @@ type Dialer struct {
|
|||||||
// NewDefaultDialer returns a Dialer with default timeouts set.
|
// NewDefaultDialer returns a Dialer with default timeouts set.
|
||||||
func NewDefaultDialer(tlsOptions *tlsopts.Options) Dialer {
|
func NewDefaultDialer(tlsOptions *tlsopts.Options) Dialer {
|
||||||
return Dialer{
|
return Dialer{
|
||||||
TLSOptions: tlsOptions,
|
TLSOptions: tlsOptions,
|
||||||
RequestTimeout: 10 * time.Minute,
|
DialTimeout: 20 * time.Second,
|
||||||
DialTimeout: 20 * time.Second,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,9 +81,8 @@ func (d Dialer) dialContext(ctx context.Context, address string) (net.Conn, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &timedConn{
|
return &timedConn{
|
||||||
Conn: conn,
|
Conn: conn,
|
||||||
timeout: d.RequestTimeout,
|
rate: d.TransferRate,
|
||||||
rate: d.TransferRate,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -435,7 +435,6 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||||
dialer.RequestTimeout = 0
|
|
||||||
dialer.DialTimeout = 20 * time.Millisecond
|
dialer.DialTimeout = 20 * time.Millisecond
|
||||||
dialer.DialLatency = 200 * time.Second
|
dialer.DialLatency = 200 * time.Second
|
||||||
dialer.TransferRate = 1 * memory.KB
|
dialer.TransferRate = 1 * memory.KB
|
||||||
|
@ -82,12 +82,11 @@ type DB interface {
|
|||||||
|
|
||||||
// Config defines configuration for sending orders.
|
// Config defines configuration for sending orders.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
SenderInterval time.Duration `help:"duration between sending" default:"1h0m0s"`
|
SenderInterval time.Duration `help:"duration between sending" default:"1h0m0s"`
|
||||||
SenderTimeout time.Duration `help:"timeout for sending" default:"1h0m0s"`
|
SenderTimeout time.Duration `help:"timeout for sending" default:"1h0m0s"`
|
||||||
SenderDialTimeout time.Duration `help:"timeout for dialing satellite during sending orders" default:"1m0s"`
|
SenderDialTimeout time.Duration `help:"timeout for dialing satellite during sending orders" default:"1m0s"`
|
||||||
SenderRequestTimeout time.Duration `help:"timeout for read/write operations during sending" default:"1h0m0s"`
|
CleanupInterval time.Duration `help:"duration between archive cleanups" default:"24h0m0s"`
|
||||||
CleanupInterval time.Duration `help:"duration between archive cleanups" default:"24h0m0s"`
|
ArchiveTTL time.Duration `help:"length of time to archive orders before deletion" default:"168h0m0s"` // 7 days
|
||||||
ArchiveTTL time.Duration `help:"length of time to archive orders before deletion" default:"168h0m0s"` // 7 days
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service sends every interval unsent orders to the satellite.
|
// Service sends every interval unsent orders to the satellite.
|
||||||
|
@ -302,7 +302,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
|||||||
|
|
||||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||||
dialer.DialTimeout = config.Storage2.Orders.SenderDialTimeout
|
dialer.DialTimeout = config.Storage2.Orders.SenderDialTimeout
|
||||||
dialer.RequestTimeout = config.Storage2.Orders.SenderRequestTimeout
|
|
||||||
|
|
||||||
peer.Storage2.Orders = orders.NewService(
|
peer.Storage2.Orders = orders.NewService(
|
||||||
log.Named("orders"),
|
log.Named("orders"),
|
||||||
|
@ -40,10 +40,9 @@ type EncryptionConfig struct {
|
|||||||
// ClientConfig is a configuration struct for the uplink that controls how
|
// ClientConfig is a configuration struct for the uplink that controls how
|
||||||
// to talk to the rest of the network.
|
// to talk to the rest of the network.
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
MaxInlineSize memory.Size `help:"max inline segment size in bytes" default:"4KiB"`
|
MaxInlineSize memory.Size `help:"max inline segment size in bytes" default:"4KiB"`
|
||||||
SegmentSize memory.Size `help:"the size of a segment in bytes" default:"64MiB"`
|
SegmentSize memory.Size `help:"the size of a segment in bytes" default:"64MiB"`
|
||||||
RequestTimeout time.Duration `help:"timeout for request" default:"0h2m00s"`
|
DialTimeout time.Duration `help:"timeout for dials" default:"0h2m00s"`
|
||||||
DialTimeout time.Duration `help:"timeout for dials" default:"0h2m00s"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config uplink configuration
|
// Config uplink configuration
|
||||||
|
Loading…
Reference in New Issue
Block a user