storj/storagenode/contact/chore.go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
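
// Package contact handles checking the storage node in with its trusted
// satellites. (Descriptive summary added here; the canonical package comment
// may live in another file of this package.)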
package contact

import (
	"context"
	"sync"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"storj.io/common/pb"
	"storj.io/common/rpc"
	"storj.io/common/storj"
	"storj.io/common/sync2"
	"storj.io/storj/storagenode/trust"
)

// Chore is the contact chore for nodes announcing themselves to their trusted satellites
//
// architecture: Chore
type Chore struct {
	log     *zap.Logger
	service *Service
	dialer  rpc.Dialer
	trust   *trust.Pool

	mu     sync.Mutex
	cycles map[storj.NodeID]*sync2.Cycle

	started  sync2.Fence
	interval time.Duration
}

var (
	errPingSatellite = errs.Class("ping satellite error")
)

// initialBackOff is the initial delay between retries of a failed satellite
// ping; pingSatellite doubles it after each failed attempt.
const initialBackOff = time.Second

// NewChore creates a new contact chore.
func NewChore(log *zap.Logger, interval time.Duration, trust *trust.Pool, dialer rpc.Dialer, service *Service) *Chore {
	return &Chore{
		log:     log,
		service: service,
		dialer:  dialer,
		trust:   trust,

		cycles:   make(map[storj.NodeID]*sync2.Cycle),
		interval: interval,
	}
}

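// A minimal usage sketch (hypothetical names; in the storage node the peer
// wires the chore up with its own logger, dialer, trust pool, and contact
// service, and the interval comes from configuration):
//
//	chore := NewChore(log, time.Hour, trustPool, dialer, service)
//	err := chore.Run(ctx) // blocks until ctx is cancelled
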
// Run the contact chore on a regular interval with jitter.
func (chore *Chore) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)
	var group errgroup.Group

	// Wait until the contact service has its local node info initialized
	// before announcing it to satellites.
	if !chore.service.initialized.Wait(ctx) {
		return ctx.Err()
	}

	// Configure the satellite ping cycles.
	chore.updateCycles(ctx, &group, chore.trust.GetSatellites(ctx))

	// Set up a cycle to update the ping cycles on a frequent interval, so
	// that newly trusted satellites are picked up and untrusted ones dropped.
	refreshCycle := sync2.NewCycle(time.Minute)
	refreshCycle.Start(ctx, &group, func(ctx context.Context) error {
		chore.updateCycles(ctx, &group, chore.trust.GetSatellites(ctx))
		return nil
	})
	defer refreshCycle.Close()

	chore.started.Release()
	return group.Wait()
}

func (chore *Chore) updateCycles(ctx context.Context, group *errgroup.Group, satellites []storj.NodeID) {
	chore.mu.Lock()
	defer chore.mu.Unlock()

	trustedIDs := make(map[storj.NodeID]struct{})

	for _, satellite := range satellites {
		satellite := satellite // alias the loop var since it is captured below
		trustedIDs[satellite] = struct{}{}

		if _, ok := chore.cycles[satellite]; ok {
			// Ping cycle has already been started for this satellite.
			continue
		}

		// Set up a new ping cycle for the newly trusted satellite.
		chore.log.Debug("Starting cycle", zap.Stringer("Satellite ID", satellite))
		cycle := sync2.NewCycle(chore.interval)
		chore.cycles[satellite] = cycle
		cycle.Start(ctx, group, func(ctx context.Context) error {
			return chore.pingSatellite(ctx, satellite)
		})
	}

	// Stop the ping cycles for satellites that are no longer trusted.
	for satellite, cycle := range chore.cycles {
		if _, ok := trustedIDs[satellite]; !ok {
			chore.log.Debug("Stopping cycle", zap.Stringer("Satellite ID", satellite))
			cycle.Close()
			delete(chore.cycles, satellite)
		}
	}
}

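// pingSatellite attempts to ping the satellite, retrying with exponential
// backoff (starting at initialBackOff) until a ping succeeds, the context is
// cancelled, or the backoff delay grows past the chore interval.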
func (chore *Chore) pingSatellite(ctx context.Context, satellite storj.NodeID) error {
	interval := initialBackOff
	attempts := 0
	for {
		mon.Meter("satellite_contact_request").Mark(1) //locked

		err := chore.pingSatelliteOnce(ctx, satellite)
		attempts++
		if err == nil {
			return nil
		}
		chore.log.Error("ping satellite failed", zap.Stringer("Satellite ID", satellite), zap.Int("attempts", attempts), zap.Error(err))

		// Sleep until the backoff interval elapses, then retry. Return if
		// the context is cancelled.
		if !sync2.Sleep(ctx, interval) {
			chore.log.Info("context cancelled", zap.Stringer("Satellite ID", satellite))
			return nil
		}

		interval *= 2
		if interval >= chore.interval {
			chore.log.Info("retries timed out for this cycle", zap.Stringer("Satellite ID", satellite))
			return nil
		}
	}
}

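// pingSatelliteOnce performs a single check-in with the satellite, reporting
// the node's address, version, capacity, and operator info.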
func (chore *Chore) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) {
	defer mon.Task()(&ctx, id)(&err)

	self := chore.service.Local()

	address, err := chore.trust.GetAddress(ctx, id)
	if err != nil {
		return errPingSatellite.Wrap(err)
	}

	conn, err := chore.dialer.DialAddressID(ctx, address, id)
	if err != nil {
		return errPingSatellite.Wrap(err)
	}
	defer func() { err = errs.Combine(err, conn.Close()) }()

	_, err = pb.NewDRPCNodeClient(conn.Raw()).CheckIn(ctx, &pb.CheckInRequest{
		Address:  self.Address.GetAddress(),
		Version:  &self.Version,
		Capacity: &self.Capacity,
		Operator: &self.Operator,
	})
	if err != nil {
		return errPingSatellite.Wrap(err)
	}
	return nil
}

// Pause stops all the cycles in the contact chore.
func (chore *Chore) Pause(ctx context.Context) {
	chore.started.Wait(ctx)
	chore.mu.Lock()
	defer chore.mu.Unlock()
	for _, cycle := range chore.cycles {
		cycle.Pause()
	}
}

// TriggerWait ensures that each cycle runs at least once and waits for
// completion. If a cycle is currently running, it waits for that run to
// complete and then runs again.
func (chore *Chore) TriggerWait(ctx context.Context) {
	chore.started.Wait(ctx)
	chore.mu.Lock()
	defer chore.mu.Unlock()

	var group errgroup.Group
	for _, cycle := range chore.cycles {
		cycle := cycle // alias the loop var since it is captured below
		group.Go(func() error {
			cycle.TriggerWait()
			return nil
		})
	}
	_ = group.Wait() // the goroutines don't return any errors
}

// Close stops all the cycles in the contact chore.
func (chore *Chore) Close() error {
	chore.mu.Lock()
	defer chore.mu.Unlock()

	for _, cycle := range chore.cycles {
		cycle.Close()
	}
	chore.cycles = nil
	return nil
}