2019-09-19 04:34:19 +01:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package rpc
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"net"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"storj.io/storj/internal/memory"
|
|
|
|
"storj.io/storj/pkg/pb"
|
|
|
|
"storj.io/storj/pkg/peertls/tlsopts"
|
|
|
|
"storj.io/storj/pkg/storj"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Dialer holds configuration for dialing.
|
|
|
|
type Dialer struct {
|
|
|
|
// TLSOptions controls the tls options for dialing. If it is nil, only
|
|
|
|
// insecure connections can be made.
|
|
|
|
TLSOptions *tlsopts.Options
|
|
|
|
|
|
|
|
// RequestTimeout causes any read/write operations on the raw socket
|
|
|
|
// to error if they take longer than it if it is non-zero.
|
|
|
|
RequestTimeout time.Duration
|
|
|
|
|
|
|
|
// DialTimeout causes all the tcp dials to error if they take longer
|
|
|
|
// than it if it is non-zero.
|
|
|
|
DialTimeout time.Duration
|
|
|
|
|
|
|
|
// DialLatency sleeps this amount if it is non-zero before every dial.
|
|
|
|
// The timeout runs while the sleep is happening.
|
|
|
|
DialLatency time.Duration
|
|
|
|
|
|
|
|
// TransferRate limits all read/write operations to go slower than
|
|
|
|
// the size per second if it is non-zero.
|
|
|
|
TransferRate memory.Size
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewDefaultDialer returns a Dialer with default timeouts set.
|
|
|
|
func NewDefaultDialer(tlsOptions *tlsopts.Options) Dialer {
|
|
|
|
return Dialer{
|
|
|
|
TLSOptions: tlsOptions,
|
|
|
|
RequestTimeout: 10 * time.Minute,
|
|
|
|
DialTimeout: 20 * time.Second,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// dialContext does a raw tcp dial to the address and wraps the connection with the
|
|
|
|
// provided timeout.
|
|
|
|
func (d Dialer) dialContext(ctx context.Context, address string) (net.Conn, error) {
|
|
|
|
if d.DialTimeout > 0 {
|
|
|
|
var cancel func()
|
|
|
|
ctx, cancel = context.WithTimeout(ctx, d.DialTimeout)
|
|
|
|
defer cancel()
|
|
|
|
}
|
|
|
|
|
|
|
|
if d.DialLatency > 0 {
|
|
|
|
timer := time.NewTimer(d.DialLatency)
|
|
|
|
select {
|
|
|
|
case <-timer.C:
|
|
|
|
case <-ctx.Done():
|
|
|
|
timer.Stop()
|
|
|
|
return nil, Error.Wrap(ctx.Err())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
conn, err := new(net.Dialer).DialContext(ctx, "tcp", address)
|
|
|
|
if err != nil {
|
2019-09-19 05:46:39 +01:00
|
|
|
// N.B. this error is not wrapped on purpose! grpc code cares about inspecting
|
2019-10-04 17:59:49 +01:00
|
|
|
// it and it's not smart enough to attempt to do any unwrapping. :( Additionally
|
|
|
|
// DialContext does not return an error that can be inspected easily to see if it
|
|
|
|
// came from the context being canceled. Thus, we do this racy thing where if the
|
|
|
|
// context is canceled at this point, we return it, rather than return the error
|
|
|
|
// from dialing. It's a slight lie, but arguably still correct because the cancel
|
|
|
|
// must be racing with the dial anyway.
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return nil, ctx.Err()
|
|
|
|
default:
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-09-19 04:34:19 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return &timedConn{
|
|
|
|
Conn: conn,
|
|
|
|
timeout: d.RequestTimeout,
|
|
|
|
rate: d.TransferRate,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DialNode creates an rpc connection to the specified node.
|
|
|
|
func (d Dialer) DialNode(ctx context.Context, node *pb.Node) (_ *Conn, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
if d.TLSOptions == nil {
|
|
|
|
return nil, Error.New("tls options not set when required for this dial")
|
|
|
|
}
|
|
|
|
|
|
|
|
return d.dial(ctx, node.GetAddress().GetAddress(), d.TLSOptions.ClientTLSConfig(node.Id))
|
|
|
|
}
|
|
|
|
|
|
|
|
// DialAddressID dials to the specified address and asserts it has the given node id.
|
|
|
|
func (d Dialer) DialAddressID(ctx context.Context, address string, id storj.NodeID) (_ *Conn, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
if d.TLSOptions == nil {
|
|
|
|
return nil, Error.New("tls options not set when required for this dial")
|
|
|
|
}
|
|
|
|
|
|
|
|
return d.dial(ctx, address, d.TLSOptions.ClientTLSConfig(id))
|
|
|
|
}
|
|
|
|
|
|
|
|
// DialAddressInsecure dials to the specified address and does not check the node id.
|
|
|
|
func (d Dialer) DialAddressInsecure(ctx context.Context, address string) (_ *Conn, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
if d.TLSOptions == nil {
|
|
|
|
return nil, Error.New("tls options not set when required for this dial")
|
|
|
|
}
|
|
|
|
|
|
|
|
return d.dial(ctx, address, d.TLSOptions.UnverifiedClientTLSConfig())
|
|
|
|
}
|
|
|
|
|
|
|
|
// DialAddressUnencrypted dials to the specified address without tls.
|
|
|
|
func (d Dialer) DialAddressUnencrypted(ctx context.Context, address string) (_ *Conn, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
return d.dialInsecure(ctx, address)
|
|
|
|
}
|