satellite/contact,storagenode/contact: try ping back to nodes through
QUIC We want to encourage storagenodes to open their udp port. This PR changes contact service in satellite to try to connect to nodes through QUIC. If satellite can't reach nodes through quic, it will send an error message back to nodes. On the nodes side, it will always log out error message from check in if the error message is not empty. Whether satellite can reach nodes through quic has no affect on nodes' uptime check. Change-Id: I5ebf80f921c4a6504997d83c8bd45226da9d3703
This commit is contained in:
parent
f6ec7f9bfc
commit
a3c437a7bf
@ -15,6 +15,7 @@ import (
|
||||
"storj.io/common/rpc/rpcpeer"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
func TestSatelliteContactEndpoint(t *testing.T) {
|
||||
@ -42,6 +43,85 @@ func TestSatelliteContactEndpoint(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.True(t, resp.PingNodeSuccess)
|
||||
require.True(t, resp.PingNodeSuccessQuic)
|
||||
|
||||
peerID, err := planet.Satellites[0].DB.PeerIdentities().Get(ctx, nodeInfo.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ident.PeerIdentity(), peerID)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSatelliteContactEndpoint_Failure(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNode: func(index int, config *storagenode.Config) {
|
||||
config.Server.DisableTCPTLS = true
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
nodeInfo := planet.StorageNodes[0].Contact.Service.Local()
|
||||
ident := planet.StorageNodes[0].Identity
|
||||
|
||||
peer := rpcpeer.Peer{
|
||||
Addr: &net.TCPAddr{
|
||||
IP: net.ParseIP(nodeInfo.Address),
|
||||
Port: 5,
|
||||
},
|
||||
State: tls.ConnectionState{
|
||||
PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA},
|
||||
},
|
||||
}
|
||||
peerCtx := rpcpeer.NewContext(ctx, &peer)
|
||||
resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
|
||||
Address: nodeInfo.Address,
|
||||
Version: &nodeInfo.Version,
|
||||
Capacity: &nodeInfo.Capacity,
|
||||
Operator: &nodeInfo.Operator,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.False(t, resp.PingNodeSuccess)
|
||||
require.False(t, resp.PingNodeSuccessQuic)
|
||||
|
||||
peerID, err := planet.Satellites[0].DB.PeerIdentities().Get(ctx, nodeInfo.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ident.PeerIdentity(), peerID)
|
||||
})
|
||||
}
|
||||
func TestSatelliteContactEndpoint_QUIC_Unreachable(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
StorageNode: func(index int, config *storagenode.Config) {
|
||||
config.Server.DisableQUIC = true
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
nodeInfo := planet.StorageNodes[0].Contact.Service.Local()
|
||||
ident := planet.StorageNodes[0].Identity
|
||||
|
||||
peer := rpcpeer.Peer{
|
||||
Addr: &net.TCPAddr{
|
||||
IP: net.ParseIP(nodeInfo.Address),
|
||||
Port: 5,
|
||||
},
|
||||
State: tls.ConnectionState{
|
||||
PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA},
|
||||
},
|
||||
}
|
||||
peerCtx := rpcpeer.NewContext(ctx, &peer)
|
||||
resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
|
||||
Address: nodeInfo.Address,
|
||||
Version: &nodeInfo.Version,
|
||||
Capacity: &nodeInfo.Capacity,
|
||||
Operator: &nodeInfo.Operator,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.True(t, resp.PingNodeSuccess)
|
||||
require.False(t, resp.PingNodeSuccessQuic)
|
||||
|
||||
peerID, err := planet.Satellites[0].DB.PeerIdentities().Get(ctx, nodeInfo.ID)
|
||||
require.NoError(t, err)
|
||||
|
@ -70,7 +70,7 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
|
||||
ID: nodeID,
|
||||
Address: req.Address,
|
||||
}
|
||||
pingNodeSuccess, pingErrorMessage, err := endpoint.service.PingBack(ctx, nodeurl)
|
||||
pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, err := endpoint.service.PingBack(ctx, nodeurl)
|
||||
if err != nil {
|
||||
endpoint.log.Info("failed to ping back address", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
|
||||
if errPingBackDial.Has(err) {
|
||||
@ -115,8 +115,9 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
|
||||
|
||||
endpoint.log.Debug("checking in", zap.String("node addr", req.Address), zap.Bool("ping node success", pingNodeSuccess), zap.String("ping node err msg", pingErrorMessage))
|
||||
return &pb.CheckInResponse{
|
||||
PingNodeSuccess: pingNodeSuccess,
|
||||
PingErrorMessage: pingErrorMessage,
|
||||
PingNodeSuccess: pingNodeSuccess,
|
||||
PingNodeSuccessQuic: pingNodeSuccessQUIC,
|
||||
PingErrorMessage: pingErrorMessage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"storj.io/common/rpc"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/pkg/quic"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
@ -66,7 +67,7 @@ func (service *Service) Local() overlay.NodeDossier {
|
||||
func (service *Service) Close() error { return nil }
|
||||
|
||||
// PingBack pings the node to test connectivity.
|
||||
func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_ bool, _ string, err error) {
|
||||
func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_ bool, _ bool, _ string, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if service.timeout > 0 {
|
||||
@ -77,6 +78,7 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_
|
||||
|
||||
pingNodeSuccess := true
|
||||
var pingErrorMessage string
|
||||
var pingNodeSuccessQUIC bool
|
||||
|
||||
client, err := dialNodeURL(ctx, service.dialer, nodeurl)
|
||||
if err != nil {
|
||||
@ -91,7 +93,7 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_
|
||||
service.log.Debug("pingBack failed to dial storage node",
|
||||
zap.String("pingErrorMessage", pingErrorMessage),
|
||||
)
|
||||
return pingNodeSuccess, pingErrorMessage, nil
|
||||
return pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, nil
|
||||
}
|
||||
defer func() { err = errs.Combine(err, client.Close()) }()
|
||||
|
||||
@ -104,7 +106,39 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_
|
||||
zap.Stringer("Node ID", nodeurl.ID),
|
||||
zap.String("pingErrorMessage", pingErrorMessage),
|
||||
)
|
||||
|
||||
return pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, nil
|
||||
}
|
||||
|
||||
return pingNodeSuccess, pingErrorMessage, nil
|
||||
pingNodeSuccessQUIC = true
|
||||
err = service.pingNodeQUIC(ctx, nodeurl)
|
||||
if err != nil {
|
||||
// udp ping back is optional right now, it shouldn't affect contact service's
|
||||
// control flow
|
||||
pingNodeSuccessQUIC = false
|
||||
pingErrorMessage = err.Error()
|
||||
}
|
||||
|
||||
return pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, nil
|
||||
}
|
||||
|
||||
func (service *Service) pingNodeQUIC(ctx context.Context, nodeurl storj.NodeURL) error {
|
||||
udpDialer := service.dialer
|
||||
udpDialer.Connector = quic.NewDefaultConnector(nil)
|
||||
udpClient, err := dialNodeURL(ctx, udpDialer, nodeurl)
|
||||
if err != nil {
|
||||
mon.Event("failed_dial_quic")
|
||||
return Error.New("failed to dial storage node (ID: %s) at address %s using QUIC: %q", nodeurl.ID.String(), nodeurl.Address, err)
|
||||
}
|
||||
defer func() {
|
||||
_ = udpClient.Close()
|
||||
}()
|
||||
|
||||
_, err = udpClient.pingNode(ctx, &pb.ContactPingRequest{})
|
||||
if err != nil {
|
||||
mon.Event("failed_ping_node_quic")
|
||||
return Error.New("failed to ping storage node using QUIC, your node indicated error code: %d, %q", rpcstatus.Code(err), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -140,6 +140,9 @@ func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID)
|
||||
if resp != nil && !resp.PingNodeSuccess {
|
||||
return errPingSatellite.New("%s", resp.PingErrorMessage)
|
||||
}
|
||||
if resp.PingErrorMessage != "" {
|
||||
service.log.Warn("Your node is still considered to be online but encountered an error.", zap.Stringer("Satellite ID", id), zap.String("Error", resp.GetPingErrorMessage()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user