storagenode/contact: exponential backoff retries for pinging Satellites (#3372)

This commit is contained in:
Jennifer Li Johnson 2019-11-04 16:03:21 -05:00 committed by GitHub
parent f6a4155c46
commit aa7d15a365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 106 additions and 52 deletions

View File

@ -216,7 +216,7 @@ func (planet *Planet) Start(ctx context.Context) {
peer := peer
group.Go(func() error {
peer.Storage2.Monitor.Loop.TriggerWait()
peer.Contact.Chore.Loop.TriggerWait()
peer.Contact.Chore.TriggerWait(ctx)
return nil
})
}

View File

@ -5,6 +5,7 @@ package contact
import (
"context"
"sync"
"time"
"github.com/zeebo/errs"
@ -14,6 +15,7 @@ import (
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/trust"
)
@ -27,9 +29,18 @@ type Chore struct {
trust *trust.Pool
Loop *sync2.Cycle
mu sync.Mutex
cycles []*sync2.Cycle
started sync2.Fence
interval time.Duration
}
var (
errPingSatellite = errs.Class("ping satellite error")
)
const initialBackOff = time.Second
// NewChore creates a new contact chore
func NewChore(log *zap.Logger, interval time.Duration, trust *trust.Pool, dialer rpc.Dialer, service *Service) *Chore {
return &Chore{
@ -39,7 +50,7 @@ func NewChore(log *zap.Logger, interval time.Duration, trust *trust.Pool, dialer
trust: trust,
Loop: sync2.NewCycle(interval),
interval: interval,
}
}
@ -48,60 +59,106 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
chore.log.Info("Storagenode contact chore starting up")
var group errgroup.Group
if !chore.service.initialized.Wait(ctx) {
return ctx.Err()
}
return chore.Loop.Run(ctx, func(ctx context.Context) error {
if err := chore.pingSatellites(ctx); err != nil {
chore.log.Error("pingSatellites failed", zap.Error(err))
}
return nil
})
}
func (chore *Chore) pingSatellites(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
var group errgroup.Group
self := chore.service.Local()
satellites := chore.trust.GetSatellites(ctx)
for _, satellite := range satellites {
chore.mu.Lock()
for _, satellite := range chore.trust.GetSatellites(ctx) {
satellite := satellite
addr, err := chore.trust.GetAddress(ctx, satellite)
if err != nil {
chore.log.Error("getting satellite address", zap.Error(err))
continue
}
group.Go(func() error {
conn, err := chore.dialer.DialAddressID(ctx, addr, satellite)
if err != nil {
return err
cycle := sync2.NewCycle(chore.interval)
chore.cycles = append(chore.cycles, cycle)
cycle.Start(ctx, &group, func(ctx context.Context) error {
chore.log.Debug("starting cycle", zap.Stringer("satellite", satellite))
interval := initialBackOff
attempts := 0
for {
err := chore.pingSatellite(ctx, satellite)
attempts++
if err == nil {
return nil
}
chore.log.Error("ping satellite failed ", zap.Stringer("satellite", satellite), zap.Int("attempts", attempts), zap.Error(err))
// Sleeps until interval times out, then continue. Returns if context is cancelled.
if !sync2.Sleep(ctx, interval) {
chore.log.Info("context cancelled", zap.Stringer("satellite", satellite))
return nil
}
interval *= 2
if interval >= chore.interval {
chore.log.Info("retries timed out for this cycle", zap.Stringer("satellite", satellite))
return nil
}
}
defer func() { err = errs.Combine(err, conn.Close()) }()
resp, err := conn.NodeClient().CheckIn(ctx, &pb.CheckInRequest{
Address: self.Address.GetAddress(),
Version: &self.Version,
Capacity: &self.Capacity,
Operator: &self.Operator,
})
if err != nil {
chore.log.Error("Check-In with satellite failed", zap.Error(err))
} else if !resp.PingNodeSuccess {
chore.log.Error("Check-In with satellite failed due to failed ping back", zap.String("satellite ID", satellite.String()), zap.String("satellite addr", addr), zap.String("ping back error message", resp.GetPingErrorMessage()))
}
return err
})
}
chore.mu.Unlock()
chore.started.Release()
return group.Wait()
}
// Close stops the contact chore
func (chore *Chore) Close() error {
chore.Loop.Close()
func (chore *Chore) pingSatellite(ctx context.Context, id storj.NodeID) (err error) {
defer mon.Task()(&ctx, id)(&err)
self := chore.service.Local()
address, err := chore.trust.GetAddress(ctx, id)
if err != nil {
return errPingSatellite.Wrap(err)
}
conn, err := chore.dialer.DialAddressID(ctx, address, id)
if err != nil {
return errPingSatellite.Wrap(err)
}
_, err = conn.NodeClient().CheckIn(ctx, &pb.CheckInRequest{
Address: self.Address.GetAddress(),
Version: &self.Version,
Capacity: &self.Capacity,
Operator: &self.Operator,
})
if err != nil {
return errPingSatellite.Wrap(err)
}
return nil
}
// Pause stops all the cycles in the contact chore.
func (chore *Chore) Pause(ctx context.Context) {
chore.started.Wait(ctx)
chore.mu.Lock()
defer chore.mu.Unlock()
for _, cycle := range chore.cycles {
cycle.Pause()
}
}
// TriggerWait ensures that each cycle is done at least once and waits for completion.
// If the cycle is currently running it waits for the previous to complete and then runs.
func (chore *Chore) TriggerWait(ctx context.Context) {
chore.started.Wait(ctx)
chore.mu.Lock()
defer chore.mu.Unlock()
var group errgroup.Group
for _, cycle := range chore.cycles {
cycle := cycle
group.Go(func() error {
cycle.TriggerWait()
return nil
})
}
_ = group.Wait() // goroutines aren't returning any errors
}
// Close stops all the cycles in the contact chore.
func (chore *Chore) Close() error {
chore.mu.Lock()
defer chore.mu.Unlock()
for _, cycle := range chore.cycles {
cycle.Close()
}
chore.cycles = nil
return nil
}

View File

@ -51,11 +51,9 @@ func TestNodeInfoUpdated(t *testing.T) {
satellite := planet.Satellites[0]
node := planet.StorageNodes[0]
node.Contact.Chore.Loop.Pause()
node.Contact.Chore.Pause(ctx)
oldInfo, err := satellite.Overlay.Service.Get(ctx, node.ID())
require.NoError(t, err)
oldCapacity := oldInfo.Capacity
newCapacity := pb.NodeCapacity{
@ -63,10 +61,9 @@ func TestNodeInfoUpdated(t *testing.T) {
FreeDisk: 0,
}
require.NotEqual(t, oldCapacity, newCapacity)
node.Contact.Service.UpdateSelf(&newCapacity)
node.Contact.Chore.Loop.TriggerWait()
node.Contact.Chore.TriggerWait(ctx)
newInfo, err := satellite.Overlay.Service.Get(ctx, node.ID())
require.NoError(t, err)