c489a70e62
From the name of the function and from the way it is used (only called in one place, from "storj.io/storagenode/gracefulexit".(*Chore).Run()), it should not return graceful exits that have already completed. In particular, this causes a problem in the case that a node has already completed a graceful exit from one satellite, after which the satellite was decommissioned and no longer in the "trusted" list. This causes an error message to show up in the node logs every single minute like "failed to get satellite address ... satellite \"X\" is untrusted". https://forum.storj.io/t/error-gracefulexit-service-failed-to-get-satellite-address/11372 This change causes ListPendingExits to list pending exits only, not all exits. Correspondingly, the check for whether an exit is already completed, in (*Chore).Run(), becomes unnecessary and is here removed. Change-Id: Ia3e9bb3e92be4a32ebcbda0321e3fe61d77deaa8
119 lines
3.0 KiB
Go
119 lines
3.0 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package gracefulexit
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/rpc"
|
|
"storj.io/common/sync2"
|
|
"storj.io/storj/storagenode/piecetransfer"
|
|
)
|
|
|
|
// Chore checks for satellites that the node is exiting and creates a worker per satellite to complete the process.
|
|
//
|
|
// architecture: Chore
|
|
type Chore struct {
|
|
log *zap.Logger
|
|
dialer rpc.Dialer
|
|
config Config
|
|
|
|
service Service
|
|
transferService piecetransfer.Service
|
|
|
|
exitingMap sync.Map
|
|
Loop *sync2.Cycle
|
|
limiter *sync2.Limiter
|
|
}
|
|
|
|
// NewChore instantiates Chore.
|
|
func NewChore(log *zap.Logger, service Service, transferService piecetransfer.Service, dialer rpc.Dialer, config Config) *Chore {
|
|
return &Chore{
|
|
log: log,
|
|
dialer: dialer,
|
|
service: service,
|
|
transferService: transferService,
|
|
config: config,
|
|
Loop: sync2.NewCycle(config.ChoreInterval),
|
|
limiter: sync2.NewLimiter(config.NumWorkers),
|
|
}
|
|
}
|
|
|
|
// Run starts the chore.
|
|
func (chore *Chore) Run(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
err = chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
geSatellites, err := chore.service.ListPendingExits(ctx)
|
|
if err != nil {
|
|
chore.log.Error("error retrieving satellites.", zap.Error(err))
|
|
return nil
|
|
}
|
|
|
|
if len(geSatellites) == 0 {
|
|
return nil
|
|
}
|
|
chore.log.Debug("exiting", zap.Int("satellites", len(geSatellites)))
|
|
|
|
for _, satellite := range geSatellites {
|
|
mon.Meter("satellite_gracefulexit_request").Mark(1) //mon:locked
|
|
satellite := satellite
|
|
|
|
worker := NewWorker(chore.log, chore.service, chore.transferService, chore.dialer, satellite.NodeURL, chore.config)
|
|
if _, ok := chore.exitingMap.LoadOrStore(satellite.SatelliteID, worker); ok {
|
|
// already running a worker for this satellite
|
|
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satellite.SatelliteID))
|
|
continue
|
|
}
|
|
|
|
chore.limiter.Go(ctx, func() {
|
|
err := worker.Run(ctx, func() {
|
|
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satellite.SatelliteID))
|
|
chore.exitingMap.Delete(satellite.SatelliteID)
|
|
})
|
|
|
|
if err != nil {
|
|
chore.log.Error("worker failed", zap.Error(err))
|
|
}
|
|
|
|
if err := worker.Close(); err != nil {
|
|
chore.log.Error("closing worker failed", zap.Error(err))
|
|
}
|
|
})
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
chore.limiter.Wait()
|
|
|
|
return err
|
|
}
|
|
|
|
// TestWaitForWorkers waits for any pending worker to finish.
|
|
func (chore *Chore) TestWaitForWorkers() {
|
|
chore.limiter.Wait()
|
|
}
|
|
|
|
// Close closes chore.
|
|
func (chore *Chore) Close() error {
|
|
chore.Loop.Close()
|
|
chore.exitingMap.Range(func(key interface{}, value interface{}) bool {
|
|
worker := value.(*Worker)
|
|
err := worker.Close()
|
|
if err != nil {
|
|
worker.log.Error("worker failed on close.", zap.Error(err))
|
|
}
|
|
chore.exitingMap.Delete(key)
|
|
return true
|
|
})
|
|
|
|
return nil
|
|
}
|