diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go
index 8eb9aa30a..2be0dad9c 100644
--- a/private/testplanet/satellite.go
+++ b/private/testplanet/satellite.go
@@ -363,6 +363,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
 				Interval:                      defaultInterval,
 				Timeout:                       1 * time.Minute, // Repairs can take up to 10 seconds. Leaving room for outliers
 				DownloadTimeout:               1 * time.Minute,
+				TotalTimeout:                  10 * time.Minute,
 				MaxBufferMem:                  4 * memory.MiB,
 				MaxExcessRateOptimalThreshold: 0.05,
 			},
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 6fef196df..4a8522679 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -129,7 +129,7 @@ func TestDataRepair(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// repaired segment should not contain any piece in the killed and DQ nodes
 		metainfoService := satellite.Metainfo.Service
@@ -253,7 +253,7 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// repair should update audit status as fail
 		node, err = overlay.Get(ctx, corruptedNodeID)
@@ -371,7 +371,7 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// repair should update audit status as fail
 		node, err = overlay.Get(ctx, corruptedNodeID)
@@ -463,7 +463,7 @@ func TestRemoveDeletedSegmentFromQueue(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// Verify that the segment was removed
 		count, err = satellite.DB.RepairQueue().Count(ctx)
@@ -546,7 +546,7 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// Verify that the segment was removed
 		count, err = satellite.DB.RepairQueue().Count(ctx)
@@ -634,7 +634,7 @@ func TestRepairMultipleDisqualified(t *testing.T) {
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.TriggerWait()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// kill nodes kept alive to ensure repair worked
 		for _, node := range planet.StorageNodes {
@@ -729,7 +729,7 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// repair should have been done, due to the override
 		metainfoService := satellite.Metainfo.Service
@@ -815,7 +815,7 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// Increase offline count by the difference to trigger repair
 		toKill += repairThreshold - repairOverride
@@ -844,7 +844,7 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// repair should have been done, due to the override
 		metainfoService := satellite.Metainfo.Service
@@ -950,7 +950,7 @@ func TestDataRepairUploadLimit(t *testing.T) {
 		satellite.Repair.Repairer.Loop.Restart()
 		satellite.Repair.Repairer.Loop.TriggerWait()
 		satellite.Repair.Repairer.Loop.Pause()
-		satellite.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
 
 		// Get the pointer after repair to check the nodes where the pieces are
 		// stored
diff --git a/satellite/repair/repairer/repairer.go b/satellite/repair/repairer/repairer.go
index 098e69679..9acdcc1fc 100644
--- a/satellite/repair/repairer/repairer.go
+++ b/satellite/repair/repairer/repairer.go
@@ -10,6 +10,7 @@ import (
 	"github.com/spacemonkeygo/monkit/v3"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
+	"golang.org/x/sync/semaphore"
 
 	"storj.io/common/memory"
 	"storj.io/common/pb"
@@ -30,6 +31,7 @@ type Config struct {
 	Interval                      time.Duration `help:"how frequently repairer should try and repair more data" releaseDefault:"5m0s" devDefault:"1m0s"`
 	Timeout                       time.Duration `help:"time limit for uploading repaired pieces to new storage nodes" default:"5m0s"`
 	DownloadTimeout               time.Duration `help:"time limit for downloading pieces from a node for repair" default:"5m0s"`
+	TotalTimeout                  time.Duration `help:"time limit for an entire repair job, from queue pop to upload completion" default:"45m"`
 	MaxBufferMem                  memory.Size   `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M"`
 	MaxExcessRateOptimalThreshold float64       `help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05"`
 }
@@ -38,65 +40,109 @@ type Config struct {
 //
 // architecture: Worker
 type Service struct {
-	log      *zap.Logger
-	queue    queue.RepairQueue
-	config   *Config
-	Limiter  *sync2.Limiter
-	Loop     *sync2.Cycle
-	repairer *SegmentRepairer
+	log        *zap.Logger
+	queue      queue.RepairQueue
+	config     *Config
+	JobLimiter *semaphore.Weighted
+	Loop       *sync2.Cycle
+	repairer   *SegmentRepairer
 }
 
 // NewService creates repairing service
 func NewService(log *zap.Logger, queue queue.RepairQueue, config *Config, repairer *SegmentRepairer) *Service {
 	return &Service{
-		log:      log,
-		queue:    queue,
-		config:   config,
-		Limiter:  sync2.NewLimiter(config.MaxRepair),
-		Loop:     sync2.NewCycle(config.Interval),
-		repairer: repairer,
+		log:        log,
+		queue:      queue,
+		config:     config,
+		JobLimiter: semaphore.NewWeighted(int64(config.MaxRepair)),
+		Loop:       sync2.NewCycle(config.Interval),
+		repairer:   repairer,
 	}
 }
 
 // Close closes resources
 func (service *Service) Close() error { return nil }
 
+// WaitForPendingRepairs waits for all ongoing repairs to complete.
+//
+// NB: this assumes that service.config.MaxRepair will never be changed once this Service instance
+// is initialized. If that is not a valid assumption, we should keep a copy of its initial value to
+// use here instead.
+func (service *Service) WaitForPendingRepairs() {
+	// Acquire and then release the entire capacity of the semaphore, ensuring that
+	// it is completely empty (or, at least it was empty at some point).
+	//
+	// No error return is possible here; context.Background() can't be canceled
+	_ = service.JobLimiter.Acquire(context.Background(), int64(service.config.MaxRepair))
+	service.JobLimiter.Release(int64(service.config.MaxRepair))
+}
+
 // Run runs the repairer service
 func (service *Service) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	// wait for all repairs to complete
-	defer service.Limiter.Wait()
+	// Wait for all repairs to complete
+	defer service.WaitForPendingRepairs()
 
-	return service.Loop.Run(ctx, func(ctx context.Context) error {
+	return service.Loop.Run(ctx, service.processWhileQueueHasItems)
+}
+
+// processWhileQueueHasItems keeps calling process() until the queue is empty or something
+// else goes wrong in fetching from the queue.
+func (service *Service) processWhileQueueHasItems(ctx context.Context) error {
+	for {
 		err := service.process(ctx)
 		if err != nil {
+			if storage.ErrEmptyQueue.Has(err) {
+				return nil
+			}
 			service.log.Error("process", zap.Error(Error.Wrap(err)))
+			return err
 		}
-		return nil
-	})
+	}
 }
 
 // process picks items from repair queue and spawns a repair worker
 func (service *Service) process(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
-	for {
-		seg, err := service.queue.Select(ctx)
-		if err != nil {
-			if storage.ErrEmptyQueue.Has(err) {
-				return nil
-			}
-			return err
-		}
-		service.log.Info("Retrieved segment from repair queue", zap.Binary("Segment", seg.GetPath()))
-		service.Limiter.Go(ctx, func() {
-			err := service.worker(ctx, seg)
-			if err != nil {
-				service.log.Error("repair worker failed:", zap.Binary("Segment", seg.GetPath()), zap.Error(err))
-			}
-		})
+
+	// wait until we are allowed to spawn a new job
+	if err := service.JobLimiter.Acquire(ctx, 1); err != nil {
+		return err
 	}
+
+	// IMPORTANT: this timeout must be started before service.queue.Select(), in case
+	// service.queue.Select() takes some non-negligible amount of time, so that we can depend on
+	// repair jobs being given up within some set interval after the time in the 'attempted'
+	// column in the queue table.
+	//
+	// This is the reason why we are using a semaphore in this somewhat awkward way instead of
+	// using a simpler sync2.Limiter pattern. We don't want this timeout to include the waiting
+	// time from the semaphore acquisition, but it _must_ include the queue fetch time. At the
+	// same time, we don't want to do the queue pop in a separate goroutine, because we want to
+	// return from service.Run when queue fetch fails.
+	ctx, cancel := context.WithTimeout(ctx, service.config.TotalTimeout)
+
+	seg, err := service.queue.Select(ctx)
+	if err != nil {
+		service.JobLimiter.Release(1)
+		cancel()
+		return err
+	}
+	service.log.Info("Retrieved segment from repair queue", zap.Binary("Segment", seg.GetPath()))
+
+	// this goroutine inherits the JobLimiter semaphore acquisition and is now responsible
+	// for releasing it.
+	go func() {
+		defer service.JobLimiter.Release(1)
+		defer cancel()
+
+		if err := service.worker(ctx, seg); err != nil {
+			service.log.Error("repair worker failed:", zap.Binary("Segment", seg.GetPath()), zap.Error(err))
+		}
+	}()
+
+	return nil
 }
 
 func (service *Service) worker(ctx context.Context, seg *pb.InjuredSegment) (err error) {
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 8e0e52406..af7cdbf67 100644
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -444,6 +444,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # time limit for uploading repaired pieces to new storage nodes
 # repairer.timeout: 5m0s
 
+# time limit for an entire repair job, from queue pop to upload completion
+# repairer.total-timeout: 45m0s
+
 # how often to flush the reported serial rollups to the database
 # reported-rollup.interval: 5m0s
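
Editor's note: the repairer.go hunks above rely on two semaphore moves: acquiring one slot before popping the queue, so the TotalTimeout clock starts before queue.Select() rather than after, and draining the semaphore's full capacity to wait for every in-flight job. The standalone Go sketch below shows the same choreography outside the satellite codebase. It is an illustration only, not part of the patch: the channel-backed queue, popJob, maxJobs, totalTimeout, and waitForPendingJobs are hypothetical stand-ins for the repair queue, queue.Select(), config.MaxRepair, config.TotalTimeout, and WaitForPendingRepairs; only golang.org/x/sync/semaphore is the real dependency used by the change.

// Illustrative sketch (not part of the patch) of the semaphore pattern used in repairer.go,
// assuming a toy channel-backed queue in place of the satellite's repair queue.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

const (
	maxJobs      = 2                      // stands in for config.MaxRepair
	totalTimeout = 500 * time.Millisecond // stands in for config.TotalTimeout
)

var errEmptyQueue = errors.New("empty queue")

// popJob stands in for queue.Select(); its latency is covered by the job timeout.
func popJob(ctx context.Context, queue chan int) (int, error) {
	select {
	case job := <-queue:
		return job, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	default:
		return 0, errEmptyQueue
	}
}

// process mirrors Service.process: acquire a job slot, start the total timeout
// before the queue pop, and hand both the slot and the cancel func to the worker.
func process(ctx context.Context, jobLimiter *semaphore.Weighted, queue chan int) error {
	// wait until we are allowed to spawn a new job
	if err := jobLimiter.Acquire(ctx, 1); err != nil {
		return err
	}

	// The deadline covers queue fetch + work, but not time spent waiting on the semaphore.
	jobCtx, cancel := context.WithTimeout(ctx, totalTimeout)

	job, err := popJob(jobCtx, queue)
	if err != nil {
		jobLimiter.Release(1)
		cancel()
		return err
	}

	// The goroutine inherits the semaphore slot and is responsible for releasing it.
	go func() {
		defer jobLimiter.Release(1)
		defer cancel()
		time.Sleep(100 * time.Millisecond) // pretend to repair the segment
		fmt.Println("repaired job", job)
	}()
	return nil
}

// waitForPendingJobs mirrors Service.WaitForPendingRepairs: acquiring the full
// capacity can only succeed once every outstanding job has released its slot.
func waitForPendingJobs(jobLimiter *semaphore.Weighted) {
	_ = jobLimiter.Acquire(context.Background(), maxJobs) // Background ctx: no error possible
	jobLimiter.Release(maxJobs)
}

func main() {
	queue := make(chan int, 10)
	for i := 0; i < 5; i++ {
		queue <- i
	}

	jobLimiter := semaphore.NewWeighted(maxJobs)

	// Mirrors processWhileQueueHasItems: keep spawning jobs until the queue is empty.
	for {
		if err := process(context.Background(), jobLimiter, queue); err != nil {
			break
		}
	}

	waitForPendingJobs(jobLimiter)
	fmt.Println("all pending repairs finished")
}

Because waitForPendingJobs can only take all maxJobs slots once every worker goroutine has released its slot, it is equivalent, from the tests' point of view, to the old Limiter.Wait(), which is why the repair_test.go hunks swap Limiter.Wait() for WaitForPendingRepairs().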