pkg/rpc: Change drpcheader to save a packet
This changes when we write the drpcheader. Rather than making it its own write to the connection, it now prepends the drpc header to the first write on the connection (typically the tls handshake). This results in one less packet being sent at the beginning of each drpc connection. For an operation like uploading a file from uplink, this results in many packets being dropped: one when communicating with the satellite, and one for each communication with the storage nodes. Change-Id: I7644b46e90ffa7acea73ac56831396307352ed7a
This commit is contained in:
parent
2f7465c294
commit
0008aebf80
@ -9,6 +9,7 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"storj.io/drpc"
|
||||
"storj.io/drpc/drpcconn"
|
||||
@ -52,12 +53,7 @@ func (d Dialer) dialTransport(ctx context.Context, address string, tlsConfig *tl
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// write the header bytes before the tls handshake
|
||||
if _, err := rawConn.Write([]byte(drpcHeader)); err != nil {
|
||||
_ = rawConn.Close()
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
rawConn = newDrpcHeaderConn(rawConn)
|
||||
|
||||
// perform the handshake racing with the context closing. we use a buffer
|
||||
// of size 1 so that the handshake can proceed even if no one is reading.
|
||||
@ -80,7 +76,7 @@ func (d Dialer) dialTransport(ctx context.Context, address string, tlsConfig *tl
|
||||
|
||||
return &tlsConnWrapper{
|
||||
Conn: conn,
|
||||
Underlying: rawConn,
|
||||
underlying: rawConn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -105,13 +101,7 @@ func (d Dialer) dialTransportUnencrypted(ctx context.Context, address string) (_
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// write the header bytes before the tls handshake
|
||||
if _, err := conn.Write([]byte(drpcHeader)); err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
return newDrpcHeaderConn(conn), nil
|
||||
}
|
||||
|
||||
// tlsConnWrapper is a wrapper around a *tls.Conn that calls Close on the
|
||||
@ -119,8 +109,41 @@ func (d Dialer) dialTransportUnencrypted(ctx context.Context, address string) (_
|
||||
// notification to the other side which may block forever.
|
||||
type tlsConnWrapper struct {
|
||||
*tls.Conn
|
||||
Underlying net.Conn
|
||||
underlying net.Conn
|
||||
}
|
||||
|
||||
// Close closes the underlying connection
|
||||
func (t *tlsConnWrapper) Close() error { return t.Underlying.Close() }
|
||||
func (t *tlsConnWrapper) Close() error { return t.underlying.Close() }
|
||||
|
||||
// drpcHeaderConn fulfills the net.Conn interface. On the first call to Write
|
||||
// it will write the drpcHeader.
|
||||
type drpcHeaderConn struct {
|
||||
net.Conn
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// newDrpcHeaderConn returns a new *drpcHeaderConn
|
||||
func newDrpcHeaderConn(conn net.Conn) *drpcHeaderConn {
|
||||
return &drpcHeaderConn{
|
||||
Conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
// Write will write buf to the underlying conn. If this is the first time Write
|
||||
// is called it will prepend the drpcHeader to the beginning of the write.
|
||||
func (d *drpcHeaderConn) Write(buf []byte) (n int, err error) {
|
||||
var didOnce bool
|
||||
d.once.Do(func() {
|
||||
didOnce = true
|
||||
header := []byte(drpcHeader)
|
||||
n, err = d.Conn.Write(append(header, buf...))
|
||||
})
|
||||
if didOnce {
|
||||
n -= len(drpcHeader)
|
||||
if n < 0 {
|
||||
n = 0
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
return d.Conn.Write(buf)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user