2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
2018-10-02 20:46:29 +01:00
// See LICENSE for copying information.
package repairer
import (
"context"
2018-10-05 16:58:07 +01:00
"time"
2018-10-02 20:46:29 +01:00
2019-11-08 20:40:39 +00:00
"github.com/spacemonkeygo/monkit/v3"
2019-04-22 16:16:21 +01:00
"github.com/zeebo/errs"
2018-10-12 19:04:16 +01:00
"go.uber.org/zap"
2020-01-08 18:33:15 +00:00
"golang.org/x/sync/semaphore"
2018-10-12 19:04:16 +01:00
2019-12-27 11:48:47 +00:00
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/sync2"
2020-02-24 20:13:12 +00:00
"storj.io/storj/satellite/repair/irreparable"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/repair/queue"
2018-11-27 15:57:51 +00:00
"storj.io/storj/storage"
2018-10-02 20:46:29 +01:00
)
2019-04-22 16:16:21 +01:00
var (
	// Error is the error class used for errors produced by this package.
	Error = errs.Class("repairer error")

	// mon is the monkit scope used for this package's tasks and metrics.
	mon = monkit.Package()
)
// Config contains configurable values for repairer
type Config struct {
2019-07-11 23:44:47 +01:00
MaxRepair int ` help:"maximum segments that can be repaired concurrently" releaseDefault:"5" devDefault:"1" `
2019-12-09 15:56:52 +00:00
Interval time . Duration ` help:"how frequently repairer should try and repair more data" releaseDefault:"5m0s" devDefault:"1m0s" `
2019-11-18 17:52:56 +00:00
Timeout time . Duration ` help:"time limit for uploading repaired pieces to new storage nodes" default:"5m0s" `
2019-10-30 20:31:08 +00:00
DownloadTimeout time . Duration ` help:"time limit for downloading pieces from a node for repair" default:"5m0s" `
2020-01-08 18:33:15 +00:00
TotalTimeout time . Duration ` help:"time limit for an entire repair job, from queue pop to upload completion" default:"45m" `
2019-07-11 23:44:47 +01:00
MaxBufferMem memory . Size ` help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M" `
MaxExcessRateOptimalThreshold float64 ` help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05" `
2020-03-18 23:55:09 +00:00
InMemoryRepair bool ` help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false" `
2019-04-22 16:16:21 +01:00
}
2019-01-18 13:54:08 +00:00
// Service contains the information needed to run the repair service
2019-09-10 14:24:16 +01:00
//
// architecture: Worker
2019-01-18 13:54:08 +00:00
type Service struct {
2020-01-08 18:33:15 +00:00
log * zap . Logger
queue queue . RepairQueue
config * Config
JobLimiter * semaphore . Weighted
Loop * sync2 . Cycle
repairer * SegmentRepairer
2020-02-24 20:13:12 +00:00
irrDB irreparable . DB
2018-10-24 13:35:59 +01:00
}
2019-01-18 13:54:08 +00:00
// NewService creates repairing service
2020-02-24 20:13:12 +00:00
func NewService ( log * zap . Logger , queue queue . RepairQueue , config * Config , repairer * SegmentRepairer , irrDB irreparable . DB ) * Service {
2019-01-18 13:54:08 +00:00
return & Service {
2020-01-08 18:33:15 +00:00
log : log ,
queue : queue ,
config : config ,
JobLimiter : semaphore . NewWeighted ( int64 ( config . MaxRepair ) ) ,
Loop : sync2 . NewCycle ( config . Interval ) ,
repairer : repairer ,
2020-02-24 20:13:12 +00:00
irrDB : irrDB ,
2018-10-24 13:35:59 +01:00
}
2018-10-02 20:46:29 +01:00
}
2019-01-18 13:54:08 +00:00
// Close releases the service's resources. There is currently nothing to release,
// so it always returns nil.
func (service *Service) Close() error {
	return nil
}
2020-01-08 18:33:15 +00:00
// WaitForPendingRepairs blocks until every repair that is currently in flight has
// finished.
//
// NB: this relies on service.config.MaxRepair never changing after the Service has
// been initialized. Should that stop being true, the initial value must be stored
// separately and used here instead.
func (service *Service) WaitForPendingRepairs() {
	capacity := int64(service.config.MaxRepair)

	// Taking the whole capacity of the semaphore can only succeed once no job holds
	// a slot, so after this call the limiter was completely idle (at least for a
	// moment). The capacity is handed back right away.
	//
	// The returned error is discarded on purpose: context.Background() is never
	// canceled, so Acquire cannot fail here.
	_ = service.JobLimiter.Acquire(context.Background(), capacity)
	service.JobLimiter.Release(capacity)
}
2018-11-01 14:03:45 +00:00
// Run runs the repairer service
2019-01-18 13:54:08 +00:00
func ( service * Service ) Run ( ctx context . Context ) ( err error ) {
2018-10-25 19:59:36 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2018-10-02 20:46:29 +01:00
2020-01-08 18:33:15 +00:00
// Wait for all repairs to complete
defer service . WaitForPendingRepairs ( )
2018-10-02 20:46:29 +01:00
2020-01-08 18:33:15 +00:00
return service . Loop . Run ( ctx , service . processWhileQueueHasItems )
2018-11-01 14:03:45 +00:00
}
2020-01-08 18:33:15 +00:00
// processWhileQueueHasItems keeps calling process() until the queue is empty or something
// else goes wrong in fetching from the queue.
func ( service * Service ) processWhileQueueHasItems ( ctx context . Context ) error {
2019-04-22 16:16:21 +01:00
for {
2020-01-08 18:33:15 +00:00
err := service . process ( ctx )
2018-10-30 20:14:15 +00:00
if err != nil {
2019-04-22 16:16:21 +01:00
if storage . ErrEmptyQueue . Has ( err ) {
return nil
}
2020-01-08 18:33:15 +00:00
service . log . Error ( "process" , zap . Error ( Error . Wrap ( err ) ) )
2019-04-22 16:16:21 +01:00
return err
2018-10-30 20:14:15 +00:00
}
2020-01-08 18:33:15 +00:00
}
}
2018-10-30 20:14:15 +00:00
2020-01-08 18:33:15 +00:00
// process picks items from repair queue and spawns a repair worker
func ( service * Service ) process ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
// wait until we are allowed to spawn a new job
if err := service . JobLimiter . Acquire ( ctx , 1 ) ; err != nil {
return err
2019-04-22 16:16:21 +01:00
}
2020-01-08 18:33:15 +00:00
// IMPORTANT: this timeout must be started before service.queue.Select(), in case
// service.queue.Select() takes some non-negligible amount of time, so that we can depend on
// repair jobs being given up within some set interval after the time in the 'attempted'
// column in the queue table.
//
// This is the reason why we are using a semaphore in this somewhat awkward way instead of
// using a simpler sync2.Limiter pattern. We don't want this timeout to include the waiting
// time from the semaphore acquisition, but it _must_ include the queue fetch time. At the
// same time, we don't want to do the queue pop in a separate goroutine, because we want to
// return from service.Run when queue fetch fails.
ctx , cancel := context . WithTimeout ( ctx , service . config . TotalTimeout )
seg , err := service . queue . Select ( ctx )
if err != nil {
service . JobLimiter . Release ( 1 )
cancel ( )
return err
}
service . log . Info ( "Retrieved segment from repair queue" , zap . Binary ( "Segment" , seg . GetPath ( ) ) )
// this goroutine inherits the JobLimiter semaphore acquisition and is now responsible
// for releasing it.
go func ( ) {
defer service . JobLimiter . Release ( 1 )
defer cancel ( )
if err := service . worker ( ctx , seg ) ; err != nil {
service . log . Error ( "repair worker failed:" , zap . Binary ( "Segment" , seg . GetPath ( ) ) , zap . Error ( err ) )
}
} ( )
return nil
2018-10-02 20:46:29 +01:00
}
2019-06-04 12:36:27 +01:00
func ( service * Service ) worker ( ctx context . Context , seg * pb . InjuredSegment ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-10 22:27:46 +01:00
workerStartTime := time . Now ( ) . UTC ( )
2019-10-18 12:43:24 +01:00
service . log . Info ( "Limiter running repair on segment" ,
zap . Binary ( "Segment" , seg . GetPath ( ) ) ,
zap . String ( "Segment Path" , string ( seg . GetPath ( ) ) ) )
2019-08-05 16:09:16 +01:00
// note that shouldDelete is used even in the case where err is not null
shouldDelete , err := service . repairer . Repair ( ctx , string ( seg . GetPath ( ) ) )
if shouldDelete {
2020-02-24 20:13:12 +00:00
if irreparableErr , ok := err . ( * irreparableError ) ; ok {
service . log . Error ( "segment could not be repaired! adding to irreparableDB for more attention" ,
zap . Error ( err ) ,
zap . Binary ( "segment" , seg . GetPath ( ) ) )
segmentInfo := & pb . IrreparableSegment {
Path : seg . GetPath ( ) ,
SegmentDetail : irreparableErr . segmentInfo ,
LostPieces : irreparableErr . piecesRequired - irreparableErr . piecesAvailable ,
LastRepairAttempt : time . Now ( ) . Unix ( ) ,
RepairAttemptCount : int64 ( 1 ) ,
}
if err := service . irrDB . IncrementRepairAttempts ( ctx , segmentInfo ) ; err != nil {
service . log . Error ( "failed to add segment to irreparableDB! will leave in repair queue" , zap . Error ( err ) )
shouldDelete = false
}
} else if err != nil {
service . log . Error ( "unexpected error repairing segment!" ,
zap . Error ( err ) ,
zap . Binary ( "segment" , seg . GetPath ( ) ) )
2019-08-05 16:09:16 +01:00
} else {
2020-02-24 20:13:12 +00:00
service . log . Info ( "removing repaired segment from repair queue" ,
zap . Binary ( "Segment" , seg . GetPath ( ) ) )
2019-08-05 16:09:16 +01:00
}
2020-02-24 20:13:12 +00:00
if shouldDelete {
delErr := service . queue . Delete ( ctx , seg )
if delErr != nil {
err = errs . Combine ( err , Error . New ( "failed to remove segment from queue: %v" , delErr ) )
}
2019-07-30 16:38:25 +01:00
}
2019-06-04 12:36:27 +01:00
}
if err != nil {
2020-02-24 20:13:12 +00:00
return Error . Wrap ( err )
2019-06-04 12:36:27 +01:00
}
2019-07-10 22:27:46 +01:00
2019-07-23 15:28:06 +01:00
repairedTime := time . Now ( ) . UTC ( )
timeForRepair := repairedTime . Sub ( workerStartTime )
2020-01-29 22:44:19 +00:00
mon . FloatVal ( "time_for_repair" ) . Observe ( timeForRepair . Seconds ( ) ) //locked
2019-07-23 15:28:06 +01:00
2019-07-10 22:27:46 +01:00
insertedTime := seg . GetInsertedTime ( )
// do not send metrics if segment was added before the InsertedTime field was added
if ! insertedTime . IsZero ( ) {
timeSinceQueued := workerStartTime . Sub ( insertedTime )
2020-01-29 22:44:19 +00:00
mon . FloatVal ( "time_since_checker_queue" ) . Observe ( timeSinceQueued . Seconds ( ) ) //locked
2019-07-10 22:27:46 +01:00
}
2019-06-04 12:36:27 +01:00
return nil
}