afe05edff2
Fixes a data race caused by not waiting for workers to finish before shutting down. Currently this ended up failing logging because it was closed when test tried to write to it. Change-Id: I074045cd83bbf49e658f51353aa7901e9a5d074b
126 lines
3.1 KiB
Go
126 lines
3.1 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package gracefulexit
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/storj/pkg/rpc"
|
|
"storj.io/storj/private/sync2"
|
|
"storj.io/storj/storagenode/pieces"
|
|
"storj.io/storj/storagenode/satellites"
|
|
"storj.io/storj/storagenode/trust"
|
|
)
|
|
|
|
// Chore checks for satellites that the node is exiting and creates a worker per satellite to complete the process.
|
|
//
|
|
// architecture: Chore
|
|
type Chore struct {
|
|
log *zap.Logger
|
|
store *pieces.Store
|
|
satelliteDB satellites.DB
|
|
trust *trust.Pool
|
|
dialer rpc.Dialer
|
|
|
|
config Config
|
|
|
|
exitingMap sync.Map
|
|
Loop sync2.Cycle
|
|
limiter sync2.Limiter
|
|
}
|
|
|
|
// NewChore instantiates Chore.
|
|
func NewChore(log *zap.Logger, config Config, store *pieces.Store, trust *trust.Pool, dialer rpc.Dialer, satelliteDB satellites.DB) *Chore {
|
|
return &Chore{
|
|
log: log,
|
|
store: store,
|
|
satelliteDB: satelliteDB,
|
|
trust: trust,
|
|
dialer: dialer,
|
|
config: config,
|
|
Loop: *sync2.NewCycle(config.ChoreInterval),
|
|
limiter: *sync2.NewLimiter(config.NumWorkers),
|
|
}
|
|
}
|
|
|
|
// Run starts the chore.
|
|
func (chore *Chore) Run(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
err = chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
chore.log.Debug("checking pending exits")
|
|
|
|
satellites, err := chore.satelliteDB.ListGracefulExits(ctx)
|
|
if err != nil {
|
|
chore.log.Error("error retrieving satellites.", zap.Error(err))
|
|
return nil
|
|
}
|
|
|
|
if len(satellites) == 0 {
|
|
chore.log.Debug("no satellites found")
|
|
return nil
|
|
}
|
|
|
|
for _, satellite := range satellites {
|
|
mon.Meter("satellite_gracefulexit_request").Mark(1) //locked
|
|
if satellite.FinishedAt != nil {
|
|
continue
|
|
}
|
|
satelliteID := satellite.SatelliteID
|
|
addr, err := chore.trust.GetAddress(ctx, satelliteID)
|
|
if err != nil {
|
|
chore.log.Error("failed to get satellite address.", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
worker := NewWorker(chore.log, chore.store, chore.satelliteDB, chore.dialer, satelliteID, addr, chore.config)
|
|
if _, ok := chore.exitingMap.LoadOrStore(satelliteID, worker); ok {
|
|
// already running a worker for this satellite
|
|
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satelliteID))
|
|
continue
|
|
}
|
|
|
|
chore.limiter.Go(ctx, func() {
|
|
err := worker.Run(ctx, func() {
|
|
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satelliteID))
|
|
chore.exitingMap.Delete(satelliteID)
|
|
})
|
|
if err != nil {
|
|
chore.log.Error("worker failed", zap.Error(err))
|
|
}
|
|
|
|
if err := worker.Close(); err != nil {
|
|
chore.log.Error("closing worker failed", zap.Error(err))
|
|
}
|
|
})
|
|
}
|
|
chore.limiter.Wait()
|
|
|
|
return nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// Close closes chore.
|
|
func (chore *Chore) Close() error {
|
|
chore.Loop.Close()
|
|
chore.exitingMap.Range(func(key interface{}, value interface{}) bool {
|
|
worker := value.(*Worker)
|
|
err := worker.Close()
|
|
if err != nil {
|
|
worker.log.Error("worker failed on close.", zap.Error(err))
|
|
}
|
|
chore.exitingMap.Delete(key)
|
|
return true
|
|
})
|
|
|
|
return nil
|
|
}
|