satellite/{contact,downtime,overlay}: use NodeURL
Change-Id: I555a479a89e0ddbf0499898bdbc8574282cd6846
This commit is contained in:
parent
941d10cbc3
commit
5d016425f1
@ -16,12 +16,9 @@ type client struct {
|
||||
client pb.DRPCContactClient
|
||||
}
|
||||
|
||||
// dialNode dials the target contact endpoint
|
||||
func dialNode(ctx context.Context, dialer rpc.Dialer, address string, id storj.NodeID) (*client, error) {
|
||||
conn, err := dialer.DialNodeURL(ctx, storj.NodeURL{
|
||||
ID: id,
|
||||
Address: address,
|
||||
})
|
||||
// dialNodeURL dials the target contact endpoint
|
||||
func dialNodeURL(ctx context.Context, dialer rpc.Dialer, nodeurl storj.NodeURL) (*client, error) {
|
||||
conn, err := dialer.DialNodeURL(ctx, nodeurl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"storj.io/common/identity"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
@ -63,7 +64,11 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, errCheckInNetwork.New("failed to resolve IP from address: %s, err: %v", req.Address, err).Error())
|
||||
}
|
||||
|
||||
pingNodeSuccess, pingErrorMessage, err := endpoint.service.PingBack(ctx, req.Address, nodeID)
|
||||
nodeurl := storj.NodeURL{
|
||||
ID: nodeID,
|
||||
Address: req.Address,
|
||||
}
|
||||
pingNodeSuccess, pingErrorMessage, err := endpoint.service.PingBack(ctx, nodeurl)
|
||||
if err != nil {
|
||||
endpoint.log.Info("failed to ping back address", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
|
||||
if errPingBackDial.Has(err) {
|
||||
|
@ -66,7 +66,7 @@ func (service *Service) Local() overlay.NodeDossier {
|
||||
func (service *Service) Close() error { return nil }
|
||||
|
||||
// PingBack pings the node to test connectivity.
|
||||
func (service *Service) PingBack(ctx context.Context, address string, peerID storj.NodeID) (_ bool, _ string, err error) {
|
||||
func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_ bool, _ string, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if service.timeout > 0 {
|
||||
@ -78,7 +78,7 @@ func (service *Service) PingBack(ctx context.Context, address string, peerID sto
|
||||
pingNodeSuccess := true
|
||||
var pingErrorMessage string
|
||||
|
||||
client, err := dialNode(ctx, service.dialer, address, peerID)
|
||||
client, err := dialNodeURL(ctx, service.dialer, nodeurl)
|
||||
if err != nil {
|
||||
// If there is an error from trying to dial and ping the node, return that error as
|
||||
// pingErrorMessage and not as the err. We want to use this info to update
|
||||
@ -86,7 +86,7 @@ func (service *Service) PingBack(ctx context.Context, address string, peerID sto
|
||||
mon.Event("failed dial")
|
||||
pingNodeSuccess = false
|
||||
pingErrorMessage = fmt.Sprintf("failed to dial storage node (ID: %s) at address %s: %q",
|
||||
peerID, address, err,
|
||||
nodeurl.ID, nodeurl.Address, err,
|
||||
)
|
||||
service.log.Debug("pingBack failed to dial storage node",
|
||||
zap.String("pingErrorMessage", pingErrorMessage),
|
||||
@ -101,7 +101,7 @@ func (service *Service) PingBack(ctx context.Context, address string, peerID sto
|
||||
pingNodeSuccess = false
|
||||
pingErrorMessage = fmt.Sprintf("failed to ping storage node, your node indicated error code: %d, %q", rpcstatus.Code(err), err)
|
||||
service.log.Debug("pingBack pingNode error",
|
||||
zap.Stringer("Node ID", peerID),
|
||||
zap.Stringer("Node ID", nodeurl.ID),
|
||||
zap.String("pingErrorMessage", pingErrorMessage),
|
||||
)
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ func (chore *DetectionChore) Run(ctx context.Context) (err error) {
|
||||
zap.Int("count", len(nodeLastContacts)))
|
||||
|
||||
for _, nodeLastContact := range nodeLastContacts {
|
||||
success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, nodeLastContact.ID, nodeLastContact.Address)
|
||||
success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, nodeLastContact.URL)
|
||||
if err != nil {
|
||||
chore.log.Error("error during downtime detection ping back.",
|
||||
zap.Bool("success", success),
|
||||
|
@ -62,7 +62,7 @@ func (chore *EstimationChore) Run(ctx context.Context) (err error) {
|
||||
for _, node := range offlineNodes {
|
||||
node := node
|
||||
chore.limiter.Go(ctx, func() {
|
||||
success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, node.ID, node.Address)
|
||||
success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, node.URL)
|
||||
if err != nil {
|
||||
chore.log.Error("error during downtime estimation ping back",
|
||||
zap.Bool("success", success),
|
||||
@ -73,10 +73,10 @@ func (chore *EstimationChore) Run(ctx context.Context) (err error) {
|
||||
now := time.Now().UTC()
|
||||
duration := now.Sub(node.LastContactFailure)
|
||||
|
||||
err = chore.db.Add(ctx, node.ID, now, duration)
|
||||
err = chore.db.Add(ctx, node.URL.ID, now, duration)
|
||||
if err != nil {
|
||||
chore.log.Error("error adding node seconds offline information.",
|
||||
zap.Stringer("node ID", node.ID),
|
||||
zap.Stringer("node ID", node.URL.ID),
|
||||
zap.Stringer("duration", duration),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
@ -33,10 +33,10 @@ func NewService(log *zap.Logger, overlay *overlay.Service, contact *contact.Serv
|
||||
}
|
||||
|
||||
// CheckAndUpdateNodeAvailability tries to ping the supplied address and updates the uptime based on ping success or failure. Returns true if the ping and uptime updates are successful.
|
||||
func (service *Service) CheckAndUpdateNodeAvailability(ctx context.Context, nodeID storj.NodeID, address string) (success bool, err error) {
|
||||
func (service *Service) CheckAndUpdateNodeAvailability(ctx context.Context, nodeurl storj.NodeURL) (success bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
pingNodeSuccess, pingErrorMessage, err := service.contact.PingBack(ctx, address, nodeID)
|
||||
pingNodeSuccess, pingErrorMessage, err := service.contact.PingBack(ctx, nodeurl)
|
||||
if err != nil {
|
||||
service.log.Error("error during downtime detection ping back.",
|
||||
zap.String("ping error", pingErrorMessage),
|
||||
@ -46,10 +46,10 @@ func (service *Service) CheckAndUpdateNodeAvailability(ctx context.Context, node
|
||||
}
|
||||
|
||||
if pingNodeSuccess {
|
||||
_, err = service.overlay.UpdateUptime(ctx, nodeID, true)
|
||||
_, err = service.overlay.UpdateUptime(ctx, nodeurl.ID, true)
|
||||
if err != nil {
|
||||
service.log.Error("error updating node contact success information.",
|
||||
zap.Stringer("node ID", nodeID),
|
||||
zap.Stringer("node ID", nodeurl.ID),
|
||||
zap.Error(err))
|
||||
|
||||
return false, errs.Wrap(err)
|
||||
@ -58,10 +58,10 @@ func (service *Service) CheckAndUpdateNodeAvailability(ctx context.Context, node
|
||||
return true, nil
|
||||
}
|
||||
|
||||
_, err = service.overlay.UpdateUptime(ctx, nodeID, false)
|
||||
_, err = service.overlay.UpdateUptime(ctx, nodeurl.ID, false)
|
||||
if err != nil {
|
||||
service.log.Error("error updating node contact failure information.",
|
||||
zap.Stringer("node ID", nodeID),
|
||||
zap.Stringer("node ID", nodeurl.ID),
|
||||
zap.Error(err))
|
||||
|
||||
return false, errs.Wrap(err)
|
||||
|
@ -18,7 +18,6 @@ func TestCheckNodeAvailability(t *testing.T) {
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
node := planet.StorageNodes[0]
|
||||
nodeDossier := planet.StorageNodes[0].Contact.Service.Local()
|
||||
satellite := planet.Satellites[0]
|
||||
|
||||
node.Contact.Chore.Pause(ctx)
|
||||
@ -31,7 +30,7 @@ func TestCheckNodeAvailability(t *testing.T) {
|
||||
require.True(t, dossier.Reputation.LastContactSuccess.Before(beforeSuccessfulCheck))
|
||||
require.True(t, dossier.Reputation.LastContactFailure.Before(beforeSuccessfulCheck))
|
||||
|
||||
success, err := satellite.DowntimeTracking.Service.CheckAndUpdateNodeAvailability(ctx, nodeDossier.Id, nodeDossier.Address.GetAddress())
|
||||
success, err := satellite.DowntimeTracking.Service.CheckAndUpdateNodeAvailability(ctx, node.NodeURL())
|
||||
require.NoError(t, err)
|
||||
require.True(t, success)
|
||||
|
||||
@ -51,7 +50,7 @@ func TestCheckNodeAvailability(t *testing.T) {
|
||||
|
||||
// now test that CheckAndUpdateNodeAvailability updated with a failure, and the last contact success is the same
|
||||
beforeFailedCheck := time.Now()
|
||||
success, err = satellite.DowntimeTracking.Service.CheckAndUpdateNodeAvailability(ctx, nodeDossier.Id, nodeDossier.Address.GetAddress())
|
||||
success, err = satellite.DowntimeTracking.Service.CheckAndUpdateNodeAvailability(ctx, node.NodeURL())
|
||||
require.NoError(t, err)
|
||||
require.False(t, success)
|
||||
|
||||
|
@ -210,8 +210,7 @@ type NodeStats struct {
|
||||
|
||||
// NodeLastContact contains the ID, address, and timestamp
|
||||
type NodeLastContact struct {
|
||||
ID storj.NodeID
|
||||
Address string
|
||||
URL storj.NodeURL
|
||||
LastIPPort string
|
||||
LastContactSuccess time.Time
|
||||
LastContactFailure time.Time
|
||||
|
@ -682,18 +682,18 @@ func TestCache_DowntimeTracking(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, nodes, 4)
|
||||
// order of nodes should be least recently checked first
|
||||
require.Equal(t, allIDs[2], nodes[0].ID)
|
||||
require.Equal(t, allIDs[4], nodes[1].ID)
|
||||
require.Equal(t, allIDs[6], nodes[2].ID)
|
||||
require.Equal(t, allIDs[8], nodes[3].ID)
|
||||
require.Equal(t, allIDs[2], nodes[0].URL.ID)
|
||||
require.Equal(t, allIDs[4], nodes[1].URL.ID)
|
||||
require.Equal(t, allIDs[6], nodes[2].URL.ID)
|
||||
require.Equal(t, allIDs[8], nodes[3].URL.ID)
|
||||
|
||||
// test with limit
|
||||
nodes, err = cache.GetOfflineNodesLimited(ctx, 2)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, nodes, 2)
|
||||
// order of nodes should be least recently checked first
|
||||
require.Equal(t, allIDs[2], nodes[0].ID)
|
||||
require.Equal(t, allIDs[4], nodes[1].ID)
|
||||
require.Equal(t, allIDs[2], nodes[0].URL.ID)
|
||||
require.Equal(t, allIDs[4], nodes[1].URL.ID)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -826,8 +826,7 @@ func (cache *overlaycache) GetSuccesfulNodesNotCheckedInSince(ctx context.Contex
|
||||
}
|
||||
|
||||
nodeLastContact := overlay.NodeLastContact{
|
||||
ID: nodeID,
|
||||
Address: node.Address,
|
||||
URL: storj.NodeURL{ID: nodeID, Address: node.Address},
|
||||
LastContactSuccess: node.LastContactSuccess.UTC(),
|
||||
LastContactFailure: node.LastContactFailure.UTC(),
|
||||
}
|
||||
@ -874,8 +873,7 @@ func (cache *overlaycache) GetOfflineNodesLimited(ctx context.Context, limit int
|
||||
}
|
||||
|
||||
nodeLastContact := overlay.NodeLastContact{
|
||||
ID: nodeID,
|
||||
Address: node.Address,
|
||||
URL: storj.NodeURL{ID: nodeID, Address: node.Address},
|
||||
LastContactSuccess: node.LastContactSuccess.UTC(),
|
||||
LastContactFailure: node.LastContactFailure.UTC(),
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user