2019-09-06 17:14:03 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/pb"
2019-09-19 05:46:39 +01:00
"storj.io/storj/pkg/rpc"
2019-09-06 17:14:03 +01:00
"storj.io/storj/storagenode/trust"
)
// Chore is the contact chore for nodes announcing themselves to their trusted satellites
2019-09-10 14:24:16 +01:00
//
// architecture: Chore
2019-09-06 17:14:03 +01:00
type Chore struct {
2019-09-19 05:46:39 +01:00
log * zap . Logger
service * Service
dialer rpc . Dialer
2019-09-06 17:14:03 +01:00
trust * trust . Pool
2019-10-11 21:44:18 +01:00
Loop * sync2 . Cycle
2019-09-06 17:14:03 +01:00
}
// NewChore creates a new contact chore
2019-10-11 21:44:18 +01:00
func NewChore ( log * zap . Logger , interval time . Duration , trust * trust . Pool , dialer rpc . Dialer , service * Service ) * Chore {
2019-09-06 17:14:03 +01:00
return & Chore {
2019-09-19 05:46:39 +01:00
log : log ,
service : service ,
dialer : dialer ,
2019-09-06 17:14:03 +01:00
trust : trust ,
2019-10-11 21:44:18 +01:00
Loop : sync2 . NewCycle ( interval ) ,
2019-09-06 17:14:03 +01:00
}
}
// Run the contact chore on a regular interval with jitter
func ( chore * Chore ) Run ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
chore . log . Info ( "Storagenode contact chore starting up" )
2019-10-28 14:04:31 +00:00
if ! chore . service . initialized . Wait ( ctx ) {
return ctx . Err ( )
2019-10-26 18:16:25 +01:00
}
2019-09-06 17:14:03 +01:00
return chore . Loop . Run ( ctx , func ( ctx context . Context ) error {
if err := chore . pingSatellites ( ctx ) ; err != nil {
chore . log . Error ( "pingSatellites failed" , zap . Error ( err ) )
}
return nil
} )
}
func ( chore * Chore ) pingSatellites ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
var group errgroup . Group
2019-09-19 20:56:34 +01:00
self := chore . service . Local ( )
2019-09-06 17:14:03 +01:00
satellites := chore . trust . GetSatellites ( ctx )
2019-11-01 15:20:53 +00:00
2019-09-06 17:14:03 +01:00
for _ , satellite := range satellites {
satellite := satellite
addr , err := chore . trust . GetAddress ( ctx , satellite )
if err != nil {
chore . log . Error ( "getting satellite address" , zap . Error ( err ) )
continue
}
group . Go ( func ( ) error {
2019-09-19 05:46:39 +01:00
conn , err := chore . dialer . DialAddressID ( ctx , addr , satellite )
2019-09-06 17:14:03 +01:00
if err != nil {
return err
}
2019-09-19 05:46:39 +01:00
defer func ( ) { err = errs . Combine ( err , conn . Close ( ) ) } ( )
2019-10-30 18:57:21 +00:00
resp , err := conn . NodeClient ( ) . CheckIn ( ctx , & pb . CheckInRequest {
2019-09-14 01:37:32 +01:00
Address : self . Address . GetAddress ( ) ,
2019-09-19 19:37:31 +01:00
Version : & self . Version ,
2019-09-10 17:05:07 +01:00
Capacity : & self . Capacity ,
Operator : & self . Operator ,
2019-09-06 17:14:03 +01:00
} )
2019-11-01 15:20:53 +00:00
if err != nil {
chore . log . Error ( "Check-In with satellite failed" , zap . Error ( err ) )
} else if ! resp . PingNodeSuccess {
2019-10-30 18:57:21 +00:00
chore . log . Error ( "Check-In with satellite failed due to failed ping back" , zap . String ( "satellite ID" , satellite . String ( ) ) , zap . String ( "satellite addr" , addr ) , zap . String ( "ping back error message" , resp . GetPingErrorMessage ( ) ) )
}
2019-09-06 17:14:03 +01:00
return err
} )
}
return group . Wait ( )
}
// Close shuts down the chore's Loop cycle. It always returns nil.
func (chore *Chore) Close() error {
	defer chore.Loop.Close()
	return nil
}