2019-09-19 20:56:34 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
2020-02-26 02:39:44 +00:00
"context"
2022-01-12 14:34:32 +00:00
"math/rand"
2019-09-19 20:56:34 +01:00
"sync"
"time"
2019-11-08 20:40:39 +00:00
"github.com/spacemonkeygo/monkit/v3"
2019-09-19 20:56:34 +01:00
"github.com/zeebo/errs"
"go.uber.org/zap"
2020-02-26 02:39:44 +00:00
"golang.org/x/sync/errgroup"
2019-09-19 20:56:34 +01:00
2019-12-27 11:48:47 +00:00
"storj.io/common/pb"
2020-02-26 02:39:44 +00:00
"storj.io/common/rpc"
"storj.io/common/storj"
2019-12-27 11:48:47 +00:00
"storj.io/common/sync2"
2020-02-26 02:39:44 +00:00
"storj.io/storj/storagenode/trust"
2019-09-19 20:56:34 +01:00
)
2020-02-26 02:39:44 +00:00
var (
mon = monkit . Package ( )
2019-09-19 20:56:34 +01:00
2020-08-11 15:50:01 +01:00
// Error is the default error class for contact package.
2020-02-26 02:39:44 +00:00
Error = errs . Class ( "contact" )
2021-04-28 09:06:17 +01:00
errPingSatellite = errs . Class ( "ping satellite" )
2020-02-26 02:39:44 +00:00
)
const initialBackOff = time . Second
2019-09-19 20:56:34 +01:00
2020-07-16 15:18:02 +01:00
// Config contains configurable values for contact service.
type Config struct {
	// ExternalAddress is the public address of the node, which is useful
	// for nodes behind NAT. It is empty by default.
	ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""`

	// Chore config values

	// Interval is how frequently the node contact chore runs
	// (1h in release builds, 30s in dev builds).
	Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"30s"`
}
2020-05-20 14:40:25 +01:00
// NodeInfo contains information necessary for introducing storagenode to satellite.
//
// Address, Version, Capacity and Operator are sent to satellites in check-in
// requests; Address is also used for QUIC ping-me requests.
type NodeInfo struct {
	// ID is the storage node's identity.
	ID storj.NodeID
	// Address is the address the node reports to satellites.
	Address string

	// Version, Capacity and Operator are reported verbatim on check-in.
	Version  pb.NodeVersion
	Capacity pb.NodeCapacity
	Operator pb.NodeOperator
}
2020-07-16 15:18:02 +01:00
// Service is the contact service between storage nodes and satellites.
2019-09-19 20:56:34 +01:00
type Service struct {
2020-02-26 02:39:44 +00:00
log * zap . Logger
2022-01-12 14:34:32 +00:00
rand * rand . Rand
2020-02-26 02:39:44 +00:00
dialer rpc . Dialer
2019-09-19 20:56:34 +01:00
2019-09-27 16:47:57 +01:00
mu sync . Mutex
2020-05-20 14:40:25 +01:00
self NodeInfo
2019-10-26 18:16:25 +01:00
2022-01-25 10:51:40 +00:00
trust * trust . Pool
quicStats * QUICStats
2020-02-26 02:39:44 +00:00
2019-10-28 14:04:31 +00:00
initialized sync2 . Fence
2019-09-19 20:56:34 +01:00
}
2020-07-16 15:18:02 +01:00
// NewService creates a new contact service.
2022-01-25 10:51:40 +00:00
func NewService ( log * zap . Logger , dialer rpc . Dialer , self NodeInfo , trust * trust . Pool , quicStats * QUICStats ) * Service {
2019-09-19 20:56:34 +01:00
return & Service {
2022-01-25 10:51:40 +00:00
log : log ,
rand : rand . New ( rand . NewSource ( time . Now ( ) . UnixNano ( ) ) ) ,
dialer : dialer ,
trust : trust ,
self : self ,
quicStats : quicStats ,
2020-02-26 02:39:44 +00:00
}
}
2020-07-16 15:18:02 +01:00
// PingSatellites attempts to ping all satellites in trusted list until backoff reaches maxInterval.
2020-04-01 22:25:17 +01:00
func ( service * Service ) PingSatellites ( ctx context . Context , maxInterval time . Duration ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-02-26 02:39:44 +00:00
satellites := service . trust . GetSatellites ( ctx )
var group errgroup . Group
for _ , satellite := range satellites {
satellite := satellite
group . Go ( func ( ) error {
return service . pingSatellite ( ctx , satellite , maxInterval )
} )
}
return group . Wait ( )
}
func ( service * Service ) pingSatellite ( ctx context . Context , satellite storj . NodeID , maxInterval time . Duration ) error {
interval := initialBackOff
attempts := 0
for {
2020-10-13 13:13:41 +01:00
mon . Meter ( "satellite_contact_request" ) . Mark ( 1 ) //mon:locked
2020-02-26 02:39:44 +00:00
err := service . pingSatelliteOnce ( ctx , satellite )
attempts ++
if err == nil {
return nil
}
service . log . Error ( "ping satellite failed " , zap . Stringer ( "Satellite ID" , satellite ) , zap . Int ( "attempts" , attempts ) , zap . Error ( err ) )
// Sleeps until interval times out, then continue. Returns if context is cancelled.
if ! sync2 . Sleep ( ctx , interval ) {
service . log . Info ( "context cancelled" , zap . Stringer ( "Satellite ID" , satellite ) )
return nil
}
interval *= 2
if interval >= maxInterval {
service . log . Info ( "retries timed out for this cycle" , zap . Stringer ( "Satellite ID" , satellite ) )
return nil
}
}
}
func ( service * Service ) pingSatelliteOnce ( ctx context . Context , id storj . NodeID ) ( err error ) {
defer mon . Task ( ) ( & ctx , id ) ( & err )
2022-01-12 14:34:32 +00:00
conn , err := service . dialSatellite ( ctx , id )
2020-02-26 02:39:44 +00:00
if err != nil {
return errPingSatellite . Wrap ( err )
}
defer func ( ) { err = errs . Combine ( err , conn . Close ( ) ) } ( )
2020-05-19 17:11:30 +01:00
self := service . Local ( )
2020-11-13 00:37:59 +00:00
resp , err := pb . NewDRPCNodeClient ( conn ) . CheckIn ( ctx , & pb . CheckInRequest {
2020-05-20 14:40:25 +01:00
Address : self . Address ,
2020-02-26 02:39:44 +00:00
Version : & self . Version ,
Capacity : & self . Capacity ,
Operator : & self . Operator ,
} )
2022-01-25 10:51:40 +00:00
service . quicStats . SetStatus ( false )
2020-02-26 02:39:44 +00:00
if err != nil {
return errPingSatellite . Wrap ( err )
2019-09-19 20:56:34 +01:00
}
2022-01-25 10:51:40 +00:00
if resp != nil {
service . quicStats . SetStatus ( resp . PingNodeSuccessQuic )
if ! resp . PingNodeSuccess {
return errPingSatellite . New ( "%s" , resp . PingErrorMessage )
}
2020-11-13 00:37:59 +00:00
}
2021-03-24 18:30:27 +00:00
if resp . PingErrorMessage != "" {
service . log . Warn ( "Your node is still considered to be online but encountered an error." , zap . Stringer ( "Satellite ID" , id ) , zap . String ( "Error" , resp . GetPingErrorMessage ( ) ) )
}
2020-02-26 02:39:44 +00:00
return nil
2019-09-19 20:56:34 +01:00
}
2022-01-12 14:34:32 +00:00
// RequestPingMeQUIC sends pings request to satellite for a pingBack via QUIC.
2022-01-25 10:51:40 +00:00
func ( service * Service ) RequestPingMeQUIC ( ctx context . Context ) ( stats * QUICStats , err error ) {
2022-01-12 14:34:32 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2022-01-25 10:51:40 +00:00
stats = NewQUICStats ( true )
2022-01-12 14:34:32 +00:00
satellites := service . trust . GetSatellites ( ctx )
if len ( satellites ) < 1 {
2022-01-25 10:51:40 +00:00
return nil , errPingSatellite . New ( "no trusted satellite available" )
2022-01-12 14:34:32 +00:00
}
// Shuffle the satellites
// All the Storagenodes get a default list of trusted satellites (The Storj DCS ones) and
// most of the SN operators don't change the list, hence if it always starts with
// the same satellite we are going to put always more pressure on the first trusted
// satellite on the list. So we iterate over the list of trusted satellites in a
// random order to avoid putting pressure on the first trusted on the list
service . rand . Shuffle ( len ( satellites ) , func ( i , j int ) {
satellites [ i ] , satellites [ j ] = satellites [ j ] , satellites [ i ]
} )
for _ , satellite := range satellites {
err = service . requestPingMeOnce ( ctx , satellite )
if err != nil {
2022-01-25 10:51:40 +00:00
stats . SetStatus ( false )
2022-01-12 14:34:32 +00:00
// log warning and try the next trusted satellite
service . log . Warn ( "failed PingMe request to satellite" , zap . Stringer ( "Satellite ID" , satellite ) , zap . Error ( err ) )
continue
}
2022-01-25 10:51:40 +00:00
stats . SetStatus ( true )
return stats , nil
2022-01-12 14:34:32 +00:00
}
2022-01-25 10:51:40 +00:00
return stats , errPingSatellite . New ( "failed to ping storage node using QUIC: %q" , err )
2022-01-12 14:34:32 +00:00
}
func ( service * Service ) requestPingMeOnce ( ctx context . Context , satellite storj . NodeID ) ( err error ) {
defer mon . Task ( ) ( & ctx , satellite ) ( & err )
conn , err := service . dialSatellite ( ctx , satellite )
if err != nil {
return errPingSatellite . Wrap ( err )
}
defer func ( ) { err = errs . Combine ( err , conn . Close ( ) ) } ( )
node := service . Local ( )
_ , err = pb . NewDRPCNodeClient ( conn ) . PingMe ( ctx , & pb . PingMeRequest {
Address : node . Address ,
2023-01-24 15:59:47 +00:00
Transport : pb . NodeTransport_QUIC_RPC ,
2022-01-12 14:34:32 +00:00
} )
if err != nil {
return errPingSatellite . Wrap ( err )
}
return nil
}
// dialSatellite looks up the node URL of the trusted satellite id and opens
// a connection to it. A lookup failure is wrapped in errPingSatellite; dial
// errors are returned exactly as the dialer produced them.
func (service *Service) dialSatellite(ctx context.Context, id storj.NodeID) (*rpc.Conn, error) {
	nodeURL, err := service.trust.GetNodeURL(ctx, id)
	if err == nil {
		return service.dialer.DialNodeURL(ctx, nodeURL)
	}
	return nil, errPingSatellite.Wrap(err)
}
2020-05-20 14:40:25 +01:00
// Local returns the storagenode info.
func ( service * Service ) Local ( ) NodeInfo {
2019-09-27 16:47:57 +01:00
service . mu . Lock ( )
defer service . mu . Unlock ( )
2020-05-20 14:40:25 +01:00
return service . self
2019-09-19 20:56:34 +01:00
}
2020-07-16 15:18:02 +01:00
// UpdateSelf updates the local node with the capacity.
2019-09-19 20:56:34 +01:00
func ( service * Service ) UpdateSelf ( capacity * pb . NodeCapacity ) {
2019-09-27 16:47:57 +01:00
service . mu . Lock ( )
defer service . mu . Unlock ( )
2019-09-19 20:56:34 +01:00
if capacity != nil {
service . self . Capacity = * capacity
}
2019-10-28 14:04:31 +00:00
service . initialized . Release ( )
2019-09-19 20:56:34 +01:00
}