2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
2018-10-02 20:46:29 +01:00
// See LICENSE for copying information.
package repairer
import (
"context"
2018-10-05 16:58:07 +01:00
"time"
2018-10-02 20:46:29 +01:00
2019-04-22 16:16:21 +01:00
"github.com/zeebo/errs"
2018-10-12 19:04:16 +01:00
"go.uber.org/zap"
2019-07-30 16:38:25 +01:00
"gopkg.in/spacemonkeygo/monkit.v2"
2018-10-12 19:04:16 +01:00
2019-06-04 12:36:27 +01:00
"storj.io/storj/pkg/pb"
2019-11-14 19:46:15 +00:00
"storj.io/storj/private/memory"
"storj.io/storj/private/sync2"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/repair/queue"
2018-11-27 15:57:51 +00:00
"storj.io/storj/storage"
2018-10-02 20:46:29 +01:00
)
2019-04-22 16:16:21 +01:00
// Error is a standard error class for this package.
var (
Error = errs . Class ( "repairer error" )
mon = monkit . Package ( )
)
// Config contains configurable values for repairer
type Config struct {
2019-07-11 23:44:47 +01:00
MaxRepair int ` help:"maximum segments that can be repaired concurrently" releaseDefault:"5" devDefault:"1" `
2019-12-09 15:56:52 +00:00
Interval time . Duration ` help:"how frequently repairer should try and repair more data" releaseDefault:"5m0s" devDefault:"1m0s" `
2019-11-18 17:52:56 +00:00
Timeout time . Duration ` help:"time limit for uploading repaired pieces to new storage nodes" default:"5m0s" `
2019-10-30 20:31:08 +00:00
DownloadTimeout time . Duration ` help:"time limit for downloading pieces from a node for repair" default:"5m0s" `
2019-07-11 23:44:47 +01:00
MaxBufferMem memory . Size ` help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M" `
MaxExcessRateOptimalThreshold float64 ` help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05" `
2019-04-22 16:16:21 +01:00
}
2019-01-18 13:54:08 +00:00
// Service contains the information needed to run the repair service
2019-09-10 14:24:16 +01:00
//
// architecture: Worker
2019-01-18 13:54:08 +00:00
type Service struct {
2019-07-29 12:24:56 +01:00
log * zap . Logger
queue queue . RepairQueue
config * Config
Limiter * sync2 . Limiter
Loop sync2 . Cycle
repairer * SegmentRepairer
2018-10-24 13:35:59 +01:00
}
2019-01-18 13:54:08 +00:00
// NewService creates repairing service
2019-09-06 20:20:36 +01:00
func NewService ( log * zap . Logger , queue queue . RepairQueue , config * Config , repairer * SegmentRepairer ) * Service {
2019-01-18 13:54:08 +00:00
return & Service {
2019-07-29 12:24:56 +01:00
log : log ,
queue : queue ,
config : config ,
2019-09-06 20:20:36 +01:00
Limiter : sync2 . NewLimiter ( config . MaxRepair ) ,
Loop : * sync2 . NewCycle ( config . Interval ) ,
2019-07-29 12:24:56 +01:00
repairer : repairer ,
2018-10-24 13:35:59 +01:00
}
2018-10-02 20:46:29 +01:00
}
2019-01-18 13:54:08 +00:00
// Close closes resources
func ( service * Service ) Close ( ) error { return nil }
2018-11-01 14:03:45 +00:00
// Run runs the repairer service
2019-01-18 13:54:08 +00:00
func ( service * Service ) Run ( ctx context . Context ) ( err error ) {
2018-10-25 19:59:36 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2018-10-02 20:46:29 +01:00
2018-10-24 13:35:59 +01:00
// wait for all repairs to complete
2019-04-08 18:33:47 +01:00
defer service . Limiter . Wait ( )
2018-10-02 20:46:29 +01:00
2019-04-08 18:33:47 +01:00
return service . Loop . Run ( ctx , func ( ctx context . Context ) error {
2018-12-13 07:12:36 +00:00
err := service . process ( ctx )
2018-11-01 14:03:45 +00:00
if err != nil {
2019-07-30 16:38:25 +01:00
service . log . Error ( "process" , zap . Error ( Error . Wrap ( err ) ) )
2018-11-01 14:03:45 +00:00
}
2019-04-08 18:33:47 +01:00
return nil
} )
2018-11-01 14:03:45 +00:00
}
2019-04-22 16:16:21 +01:00
// process picks items from repair queue and spawns a repair worker
2019-06-04 12:36:27 +01:00
func ( service * Service ) process ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-04-22 16:16:21 +01:00
for {
seg , err := service . queue . Select ( ctx )
2018-10-30 20:14:15 +00:00
if err != nil {
2019-04-22 16:16:21 +01:00
if storage . ErrEmptyQueue . Has ( err ) {
return nil
}
return err
2018-10-30 20:14:15 +00:00
}
2019-10-16 16:28:56 +01:00
service . log . Info ( "Retrieved segment from repair queue" , zap . Binary ( "Segment" , seg . GetPath ( ) ) )
2018-10-30 20:14:15 +00:00
2019-04-22 16:16:21 +01:00
service . Limiter . Go ( ctx , func ( ) {
2019-06-04 12:36:27 +01:00
err := service . worker ( ctx , seg )
2019-04-22 16:16:21 +01:00
if err != nil {
2019-10-16 16:28:56 +01:00
service . log . Error ( "repair worker failed:" , zap . Binary ( "Segment" , seg . GetPath ( ) ) , zap . Error ( err ) )
2019-04-22 16:16:21 +01:00
}
} )
}
2018-10-02 20:46:29 +01:00
}
2019-06-04 12:36:27 +01:00
func ( service * Service ) worker ( ctx context . Context , seg * pb . InjuredSegment ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-10 22:27:46 +01:00
workerStartTime := time . Now ( ) . UTC ( )
2019-10-18 12:43:24 +01:00
service . log . Info ( "Limiter running repair on segment" ,
zap . Binary ( "Segment" , seg . GetPath ( ) ) ,
zap . String ( "Segment Path" , string ( seg . GetPath ( ) ) ) )
2019-08-05 16:09:16 +01:00
// note that shouldDelete is used even in the case where err is not null
shouldDelete , err := service . repairer . Repair ( ctx , string ( seg . GetPath ( ) ) )
if shouldDelete {
2019-07-30 16:38:25 +01:00
if IrreparableError . Has ( err ) {
2019-08-05 16:09:16 +01:00
service . log . Error ( "deleting irreparable segment from the queue:" ,
2019-07-30 16:38:25 +01:00
zap . Error ( service . queue . Delete ( ctx , seg ) ) ,
2019-10-16 16:28:56 +01:00
zap . Binary ( "Segment" , seg . GetPath ( ) ) ,
2019-07-30 16:38:25 +01:00
)
2019-08-05 16:09:16 +01:00
} else {
2019-10-16 16:28:56 +01:00
service . log . Info ( "deleting segment from repair queue" , zap . Binary ( "Segment" , seg . GetPath ( ) ) )
2019-08-05 16:09:16 +01:00
}
delErr := service . queue . Delete ( ctx , seg )
if delErr != nil {
err = errs . Combine ( err , Error . New ( "deleting repaired segment from the queue: %v" , delErr ) )
2019-07-30 16:38:25 +01:00
}
2019-06-04 12:36:27 +01:00
}
if err != nil {
2019-08-05 16:09:16 +01:00
return Error . New ( "repairing injured segment: %v" , err )
2019-06-04 12:36:27 +01:00
}
2019-07-10 22:27:46 +01:00
2019-07-23 15:28:06 +01:00
repairedTime := time . Now ( ) . UTC ( )
timeForRepair := repairedTime . Sub ( workerStartTime )
mon . FloatVal ( "time_for_repair" ) . Observe ( timeForRepair . Seconds ( ) )
2019-07-10 22:27:46 +01:00
insertedTime := seg . GetInsertedTime ( )
// do not send metrics if segment was added before the InsertedTime field was added
if ! insertedTime . IsZero ( ) {
timeSinceQueued := workerStartTime . Sub ( insertedTime )
mon . FloatVal ( "time_since_checker_queue" ) . Observe ( timeSinceQueued . Seconds ( ) )
}
2019-06-04 12:36:27 +01:00
return nil
}