all: remove code to default to grpc
We have moved to drpc so we don't need to have code for building with grpc only. Change-Id: I55732314dca0d5b4ce1132b68de4186a15d91b21
This commit is contained in:
parent
62c58f4a9a
commit
ea455b6df0
24
Makefile
24
Makefile
@ -80,24 +80,18 @@ build-npm:
|
||||
|
||||
##@ Simulator
|
||||
|
||||
.PHONY: go-install-grpc-and-drpc
|
||||
go-install-grpc-and-drpc:
|
||||
@: $(if ${PACKAGE},,$(error PACKAGE must be defined for the go-install-grpc-and-drpc target))
|
||||
go build -race -v -tags=grpc -o "$(shell go list -f '{{.Target}}' ${PACKAGE})-grpc" "${PACKAGE}"
|
||||
go build -race -v -tags=drpc -o "$(shell go list -f '{{.Target}}' ${PACKAGE})-drpc" "${PACKAGE}"
|
||||
go install -race -v "${PACKAGE}"
|
||||
|
||||
.PHONY: install-sim
|
||||
install-sim: ## install storj-sim
|
||||
@echo "Running ${@}"
|
||||
$(MAKE) go-install-grpc-and-drpc PACKAGE=storj.io/storj/cmd/storagenode
|
||||
$(MAKE) go-install-grpc-and-drpc PACKAGE=storj.io/storj/cmd/satellite
|
||||
go install -race -v storj.io/storj/cmd/storj-sim
|
||||
go install -race -v storj.io/storj/cmd/versioncontrol
|
||||
go install -race -v storj.io/storj/cmd/uplink
|
||||
go install -race -v storj.io/storj/cmd/gateway
|
||||
go install -race -v storj.io/storj/cmd/identity
|
||||
go install -race -v storj.io/storj/cmd/certificates
|
||||
go install -race -v \
|
||||
storj.io/storj/cmd/satellite \
|
||||
storj.io/storj/cmd/storagenode \
|
||||
storj.io/storj/cmd/storj-sim \
|
||||
storj.io/storj/cmd/versioncontrol \
|
||||
storj.io/storj/cmd/uplink \
|
||||
storj.io/storj/cmd/gateway \
|
||||
storj.io/storj/cmd/identity \
|
||||
storj.io/storj/cmd/certificates
|
||||
|
||||
##@ Test
|
||||
|
||||
|
@ -1,8 +1,6 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build !grpc
|
||||
|
||||
package rpc
|
||||
|
||||
import (
|
@ -13,8 +13,15 @@ import (
|
||||
"storj.io/storj/private/memory"
|
||||
)
|
||||
|
||||
//go:generate go run gen.go ../pb drpc compat_drpc.go
|
||||
//go:generate go run gen.go ../pb grpc compat_grpc.go
|
||||
//go:generate go run gen.go ../pb drpc alias.go
|
||||
|
||||
const (
|
||||
// IsDRPC is true if drpc is being used.
|
||||
IsDRPC = true
|
||||
|
||||
// IsGRPC is true if grpc is being used.
|
||||
IsGRPC = false
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
|
@ -1,14 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build !grpc
|
||||
|
||||
package rpc
|
||||
|
||||
const (
|
||||
// IsDRPC is true if drpc is being used.
|
||||
IsDRPC = true
|
||||
|
||||
// IsGRPC is true if grpc is being used.
|
||||
IsGRPC = false
|
||||
)
|
@ -1,14 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build grpc
|
||||
|
||||
package rpc
|
||||
|
||||
const (
|
||||
// IsDRPC is true if drpc is being used.
|
||||
IsDRPC = false
|
||||
|
||||
// IsGRPC is true if grpc is being used.
|
||||
IsGRPC = true
|
||||
)
|
@ -1,225 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build grpc
|
||||
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
)
|
||||
|
||||
// RawConn is a type alias to a grpc client connection
|
||||
type RawConn = grpc.ClientConn
|
||||
|
||||
type (
|
||||
// CertificatesClient is an alias to the grpc client interface
|
||||
CertificatesClient = pb.CertificatesClient
|
||||
|
||||
// ContactClient is an alias to the grpc client interface
|
||||
ContactClient = pb.ContactClient
|
||||
|
||||
// HealthInspectorClient is an alias to the grpc client interface
|
||||
HealthInspectorClient = pb.HealthInspectorClient
|
||||
|
||||
// IrreparableInspectorClient is an alias to the grpc client interface
|
||||
IrreparableInspectorClient = pb.IrreparableInspectorClient
|
||||
|
||||
// MetainfoClient is an alias to the grpc client interface
|
||||
MetainfoClient = pb.MetainfoClient
|
||||
|
||||
// NodeClient is an alias to the grpc client interface
|
||||
NodeClient = pb.NodeClient
|
||||
|
||||
// NodeGracefulExitClient is an alias to the grpc client interface
|
||||
NodeGracefulExitClient = pb.NodeGracefulExitClient
|
||||
|
||||
// NodeStatsClient is an alias to the grpc client interface
|
||||
NodeStatsClient = pb.NodeStatsClient
|
||||
|
||||
// OrdersClient is an alias to the grpc client interface
|
||||
OrdersClient = pb.OrdersClient
|
||||
|
||||
// OverlayInspectorClient is an alias to the grpc client interface
|
||||
OverlayInspectorClient = pb.OverlayInspectorClient
|
||||
|
||||
// PaymentsClient is an alias to the grpc client interface
|
||||
PaymentsClient = pb.PaymentsClient
|
||||
|
||||
// PieceStoreInspectorClient is an alias to the grpc client interface
|
||||
PieceStoreInspectorClient = pb.PieceStoreInspectorClient
|
||||
|
||||
// PiecestoreClient is an alias to the grpc client interface
|
||||
PiecestoreClient = pb.PiecestoreClient
|
||||
|
||||
// ReferralManagerClient is an alias to the grpc client interface
|
||||
ReferralManagerClient = pb.ReferralManagerClient
|
||||
|
||||
// SatelliteGracefulExitClient is an alias to the grpc client interface
|
||||
SatelliteGracefulExitClient = pb.SatelliteGracefulExitClient
|
||||
|
||||
// VouchersClient is an alias to the grpc client interface
|
||||
VouchersClient = pb.VouchersClient
|
||||
)
|
||||
|
||||
// NewCertificatesClient returns the grpc version of a CertificatesClient
|
||||
func NewCertificatesClient(rc *RawConn) CertificatesClient {
|
||||
return pb.NewCertificatesClient(rc)
|
||||
}
|
||||
|
||||
// CertificatesClient returns a CertificatesClient for this connection
|
||||
func (c *Conn) CertificatesClient() CertificatesClient {
|
||||
return NewCertificatesClient(c.raw)
|
||||
}
|
||||
|
||||
// NewContactClient returns the grpc version of a ContactClient
|
||||
func NewContactClient(rc *RawConn) ContactClient {
|
||||
return pb.NewContactClient(rc)
|
||||
}
|
||||
|
||||
// ContactClient returns a ContactClient for this connection
|
||||
func (c *Conn) ContactClient() ContactClient {
|
||||
return NewContactClient(c.raw)
|
||||
}
|
||||
|
||||
// NewHealthInspectorClient returns the grpc version of a HealthInspectorClient
|
||||
func NewHealthInspectorClient(rc *RawConn) HealthInspectorClient {
|
||||
return pb.NewHealthInspectorClient(rc)
|
||||
}
|
||||
|
||||
// HealthInspectorClient returns a HealthInspectorClient for this connection
|
||||
func (c *Conn) HealthInspectorClient() HealthInspectorClient {
|
||||
return NewHealthInspectorClient(c.raw)
|
||||
}
|
||||
|
||||
// NewIrreparableInspectorClient returns the grpc version of a IrreparableInspectorClient
|
||||
func NewIrreparableInspectorClient(rc *RawConn) IrreparableInspectorClient {
|
||||
return pb.NewIrreparableInspectorClient(rc)
|
||||
}
|
||||
|
||||
// IrreparableInspectorClient returns a IrreparableInspectorClient for this connection
|
||||
func (c *Conn) IrreparableInspectorClient() IrreparableInspectorClient {
|
||||
return NewIrreparableInspectorClient(c.raw)
|
||||
}
|
||||
|
||||
// NewMetainfoClient returns the grpc version of a MetainfoClient
|
||||
func NewMetainfoClient(rc *RawConn) MetainfoClient {
|
||||
return pb.NewMetainfoClient(rc)
|
||||
}
|
||||
|
||||
// MetainfoClient returns a MetainfoClient for this connection
|
||||
func (c *Conn) MetainfoClient() MetainfoClient {
|
||||
return NewMetainfoClient(c.raw)
|
||||
}
|
||||
|
||||
// NewNodeClient returns the grpc version of a NodeClient
|
||||
func NewNodeClient(rc *RawConn) NodeClient {
|
||||
return pb.NewNodeClient(rc)
|
||||
}
|
||||
|
||||
// NodeClient returns a NodeClient for this connection
|
||||
func (c *Conn) NodeClient() NodeClient {
|
||||
return NewNodeClient(c.raw)
|
||||
}
|
||||
|
||||
// NewNodeGracefulExitClient returns the grpc version of a NodeGracefulExitClient
|
||||
func NewNodeGracefulExitClient(rc *RawConn) NodeGracefulExitClient {
|
||||
return pb.NewNodeGracefulExitClient(rc)
|
||||
}
|
||||
|
||||
// NodeGracefulExitClient returns a NodeGracefulExitClient for this connection
|
||||
func (c *Conn) NodeGracefulExitClient() NodeGracefulExitClient {
|
||||
return NewNodeGracefulExitClient(c.raw)
|
||||
}
|
||||
|
||||
// NewNodeStatsClient returns the grpc version of a NodeStatsClient
|
||||
func NewNodeStatsClient(rc *RawConn) NodeStatsClient {
|
||||
return pb.NewNodeStatsClient(rc)
|
||||
}
|
||||
|
||||
// NodeStatsClient returns a NodeStatsClient for this connection
|
||||
func (c *Conn) NodeStatsClient() NodeStatsClient {
|
||||
return NewNodeStatsClient(c.raw)
|
||||
}
|
||||
|
||||
// NewOrdersClient returns the grpc version of a OrdersClient
|
||||
func NewOrdersClient(rc *RawConn) OrdersClient {
|
||||
return pb.NewOrdersClient(rc)
|
||||
}
|
||||
|
||||
// OrdersClient returns a OrdersClient for this connection
|
||||
func (c *Conn) OrdersClient() OrdersClient {
|
||||
return NewOrdersClient(c.raw)
|
||||
}
|
||||
|
||||
// NewOverlayInspectorClient returns the grpc version of a OverlayInspectorClient
|
||||
func NewOverlayInspectorClient(rc *RawConn) OverlayInspectorClient {
|
||||
return pb.NewOverlayInspectorClient(rc)
|
||||
}
|
||||
|
||||
// OverlayInspectorClient returns a OverlayInspectorClient for this connection
|
||||
func (c *Conn) OverlayInspectorClient() OverlayInspectorClient {
|
||||
return NewOverlayInspectorClient(c.raw)
|
||||
}
|
||||
|
||||
// NewPaymentsClient returns the grpc version of a PaymentsClient
|
||||
func NewPaymentsClient(rc *RawConn) PaymentsClient {
|
||||
return pb.NewPaymentsClient(rc)
|
||||
}
|
||||
|
||||
// PaymentsClient returns a PaymentsClient for this connection
|
||||
func (c *Conn) PaymentsClient() PaymentsClient {
|
||||
return NewPaymentsClient(c.raw)
|
||||
}
|
||||
|
||||
// NewPieceStoreInspectorClient returns the grpc version of a PieceStoreInspectorClient
|
||||
func NewPieceStoreInspectorClient(rc *RawConn) PieceStoreInspectorClient {
|
||||
return pb.NewPieceStoreInspectorClient(rc)
|
||||
}
|
||||
|
||||
// PieceStoreInspectorClient returns a PieceStoreInspectorClient for this connection
|
||||
func (c *Conn) PieceStoreInspectorClient() PieceStoreInspectorClient {
|
||||
return NewPieceStoreInspectorClient(c.raw)
|
||||
}
|
||||
|
||||
// NewPiecestoreClient returns the grpc version of a PiecestoreClient
|
||||
func NewPiecestoreClient(rc *RawConn) PiecestoreClient {
|
||||
return pb.NewPiecestoreClient(rc)
|
||||
}
|
||||
|
||||
// PiecestoreClient returns a PiecestoreClient for this connection
|
||||
func (c *Conn) PiecestoreClient() PiecestoreClient {
|
||||
return NewPiecestoreClient(c.raw)
|
||||
}
|
||||
|
||||
// NewReferralManagerClient returns the grpc version of a ReferralManagerClient
|
||||
func NewReferralManagerClient(rc *RawConn) ReferralManagerClient {
|
||||
return pb.NewReferralManagerClient(rc)
|
||||
}
|
||||
|
||||
// ReferralManagerClient returns a ReferralManagerClient for this connection
|
||||
func (c *Conn) ReferralManagerClient() ReferralManagerClient {
|
||||
return NewReferralManagerClient(c.raw)
|
||||
}
|
||||
|
||||
// NewSatelliteGracefulExitClient returns the grpc version of a SatelliteGracefulExitClient
|
||||
func NewSatelliteGracefulExitClient(rc *RawConn) SatelliteGracefulExitClient {
|
||||
return pb.NewSatelliteGracefulExitClient(rc)
|
||||
}
|
||||
|
||||
// SatelliteGracefulExitClient returns a SatelliteGracefulExitClient for this connection
|
||||
func (c *Conn) SatelliteGracefulExitClient() SatelliteGracefulExitClient {
|
||||
return NewSatelliteGracefulExitClient(c.raw)
|
||||
}
|
||||
|
||||
// NewVouchersClient returns the grpc version of a VouchersClient
|
||||
func NewVouchersClient(rc *RawConn) VouchersClient {
|
||||
return pb.NewVouchersClient(rc)
|
||||
}
|
||||
|
||||
// VouchersClient returns a VouchersClient for this connection
|
||||
func (c *Conn) VouchersClient() VouchersClient {
|
||||
return NewVouchersClient(c.raw)
|
||||
}
|
135
pkg/rpc/dial.go
135
pkg/rpc/dial.go
@ -5,12 +5,15 @@ package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/drpc"
|
||||
"storj.io/drpc/drpcconn"
|
||||
"storj.io/drpc/drpcmanager"
|
||||
"storj.io/drpc/drpcstream"
|
||||
@ -200,3 +203,135 @@ func (d Dialer) DialAddressUnencrypted(ctx context.Context, address string) (_ *
|
||||
|
||||
return d.dialUnencrypted(ctx, address)
|
||||
}
|
||||
|
||||
// drpcHeader is the first bytes we send on a connection so that the remote
|
||||
// knows to expect drpc on the wire instead of grpc.
|
||||
const drpcHeader = "DRPC!!!1"
|
||||
|
||||
// dial performs the dialing to the drpc endpoint with tls.
|
||||
func (d Dialer) dial(ctx context.Context, address string, tlsConfig *tls.Config) (_ *Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
pool := rpcpool.New(d.PoolOptions, func(ctx context.Context) (drpc.Transport, error) {
|
||||
return d.dialTransport(ctx, address, tlsConfig)
|
||||
})
|
||||
|
||||
conn, err := d.dialTransport(ctx, address, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
state := conn.ConnectionState()
|
||||
|
||||
if err := pool.Put(drpcconn.New(conn)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Conn{
|
||||
state: state,
|
||||
raw: pool,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// dialTransport performs dialing to the drpc endpoint with tls.
|
||||
func (d Dialer) dialTransport(ctx context.Context, address string, tlsConfig *tls.Config) (_ *tlsConnWrapper, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// open the tcp socket to the address
|
||||
rawConn, err := d.dialContext(ctx, address)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
rawConn = newDrpcHeaderConn(rawConn)
|
||||
|
||||
// perform the handshake racing with the context closing. we use a buffer
|
||||
// of size 1 so that the handshake can proceed even if no one is reading.
|
||||
errCh := make(chan error, 1)
|
||||
conn := tls.Client(rawConn, tlsConfig)
|
||||
go func() { errCh <- conn.Handshake() }()
|
||||
|
||||
// see which wins and close the raw conn if there was any error. we can't
|
||||
// close the tls connection concurrently with handshakes or it sometimes
|
||||
// will panic. cool, huh?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
case err = <-errCh:
|
||||
}
|
||||
if err != nil {
|
||||
_ = rawConn.Close()
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return &tlsConnWrapper{
|
||||
Conn: conn,
|
||||
underlying: rawConn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// dialUnencrypted performs dialing to the drpc endpoint with no tls.
|
||||
func (d Dialer) dialUnencrypted(ctx context.Context, address string) (_ *Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return &Conn{
|
||||
raw: rpcpool.New(d.PoolOptions, func(ctx context.Context) (drpc.Transport, error) {
|
||||
return d.dialTransportUnencrypted(ctx, address)
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// dialTransportUnencrypted performs dialing to the drpc endpoint with no tls.
|
||||
func (d Dialer) dialTransportUnencrypted(ctx context.Context, address string) (_ net.Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// open the tcp socket to the address
|
||||
conn, err := d.dialContext(ctx, address)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return newDrpcHeaderConn(conn), nil
|
||||
}
|
||||
|
||||
// tlsConnWrapper is a wrapper around a *tls.Conn that calls Close on the
|
||||
// underlying connection when closed rather than trying to send a
|
||||
// notification to the other side which may block forever.
|
||||
type tlsConnWrapper struct {
|
||||
*tls.Conn
|
||||
underlying net.Conn
|
||||
}
|
||||
|
||||
// Close closes the underlying connection
|
||||
func (t *tlsConnWrapper) Close() error { return t.underlying.Close() }
|
||||
|
||||
// drpcHeaderConn fulfills the net.Conn interface. On the first call to Write
|
||||
// it will write the drpcHeader.
|
||||
type drpcHeaderConn struct {
|
||||
net.Conn
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// newDrpcHeaderConn returns a new *drpcHeaderConn
|
||||
func newDrpcHeaderConn(conn net.Conn) *drpcHeaderConn {
|
||||
return &drpcHeaderConn{
|
||||
Conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
// Write will write buf to the underlying conn. If this is the first time Write
|
||||
// is called it will prepend the drpcHeader to the beginning of the write.
|
||||
func (d *drpcHeaderConn) Write(buf []byte) (n int, err error) {
|
||||
var didOnce bool
|
||||
d.once.Do(func() {
|
||||
didOnce = true
|
||||
header := []byte(drpcHeader)
|
||||
n, err = d.Conn.Write(append(header, buf...))
|
||||
})
|
||||
if didOnce {
|
||||
n -= len(drpcHeader)
|
||||
if n < 0 {
|
||||
n = 0
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
return d.Conn.Write(buf)
|
||||
}
|
||||
|
@ -1,149 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build !grpc
|
||||
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"storj.io/drpc"
|
||||
"storj.io/drpc/drpcconn"
|
||||
"storj.io/storj/pkg/rpc/rpcpool"
|
||||
)
|
||||
|
||||
// drpcHeader is the first bytes we send on a connection so that the remote
|
||||
// knows to expect drpc on the wire instead of grpc.
|
||||
const drpcHeader = "DRPC!!!1"
|
||||
|
||||
// dial performs the dialing to the drpc endpoint with tls.
|
||||
func (d Dialer) dial(ctx context.Context, address string, tlsConfig *tls.Config) (_ *Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
pool := rpcpool.New(d.PoolOptions, func(ctx context.Context) (drpc.Transport, error) {
|
||||
return d.dialTransport(ctx, address, tlsConfig)
|
||||
})
|
||||
|
||||
conn, err := d.dialTransport(ctx, address, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
state := conn.ConnectionState()
|
||||
|
||||
if err := pool.Put(drpcconn.New(conn)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Conn{
|
||||
state: state,
|
||||
raw: pool,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// dialTransport performs dialing to the drpc endpoint with tls.
|
||||
func (d Dialer) dialTransport(ctx context.Context, address string, tlsConfig *tls.Config) (_ *tlsConnWrapper, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// open the tcp socket to the address
|
||||
rawConn, err := d.dialContext(ctx, address)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
rawConn = newDrpcHeaderConn(rawConn)
|
||||
|
||||
// perform the handshake racing with the context closing. we use a buffer
|
||||
// of size 1 so that the handshake can proceed even if no one is reading.
|
||||
errCh := make(chan error, 1)
|
||||
conn := tls.Client(rawConn, tlsConfig)
|
||||
go func() { errCh <- conn.Handshake() }()
|
||||
|
||||
// see which wins and close the raw conn if there was any error. we can't
|
||||
// close the tls connection concurrently with handshakes or it sometimes
|
||||
// will panic. cool, huh?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
case err = <-errCh:
|
||||
}
|
||||
if err != nil {
|
||||
_ = rawConn.Close()
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return &tlsConnWrapper{
|
||||
Conn: conn,
|
||||
underlying: rawConn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// dialUnencrypted performs dialing to the drpc endpoint with no tls.
|
||||
func (d Dialer) dialUnencrypted(ctx context.Context, address string) (_ *Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return &Conn{
|
||||
raw: rpcpool.New(d.PoolOptions, func(ctx context.Context) (drpc.Transport, error) {
|
||||
return d.dialTransportUnencrypted(ctx, address)
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// dialTransportUnencrypted performs dialing to the drpc endpoint with no tls.
|
||||
func (d Dialer) dialTransportUnencrypted(ctx context.Context, address string) (_ net.Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// open the tcp socket to the address
|
||||
conn, err := d.dialContext(ctx, address)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return newDrpcHeaderConn(conn), nil
|
||||
}
|
||||
|
||||
// tlsConnWrapper is a wrapper around a *tls.Conn that calls Close on the
|
||||
// underlying connection when closed rather than trying to send a
|
||||
// notification to the other side which may block forever.
|
||||
type tlsConnWrapper struct {
|
||||
*tls.Conn
|
||||
underlying net.Conn
|
||||
}
|
||||
|
||||
// Close closes the underlying connection
|
||||
func (t *tlsConnWrapper) Close() error { return t.underlying.Close() }
|
||||
|
||||
// drpcHeaderConn fulfills the net.Conn interface. On the first call to Write
|
||||
// it will write the drpcHeader.
|
||||
type drpcHeaderConn struct {
|
||||
net.Conn
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// newDrpcHeaderConn returns a new *drpcHeaderConn
|
||||
func newDrpcHeaderConn(conn net.Conn) *drpcHeaderConn {
|
||||
return &drpcHeaderConn{
|
||||
Conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
// Write will write buf to the underlying conn. If this is the first time Write
|
||||
// is called it will prepend the drpcHeader to the beginning of the write.
|
||||
func (d *drpcHeaderConn) Write(buf []byte) (n int, err error) {
|
||||
var didOnce bool
|
||||
d.once.Do(func() {
|
||||
didOnce = true
|
||||
header := []byte(drpcHeader)
|
||||
n, err = d.Conn.Write(append(header, buf...))
|
||||
})
|
||||
if didOnce {
|
||||
n -= len(drpcHeader)
|
||||
if n < 0 {
|
||||
n = 0
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
return d.Conn.Write(buf)
|
||||
}
|
@ -1,108 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build grpc
|
||||
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
|
||||
// dial performs the dialing to the grpc endpoint with tls.
|
||||
func (d Dialer) dial(ctx context.Context, address string, tlsConfig *tls.Config) (_ *Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if d.DialTimeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, d.DialTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
creds := &captureStateCreds{TransportCredentials: credentials.NewTLS(tlsConfig)}
|
||||
conn, err := grpc.DialContext(ctx, address,
|
||||
grpc.WithTransportCredentials(creds),
|
||||
grpc.WithBlock(),
|
||||
grpc.FailOnNonTempDialError(true),
|
||||
grpc.WithContextDialer(d.dialContext))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
state, ok := creds.Get()
|
||||
if !ok {
|
||||
_ = conn.Close()
|
||||
return nil, Error.New("unable to get tls connection state when dialing")
|
||||
}
|
||||
|
||||
return &Conn{
|
||||
raw: conn,
|
||||
state: state,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// dialUnencrypted performs dialing to the grpc endpoint with no tls.
|
||||
func (d Dialer) dialUnencrypted(ctx context.Context, address string) (_ *Conn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if d.DialTimeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, d.DialTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
conn, err := grpc.DialContext(ctx, address,
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock(),
|
||||
grpc.FailOnNonTempDialError(true),
|
||||
grpc.WithContextDialer(d.dialContext))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return &Conn{raw: conn}, nil
|
||||
}
|
||||
|
||||
// captureStateCreds captures the tls connection state from a client/server handshake.
|
||||
type captureStateCreds struct {
|
||||
credentials.TransportCredentials
|
||||
once sync.Once
|
||||
state tls.ConnectionState
|
||||
ok bool
|
||||
}
|
||||
|
||||
// Get returns the stored tls connection state.
|
||||
func (c *captureStateCreds) Get() (state tls.ConnectionState, ok bool) {
|
||||
c.once.Do(func() {})
|
||||
return c.state, c.ok
|
||||
}
|
||||
|
||||
// ClientHandshake dispatches to the underlying credentials and tries to store the
|
||||
// connection state if possible.
|
||||
func (c *captureStateCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (
|
||||
net.Conn, credentials.AuthInfo, error) {
|
||||
|
||||
conn, auth, err := c.TransportCredentials.ClientHandshake(ctx, authority, rawConn)
|
||||
if tlsInfo, ok := auth.(credentials.TLSInfo); ok {
|
||||
c.once.Do(func() { c.state, c.ok = tlsInfo.State, true })
|
||||
}
|
||||
return conn, auth, err
|
||||
}
|
||||
|
||||
// ServerHandshake dispatches to the underlying credentials and tries to store the
|
||||
// connection state if possible.
|
||||
func (c *captureStateCreds) ServerHandshake(rawConn net.Conn) (
|
||||
net.Conn, credentials.AuthInfo, error) {
|
||||
|
||||
conn, auth, err := c.TransportCredentials.ServerHandshake(rawConn)
|
||||
if tlsInfo, ok := auth.(credentials.TLSInfo); ok {
|
||||
c.once.Do(func() { c.state, c.ok = tlsInfo.State, true })
|
||||
}
|
||||
return conn, auth, err
|
||||
}
|
@ -33,7 +33,7 @@ func main() {
|
||||
}
|
||||
|
||||
func usage() error {
|
||||
return errs.New("usage: %s <dir> <drpc|grpc> <output file>", os.Args[0])
|
||||
return errs.New("usage: %s <dir> drpc <output file>", os.Args[0])
|
||||
}
|
||||
|
||||
func run() error {
|
||||
@ -60,7 +60,6 @@ type generateInfo struct {
|
||||
Import string
|
||||
Prefix string
|
||||
Conn string
|
||||
Tag string
|
||||
}
|
||||
|
||||
var infos = map[string]generateInfo{
|
||||
@ -69,14 +68,6 @@ var infos = map[string]generateInfo{
|
||||
Import: "storj.io/storj/pkg/rpc/rpcpool",
|
||||
Prefix: "DRPC",
|
||||
Conn: "rpcpool.Conn",
|
||||
Tag: "!grpc",
|
||||
},
|
||||
"grpc": {
|
||||
Name: "grpc",
|
||||
Import: "google.golang.org/grpc", // the saddest newline
|
||||
Prefix: "",
|
||||
Conn: "grpc.ClientConn",
|
||||
Tag: "grpc",
|
||||
},
|
||||
}
|
||||
|
||||
@ -93,8 +84,6 @@ func generate(clients []string, info generateInfo, output string) (err error) {
|
||||
P("// Copyright (C) 2019 Storj Labs, Inc.")
|
||||
P("// See LICENSE for copying information.")
|
||||
P()
|
||||
P("// +build", info.Tag)
|
||||
P()
|
||||
P("package rpc")
|
||||
P()
|
||||
P("import (")
|
||||
|
@ -41,11 +41,7 @@ func (ctx *Context) Compile(pkg string, preArgs ...string) string {
|
||||
if raceEnabled {
|
||||
args = append(args, "-race")
|
||||
}
|
||||
if drpcEnabled {
|
||||
args = append(args, "-tags=drpc,unittest")
|
||||
} else {
|
||||
args = append(args, "-tag=unittest")
|
||||
}
|
||||
args = append(args, "-tags=unittest")
|
||||
args = append(args, "-o", exe, pkg)
|
||||
|
||||
cmd := exec.Command("go", args...)
|
||||
@ -82,9 +78,6 @@ func (ctx *Context) CompileShared(t *testing.T, name string, pkg string) Include
|
||||
base := ctx.File("build", name)
|
||||
|
||||
args := []string{"build", "-buildmode", "c-shared"}
|
||||
if drpcEnabled {
|
||||
args = append(args, "-tags=drpc")
|
||||
}
|
||||
args = append(args, "-o", base+".so", pkg)
|
||||
|
||||
// not using race detector for c-shared
|
||||
|
@ -1,8 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build !grpc
|
||||
|
||||
package testcontext
|
||||
|
||||
const drpcEnabled = true
|
@ -1,8 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
// +build grpc
|
||||
|
||||
package testcontext
|
||||
|
||||
const drpcEnabled = false
|
@ -67,11 +67,6 @@ install_sim(){
|
||||
local bin_dir="$1"
|
||||
mkdir -p ${bin_dir}
|
||||
|
||||
go build -race -v -tags=grpc -o ${bin_dir}/storagenode-grpc storj.io/storj/cmd/storagenode >/dev/null 2>&1
|
||||
go build -race -v -tags=drpc -o ${bin_dir}/storagenode-drpc storj.io/storj/cmd/storagenode >/dev/null 2>&1
|
||||
go build -race -v -tags=grpc -o ${bin_dir}/satellite-grpc storj.io/storj/cmd/satellite >/dev/null 2>&1
|
||||
go build -race -v -tags=drpc -o ${bin_dir}/satellite-drpc storj.io/storj/cmd/satellite >/dev/null 2>&1
|
||||
|
||||
go install -race -v -o ${bin_dir}/storagenode storj.io/storj/cmd/storagenode >/dev/null 2>&1
|
||||
go install -race -v -o ${bin_dir}/satellite storj.io/storj/cmd/satellite >/dev/null 2>&1
|
||||
go install -race -v -o ${bin_dir}/storj-sim storj.io/storj/cmd/storj-sim >/dev/null 2>&1
|
||||
|
@ -22,43 +22,18 @@ STORJ_SIM_POSTGRES=${STORJ_SIM_POSTGRES:-""}
|
||||
# setup the network
|
||||
# if postgres connection string is set as STORJ_SIM_POSTGRES then use that for testing
|
||||
if [ -z ${STORJ_SIM_POSTGRES} ]; then
|
||||
storj-sim -x --satellites 2 --host $STORJ_NETWORK_HOST4 network setup
|
||||
storj-sim -x --satellites 1 --host $STORJ_NETWORK_HOST4 network setup
|
||||
else
|
||||
storj-sim -x --satellites 2 --host $STORJ_NETWORK_HOST4 network --postgres=$STORJ_SIM_POSTGRES setup
|
||||
storj-sim -x --satellites 1 --host $STORJ_NETWORK_HOST4 network --postgres=$STORJ_SIM_POSTGRES setup
|
||||
fi
|
||||
|
||||
# explicitly set all the satellites and storagenodes to use mixed grpc and drpc
|
||||
(
|
||||
eval "$( storj-sim --satellites 2 network env )"
|
||||
|
||||
N=0
|
||||
DIR="SATELLITE_${N}_DIR"
|
||||
while [ -n "${!DIR:-""}" ]; do
|
||||
[ $((N%2)) -eq 0 ] && KIND=drpc || KIND=grpc
|
||||
BIN="$(which satellite-${KIND})"
|
||||
( set -x; cp "${BIN}" "${!DIR}/satellite" )
|
||||
let N=N+1
|
||||
DIR="SATELLITE_${N}_DIR"
|
||||
done
|
||||
|
||||
N=0
|
||||
DIR="STORAGENODE_${N}_DIR"
|
||||
while [ -n "${!DIR:-""}" ]; do
|
||||
[ $((N%2)) -eq 0 ] && KIND=drpc || KIND=grpc
|
||||
BIN="$(which storagenode-${KIND})"
|
||||
( set -x; cp "${BIN}" "${!DIR}/storagenode" )
|
||||
let N=N+1
|
||||
DIR="STORAGENODE_${N}_DIR"
|
||||
done
|
||||
)
|
||||
|
||||
# set the segment size lower to make test run faster
|
||||
echo client.segment-size: "6 MiB" >> `storj-sim network env GATEWAY_0_DIR`/config.yaml
|
||||
|
||||
# run aws-cli tests
|
||||
storj-sim -x --satellites 2 --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-sim-aws.sh
|
||||
storj-sim -x --satellites 2 --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-uplink.sh
|
||||
storj-sim -x --satellites 2 --host $STORJ_NETWORK_HOST4 network destroy
|
||||
storj-sim -x --satellites 1 --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-sim-aws.sh
|
||||
storj-sim -x --satellites 1 --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-uplink.sh
|
||||
storj-sim -x --satellites 1 --host $STORJ_NETWORK_HOST4 network destroy
|
||||
|
||||
# setup the network with ipv6
|
||||
#storj-sim -x --host "::1" network setup
|
||||
|
Loading…
Reference in New Issue
Block a user