storj/storagenode/gracefulexit/chore.go
paul cannon 7395dd1e6e storagenode/gracefulexit: revalidate existing pieces
..before they are transferred to another node and submitted to the
satellite as successful piece transfers, because if we submit an invalid
signature, the node will be marked as a cheater and disqualified
immediately.

These signatures should have been validated when the piece was
originally stored, but bitrot does happen and needn't be cause for an
immediate DQ.

Change-Id: I8b0ebd5812ea8a2e60766005b7251fbb74ef7857
2020-05-28 09:50:14 -05:00

125 lines
3.1 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"context"
"sync"
"go.uber.org/zap"
"storj.io/common/rpc"
"storj.io/common/sync2"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/satellites"
"storj.io/storj/storagenode/trust"
)
// Chore checks for satellites that the node is exiting and creates a worker per satellite to complete the process.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
store *pieces.Store
satelliteDB satellites.DB
trust *trust.Pool
dialer rpc.Dialer
config Config
exitingMap sync.Map
Loop *sync2.Cycle
limiter *sync2.Limiter
}
// NewChore instantiates Chore.
func NewChore(log *zap.Logger, config Config, store *pieces.Store, trust *trust.Pool, dialer rpc.Dialer, satelliteDB satellites.DB) *Chore {
return &Chore{
log: log,
store: store,
satelliteDB: satelliteDB,
trust: trust,
dialer: dialer,
config: config,
Loop: sync2.NewCycle(config.ChoreInterval),
limiter: sync2.NewLimiter(config.NumWorkers),
}
}
// Run starts the chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
err = chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
satellites, err := chore.satelliteDB.ListGracefulExits(ctx)
if err != nil {
chore.log.Error("error retrieving satellites.", zap.Error(err))
return nil
}
if len(satellites) == 0 {
return nil
}
chore.log.Debug("exiting", zap.Int("satellites", len(satellites)))
for _, satellite := range satellites {
mon.Meter("satellite_gracefulexit_request").Mark(1) //locked
if satellite.FinishedAt != nil {
continue
}
nodeurl, err := chore.trust.GetNodeURL(ctx, satellite.SatelliteID)
if err != nil {
chore.log.Error("failed to get satellite address.", zap.Error(err))
continue
}
worker := NewWorker(chore.log, chore.store, chore.trust, chore.satelliteDB, chore.dialer, nodeurl, chore.config)
if _, ok := chore.exitingMap.LoadOrStore(nodeurl.ID, worker); ok {
// already running a worker for this satellite
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", nodeurl.ID))
continue
}
chore.limiter.Go(ctx, func() {
err := worker.Run(ctx, func() {
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", nodeurl.ID))
chore.exitingMap.Delete(nodeurl.ID)
})
if err != nil {
chore.log.Error("worker failed", zap.Error(err))
}
if err := worker.Close(); err != nil {
chore.log.Error("closing worker failed", zap.Error(err))
}
})
}
chore.limiter.Wait()
return nil
})
return err
}
// Close closes chore.
func (chore *Chore) Close() error {
chore.Loop.Close()
chore.exitingMap.Range(func(key interface{}, value interface{}) bool {
worker := value.(*Worker)
err := worker.Close()
if err != nil {
worker.log.Error("worker failed on close.", zap.Error(err))
}
chore.exitingMap.Delete(key)
return true
})
return nil
}