diff --git a/pkg/rpc/dial.go b/pkg/rpc/dial.go index 2c9d4122d..5af0cb7ce 100644 --- a/pkg/rpc/dial.go +++ b/pkg/rpc/dial.go @@ -16,6 +16,7 @@ import ( "storj.io/drpc/drpcstream" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls/tlsopts" + "storj.io/storj/pkg/rpc/rpcpool" "storj.io/storj/pkg/storj" "storj.io/storj/private/memory" ) @@ -48,8 +49,8 @@ type Dialer struct { // the size per second if it is non-zero. TransferRate memory.Size - // PoolCapacity is the maximum number of cached connections to hold. - PoolCapacity int + // PoolOptions controls options for the connection pool. + PoolOptions rpcpool.Options // ConnectionOptions controls the options that we pass to drpc connections. ConnectionOptions drpcconn.Options @@ -58,9 +59,12 @@ type Dialer struct { // NewDefaultDialer returns a Dialer with default timeouts set. func NewDefaultDialer(tlsOptions *tlsopts.Options) Dialer { return Dialer{ - TLSOptions: tlsOptions, - DialTimeout: 20 * time.Second, - PoolCapacity: 5, + TLSOptions: tlsOptions, + DialTimeout: 20 * time.Second, + PoolOptions: rpcpool.Options{ + Capacity: 5, + IdleExpiration: 2 * time.Minute, + }, ConnectionOptions: drpcconn.Options{ Manager: NewDefaultManagerOptions(), }, diff --git a/pkg/rpc/dial_drpc.go b/pkg/rpc/dial_drpc.go index 0bea08eb9..7d0b206a8 100644 --- a/pkg/rpc/dial_drpc.go +++ b/pkg/rpc/dial_drpc.go @@ -23,7 +23,7 @@ const drpcHeader = "DRPC!!!1" func (d Dialer) dial(ctx context.Context, address string, tlsConfig *tls.Config) (_ *Conn, err error) { defer mon.Task()(&ctx)(&err) - pool := rpcpool.New(d.PoolCapacity, func(ctx context.Context) (drpc.Transport, error) { + pool := rpcpool.New(d.PoolOptions, func(ctx context.Context) (drpc.Transport, error) { return d.dialTransport(ctx, address, tlsConfig) }) @@ -89,7 +89,7 @@ func (d Dialer) dialUnencrypted(ctx context.Context, address string) (_ *Conn, e defer mon.Task()(&ctx)(&err) return &Conn{ - raw: rpcpool.New(d.PoolCapacity, func(ctx context.Context) (drpc.Transport, error) { + raw: rpcpool.New(d.PoolOptions, func(ctx context.Context) (drpc.Transport, error) { return d.dialTransportUnencrypted(ctx, address) }), }, nil diff --git a/pkg/rpc/rpcpool/pool.go b/pkg/rpc/rpcpool/pool.go index ac8a24c10..5333f9111 100644 --- a/pkg/rpc/rpcpool/pool.go +++ b/pkg/rpc/rpcpool/pool.go @@ -6,6 +6,7 @@ package rpcpool import ( "context" "sync" + "time" "github.com/zeebo/errs" "gopkg.in/spacemonkeygo/monkit.v2" @@ -16,6 +17,43 @@ import ( var mon = monkit.Package() +// NOTE(jeff): conn expiration could remove the connection from the pool so +// that it doesn't take up a slot causing us to throw away a connection that +// we may want to keep. that adds quite a bit of complexity because channels +// do not support removing buffered elements, so it didn't seem worth it. + +// expiringConn wraps a connection +type expiringConn struct { + conn *drpcconn.Conn + timer *time.Timer +} + +// newExpiringConn wraps the connection with a timer that will close it after the +// specified duration. If the duration is non-positive, no timer is set. +func newExpiringConn(conn *drpcconn.Conn, dur time.Duration) *expiringConn { + ex := &expiringConn{conn: conn} + if dur > 0 { + ex.timer = time.AfterFunc(dur, func() { _ = conn.Close() }) + } + return ex +} + +// Cancel attempts to cancel the expiration timer and returns true if the +// timer will not close the connection. +func (ex *expiringConn) Cancel() bool { + return ex.timer == nil || ex.timer.Stop() +} + +// Options controls the options for a connection pool. +type Options struct { + // Capacity is how many connections to keep open. + Capacity int + + // IdleExpiration is how long a connection in the pool is allowed to be + // kept idle. If zero, connections do not expire. + IdleExpiration time.Duration +} + // Error is the class of errors returned by this package. var Error = errs.Class("rpcpool") @@ -24,8 +62,9 @@ type Dialer = func(context.Context) (drpc.Transport, error) // Conn implements drpc.Conn but keeps a pool of connections open. type Conn struct { + opts Options mu sync.Mutex - pool chan *drpcconn.Conn + pool chan *expiringConn done chan struct{} dial Dialer } @@ -34,9 +73,10 @@ var _ drpc.Conn = (*Conn)(nil) // New returns a new Conn that will keep cap connections open using the provided // dialer when it needs new ones. -func New(cap int, dial Dialer) *Conn { +func New(opts Options, dial Dialer) *Conn { return &Conn{ - pool: make(chan *drpcconn.Conn, cap), + opts: opts, + pool: make(chan *expiringConn, opts.Capacity), done: make(chan struct{}), dial: dial, } @@ -44,7 +84,7 @@ func New(cap int, dial Dialer) *Conn { // Close closes all of the pool's connections and ensures no new ones will be made. func (c *Conn) Close() (err error) { - var pool chan *drpcconn.Conn + var pool chan *expiringConn // only one call will ever see a non-nil pool variable. additionally, anyone // holding the mutex will either see a nil c.pool or a non-closed c.pool. @@ -54,8 +94,10 @@ func (c *Conn) Close() (err error) { if pool != nil { close(pool) - for conn := range pool { - err = errs.Combine(err, conn.Close()) + for ex := range pool { + if ex.Cancel() { + err = errs.Combine(err, ex.conn.Close()) + } } close(c.done) } @@ -85,17 +127,17 @@ func (c *Conn) getConn(ctx context.Context) (_ *drpcconn.Conn, err error) { for { select { - case conn, ok := <-pool: + case ex, ok := <-pool: if !ok { return nil, Error.New("connection pool closed") } // if the connection died in the pool, try again - if conn.Closed() { + if !ex.Cancel() || ex.conn.Closed() { continue } - return conn, nil + return ex.conn, nil default: return c.newConn(ctx) } @@ -114,11 +156,15 @@ func (c *Conn) Put(conn *drpcconn.Conn) error { return nil } + ex := newExpiringConn(conn, c.opts.IdleExpiration) select { - case c.pool <- conn: + case c.pool <- ex: return nil default: - return conn.Close() + if ex.Cancel() { + return conn.Close() + } + return nil } }