satellite/contact: add timeout to PingBack method

Change-Id: I2ec2f82e2e10d8be16f82e9de13ce42358e47c98
This commit is contained in:
Cameron Ayer 2020-04-02 16:44:51 -04:00 committed by Stefan Benten
parent 9200efc61f
commit 42be4bdc0f
5 changed files with 21 additions and 4 deletions

View File

@ -310,6 +310,9 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
Admin: admin.Config{ Admin: admin.Config{
Address: "127.0.0.1:0", Address: "127.0.0.1:0",
}, },
Contact: contact.Config{
Timeout: 1 * time.Minute,
},
Overlay: overlay.Config{ Overlay: overlay.Config{
Node: overlay.NodeSelectionConfig{ Node: overlay.NodeSelectionConfig{
UptimeCount: 0, UptimeCount: 0,

View File

@ -289,7 +289,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
Type: pb.NodeType_SATELLITE, Type: pb.NodeType_SATELLITE,
Version: *pbVersion, Version: *pbVersion,
} }
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer) peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact.Timeout)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
pbgrpc.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint) pbgrpc.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil { if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {

View File

@ -7,6 +7,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
@ -20,7 +21,8 @@ import (
// Config contains configurable values for contact service // Config contains configurable values for contact service
type Config struct { type Config struct {
ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""` ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""`
Timeout time.Duration `help:"timeout for pinging storage nodes" default:"10m0s"`
} }
// Service is the contact service between storage nodes and satellites. // Service is the contact service between storage nodes and satellites.
@ -37,16 +39,19 @@ type Service struct {
overlay *overlay.Service overlay *overlay.Service
peerIDs overlay.PeerIdentities peerIDs overlay.PeerIdentities
dialer rpc.Dialer dialer rpc.Dialer
timeout time.Duration
} }
// NewService creates a new contact service. // NewService creates a new contact service.
func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer) *Service { func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, timeout time.Duration) *Service {
return &Service{ return &Service{
log: log, log: log,
self: self, self: self,
overlay: overlay, overlay: overlay,
peerIDs: peerIDs, peerIDs: peerIDs,
dialer: dialer, dialer: dialer,
timeout: timeout,
} }
} }
@ -64,6 +69,12 @@ func (service *Service) Close() error { return nil }
func (service *Service) PingBack(ctx context.Context, address string, peerID storj.NodeID) (_ bool, _ string, err error) { func (service *Service) PingBack(ctx context.Context, address string, peerID storj.NodeID) (_ bool, _ string, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if service.timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, service.timeout)
defer cancel()
}
pingNodeSuccess := true pingNodeSuccess := true
var pingErrorMessage string var pingErrorMessage string

View File

@ -218,7 +218,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
Type: pb.NodeType_SATELLITE, Type: pb.NodeType_SATELLITE,
Version: *pbVersion, Version: *pbVersion,
} }
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer) peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact.Timeout)
peer.Services.Add(lifecycle.Item{ peer.Services.Add(lifecycle.Item{
Name: "contact:service", Name: "contact:service",
Close: peer.Contact.Service.Close, Close: peer.Contact.Service.Close,

View File

@ -106,6 +106,9 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0
# the public address of the node, useful for nodes behind NAT # the public address of the node, useful for nodes behind NAT
contact.external-address: "" contact.external-address: ""
# timeout for pinging storage nodes
# contact.timeout: 10m0s
# satellite database connection string # satellite database connection string
# database: postgres:// # database: postgres://