storagenode: check if QUIC is properly configured
This change adds a check to confirm if UDP port if properly configured for QUIC Resolves https://github.com/storj/storj/issues/4332 Partly resolves https://github.com/storj/storj/issues/4358 Change-Id: I9a66f26a115e48b4fcd168f50a7d0b4d81712f4e
This commit is contained in:
parent
59648dc272
commit
8b40a071a0
@ -118,8 +118,8 @@ func (p *Server) DRPC() *drpcmux.Mux { return p.public.mux }
|
||||
// PrivateDRPC returns the server's dRPC mux for registration purposes.
|
||||
func (p *Server) PrivateDRPC() *drpcmux.Mux { return p.private.mux }
|
||||
|
||||
// IsQUICEnabled checks if QUIC is enabled.
|
||||
func (p *Server) IsQUICEnabled() bool { return !p.public.disableQUIC && p.public.quicListener != nil }
|
||||
// IsQUICEnabled checks if QUIC is enabled by config and udp port is open.
|
||||
func (p *Server) IsQUICEnabled() bool { return !p.public.disableQUIC && p.public.udpConn != nil }
|
||||
|
||||
// Close shuts down the server.
|
||||
func (p *Server) Close() error {
|
||||
|
@ -476,3 +476,8 @@ func (s *Service) VerifySatelliteID(ctx context.Context, satelliteID storj.NodeI
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetQUICEnabled sets QUIC status for the SNO dashboard.
|
||||
func (s *Service) SetQUICEnabled(enabled bool) {
|
||||
s.quicEnabled = enabled
|
||||
}
|
||||
|
@ -160,3 +160,15 @@ func TestLocalAndUpdateSelf(t *testing.T) {
|
||||
_ = group.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
func TestServiceRequestPingMeQUIC(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
node := planet.StorageNodes[0]
|
||||
node.Contact.Chore.Pause(ctx)
|
||||
|
||||
err := node.Contact.Service.RequestPingMeQUIC(ctx)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ package contact
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -51,6 +52,7 @@ type NodeInfo struct {
|
||||
// Service is the contact service between storage nodes and satellites.
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
rand *rand.Rand
|
||||
dialer rpc.Dialer
|
||||
|
||||
mu sync.Mutex
|
||||
@ -65,6 +67,7 @@ type Service struct {
|
||||
func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust.Pool) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
dialer: dialer,
|
||||
trust: trust,
|
||||
self: self,
|
||||
@ -116,12 +119,7 @@ func (service *Service) pingSatellite(ctx context.Context, satellite storj.NodeI
|
||||
func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) {
|
||||
defer mon.Task()(&ctx, id)(&err)
|
||||
|
||||
nodeurl, err := service.trust.GetNodeURL(ctx, id)
|
||||
if err != nil {
|
||||
return errPingSatellite.Wrap(err)
|
||||
}
|
||||
|
||||
conn, err := service.dialer.DialNodeURL(ctx, nodeurl)
|
||||
conn, err := service.dialSatellite(ctx, id)
|
||||
if err != nil {
|
||||
return errPingSatellite.Wrap(err)
|
||||
}
|
||||
@ -146,6 +144,68 @@ func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RequestPingMeQUIC sends pings request to satellite for a pingBack via QUIC.
|
||||
func (service *Service) RequestPingMeQUIC(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
satellites := service.trust.GetSatellites(ctx)
|
||||
if len(satellites) < 1 {
|
||||
return errPingSatellite.New("no trusted satellite available")
|
||||
}
|
||||
|
||||
// Shuffle the satellites
|
||||
// All the Storagenodes get a default list of trusted satellites (The Storj DCS ones) and
|
||||
// most of the SN operators don't change the list, hence if it always starts with
|
||||
// the same satellite we are going to put always more pressure on the first trusted
|
||||
// satellite on the list. So we iterate over the list of trusted satellites in a
|
||||
// random order to avoid putting pressure on the first trusted on the list
|
||||
service.rand.Shuffle(len(satellites), func(i, j int) {
|
||||
satellites[i], satellites[j] = satellites[j], satellites[i]
|
||||
})
|
||||
|
||||
for _, satellite := range satellites {
|
||||
err = service.requestPingMeOnce(ctx, satellite)
|
||||
if err != nil {
|
||||
// log warning and try the next trusted satellite
|
||||
service.log.Warn("failed PingMe request to satellite", zap.Stringer("Satellite ID", satellite), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return errPingSatellite.New("failed to ping storage node using QUIC: %q", err)
|
||||
}
|
||||
|
||||
func (service *Service) requestPingMeOnce(ctx context.Context, satellite storj.NodeID) (err error) {
|
||||
defer mon.Task()(&ctx, satellite)(&err)
|
||||
|
||||
conn, err := service.dialSatellite(ctx, satellite)
|
||||
if err != nil {
|
||||
return errPingSatellite.Wrap(err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, conn.Close()) }()
|
||||
|
||||
node := service.Local()
|
||||
_, err = pb.NewDRPCNodeClient(conn).PingMe(ctx, &pb.PingMeRequest{
|
||||
Address: node.Address,
|
||||
Transport: pb.NodeTransport_QUIC_GRPC,
|
||||
})
|
||||
if err != nil {
|
||||
return errPingSatellite.Wrap(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (service *Service) dialSatellite(ctx context.Context, id storj.NodeID) (*rpc.Conn, error) {
|
||||
nodeurl, err := service.trust.GetNodeURL(ctx, id)
|
||||
if err != nil {
|
||||
return nil, errPingSatellite.Wrap(err)
|
||||
}
|
||||
|
||||
return service.dialer.DialNodeURL(ctx, nodeurl)
|
||||
}
|
||||
|
||||
// Local returns the storagenode info.
|
||||
func (service *Service) Local() NodeInfo {
|
||||
service.mu.Lock()
|
||||
|
@ -655,7 +655,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
peer.Storage2.BlobsCache,
|
||||
config.Operator.WalletFeatures,
|
||||
port,
|
||||
peer.Server.IsQUICEnabled(),
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
@ -680,11 +680,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
peer.Payout.Service,
|
||||
peer.Console.Listener,
|
||||
)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "console:endpoint",
|
||||
Run: peer.Console.Endpoint.Run,
|
||||
Close: peer.Console.Endpoint.Close,
|
||||
})
|
||||
// NOTE: Console service is added to peer services during peer run to allow for QUIC checkins
|
||||
}
|
||||
|
||||
{ // setup storage inspector
|
||||
@ -836,6 +832,32 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
// addConsoleService completes the SNO dashboard setup and adds the console service
|
||||
// to the peer services.
|
||||
func (peer *Peer) addConsoleService(ctx context.Context) {
|
||||
// perform QUIC checks
|
||||
quicEnabled := peer.Server.IsQUICEnabled()
|
||||
if quicEnabled {
|
||||
if err := peer.Contact.Service.RequestPingMeQUIC(ctx); err != nil {
|
||||
peer.Log.Warn("failed QUIC check", zap.Error(err))
|
||||
quicEnabled = false
|
||||
} else {
|
||||
peer.Log.Debug("QUIC check success")
|
||||
}
|
||||
} else {
|
||||
peer.Log.Warn("UDP Port not configured for QUIC")
|
||||
}
|
||||
|
||||
peer.Console.Service.SetQUICEnabled(quicEnabled)
|
||||
|
||||
// add console service to peer services
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "console:endpoint",
|
||||
Run: peer.Console.Endpoint.Run,
|
||||
Close: peer.Console.Endpoint.Close,
|
||||
})
|
||||
}
|
||||
|
||||
// Run runs storage node until it's either closed or it errors.
|
||||
func (peer *Peer) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -854,6 +876,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
peer.Servers.Run(ctx, group)
|
||||
// complete SNO dashboard setup and add console service to peer services
|
||||
peer.addConsoleService(ctx)
|
||||
// run peer services
|
||||
peer.Services.Run(ctx, group)
|
||||
|
||||
return group.Wait()
|
||||
|
Loading…
Reference in New Issue
Block a user