storj/pkg/rpc/common.go
JT Olio 2c6fa3c5f8
pkg/rpc: remove read/write deadlines as a mechanism for request timeouts (#3335)
libuplink was incorrectly setting timeouts to 10 seconds still, but
should have been at least 10 minutes. the order sender was setting them
to 1 hour. we don't want timeouts in uplink-side logic as it establishes
a minimum rate on tcp streams.

instead of all of this, just use tcp keep alive. tcp keep alive packets are
sent every 15 seconds and if the peer stops responding the connection
dies. this is enabled by default with go. this will kill tcp connections
when they stop working.

Change-Id: I3d7ad49f71950b3eb43044eedf4b17993116045b
2019-10-22 17:57:24 -06:00

66 lines
1.6 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package rpc
import (
"net"
"time"
"github.com/zeebo/errs"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/memory"
)
//go:generate go run gen.go ../pb drpc compat_drpc.go
//go:generate go run gen.go ../pb grpc compat_grpc.go
var mon = monkit.Package()
// Error wraps all of the errors returned by this package.
var Error = errs.Class("rpccompat")
// timedConn wraps a net.Conn so that all reads and writes get the specified timeout and
// return bytes no faster than the rate. If the timeout or rate are zero, they are
// ignored.
type timedConn struct {
net.Conn
rate memory.Size
}
// now returns time.Now if there's a nonzero rate.
func (t *timedConn) now() (now time.Time) {
if t.rate > 0 {
now = time.Now()
}
return now
}
// delay ensures that we sleep to keep the rate if it is nonzero. n is the number of
// bytes in the read or write operation we need to delay.
func (t *timedConn) delay(start time.Time, n int) {
if t.rate > 0 {
expected := time.Duration(n * int(time.Second) / t.rate.Int())
if actual := time.Since(start); expected > actual {
time.Sleep(expected - actual)
}
}
}
// Read wraps the connection read and adds sleeping to ensure the rate.
func (t *timedConn) Read(p []byte) (int, error) {
start := t.now()
n, err := t.Conn.Read(p)
t.delay(start, n)
return n, err
}
// Write wraps the connection write and adds sleeping to ensure the rate.
func (t *timedConn) Write(p []byte) (int, error) {
start := t.now()
n, err := t.Conn.Write(p)
t.delay(start, n)
return n, err
}