2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
2018-10-02 20:46:29 +01:00
// See LICENSE for copying information.
package repairer
import (
"context"
2018-10-05 16:58:07 +01:00
"time"
2018-10-02 20:46:29 +01:00
2019-04-22 16:16:21 +01:00
"github.com/zeebo/errs"
2018-10-12 19:04:16 +01:00
"go.uber.org/zap"
2019-04-22 16:16:21 +01:00
monkit "gopkg.in/spacemonkeygo/monkit.v2"
2018-10-12 19:04:16 +01:00
2019-04-22 16:16:21 +01:00
"storj.io/storj/internal/memory"
2018-10-24 13:35:59 +01:00
"storj.io/storj/internal/sync2"
2019-04-22 16:16:21 +01:00
"storj.io/storj/pkg/identity"
2019-06-04 12:36:27 +01:00
"storj.io/storj/pkg/pb"
2018-12-13 07:12:36 +00:00
"storj.io/storj/pkg/storj"
2019-02-11 11:17:32 +00:00
"storj.io/storj/pkg/transport"
2019-04-25 09:46:32 +01:00
"storj.io/storj/satellite/metainfo"
2019-03-28 20:09:23 +00:00
"storj.io/storj/satellite/orders"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/queue"
2018-11-27 15:57:51 +00:00
"storj.io/storj/storage"
2019-07-28 06:55:36 +01:00
"storj.io/storj/uplink/ecclient"
"storj.io/storj/uplink/storage/segments"
2018-10-02 20:46:29 +01:00
)
2019-04-22 16:16:21 +01:00
// Error is a standard error class for this package.
var Error = errs.Class("repairer error")

// mon is this package's monkit scope, used for task tracing and metrics.
var mon = monkit.Package()
// Config contains configurable values for repairer
type Config struct {
2019-07-11 23:44:47 +01:00
MaxRepair int ` help:"maximum segments that can be repaired concurrently" releaseDefault:"5" devDefault:"1" `
Interval time . Duration ` help:"how frequently repairer should try and repair more data" releaseDefault:"1h" devDefault:"0h5m0s" `
Timeout time . Duration ` help:"time limit for uploading repaired pieces to new storage nodes" devDefault:"10m0s" releaseDefault:"2h" `
MaxBufferMem memory . Size ` help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M" `
MaxExcessRateOptimalThreshold float64 ` help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05" `
2019-04-22 16:16:21 +01:00
}
// GetSegmentRepairer creates a new segment repairer from storeConfig values
2019-07-02 11:08:02 +01:00
func ( c Config ) GetSegmentRepairer ( ctx context . Context , log * zap . Logger , tc transport . Client , metainfo * metainfo . Service , orders * orders . Service , cache * overlay . Cache , identity * identity . FullIdentity ) ( ss SegmentRepairer , err error ) {
2019-04-22 16:16:21 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-02 11:08:02 +01:00
ec := ecclient . NewClient ( log . Named ( "ecclient" ) , tc , c . MaxBufferMem . Int ( ) )
2019-04-22 16:16:21 +01:00
2019-07-11 23:44:47 +01:00
return segments . NewSegmentRepairer (
log . Named ( "repairer" ) , metainfo , orders , cache , ec , identity , c . Timeout , c . MaxExcessRateOptimalThreshold ,
) , nil
2019-04-22 16:16:21 +01:00
}
2018-12-13 07:12:36 +00:00
// SegmentRepairer is a repairer for segments
type SegmentRepairer interface {
2019-05-16 14:49:10 +01:00
Repair ( ctx context . Context , path storj . Path ) ( err error )
2018-10-03 19:35:56 +01:00
}
2019-01-18 13:54:08 +00:00
// Service contains the information needed to run the repair service
type Service struct {
2019-07-02 11:08:02 +01:00
log * zap . Logger
2019-03-28 20:09:23 +00:00
queue queue . RepairQueue
config * Config
2019-04-08 18:33:47 +01:00
Limiter * sync2 . Limiter
Loop sync2 . Cycle
2019-03-28 20:09:23 +00:00
transport transport . Client
2019-04-25 09:46:32 +01:00
metainfo * metainfo . Service
2019-03-28 20:09:23 +00:00
orders * orders . Service
cache * overlay . Cache
repairer SegmentRepairer
2018-10-24 13:35:59 +01:00
}
2019-01-18 13:54:08 +00:00
// NewService creates repairing service
2019-07-02 11:08:02 +01:00
func NewService ( log * zap . Logger , queue queue . RepairQueue , config * Config , interval time . Duration , concurrency int , transport transport . Client , metainfo * metainfo . Service , orders * orders . Service , cache * overlay . Cache ) * Service {
2019-01-18 13:54:08 +00:00
return & Service {
2019-07-02 11:08:02 +01:00
log : log ,
2019-03-28 20:09:23 +00:00
queue : queue ,
config : config ,
2019-04-08 18:33:47 +01:00
Limiter : sync2 . NewLimiter ( concurrency ) ,
Loop : * sync2 . NewCycle ( interval ) ,
2019-03-28 20:09:23 +00:00
transport : transport ,
2019-04-25 09:46:32 +01:00
metainfo : metainfo ,
2019-03-28 20:09:23 +00:00
orders : orders ,
cache : cache ,
2018-10-24 13:35:59 +01:00
}
2018-10-02 20:46:29 +01:00
}
2019-01-18 13:54:08 +00:00
// Close releases resources held by the service. Nothing is currently held,
// so it always returns nil.
func (service *Service) Close() error {
	return nil
}
2018-11-01 14:03:45 +00:00
// Run runs the repairer service
2019-01-18 13:54:08 +00:00
func ( service * Service ) Run ( ctx context . Context ) ( err error ) {
2018-10-25 19:59:36 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2018-10-02 20:46:29 +01:00
2019-02-01 17:28:40 +00:00
// TODO: close segment repairer, currently this leaks connections
2019-03-18 10:55:06 +00:00
service . repairer , err = service . config . GetSegmentRepairer (
ctx ,
2019-07-02 11:08:02 +01:00
service . log ,
2019-03-18 10:55:06 +00:00
service . transport ,
2019-04-25 09:46:32 +01:00
service . metainfo ,
2019-03-28 20:09:23 +00:00
service . orders ,
2019-03-18 10:55:06 +00:00
service . cache ,
service . transport . Identity ( ) ,
)
2019-02-01 17:28:40 +00:00
if err != nil {
return err
}
2018-10-24 13:35:59 +01:00
// wait for all repairs to complete
2019-04-08 18:33:47 +01:00
defer service . Limiter . Wait ( )
2018-10-02 20:46:29 +01:00
2019-04-08 18:33:47 +01:00
return service . Loop . Run ( ctx , func ( ctx context . Context ) error {
2018-12-13 07:12:36 +00:00
err := service . process ( ctx )
2018-11-01 14:03:45 +00:00
if err != nil {
2019-07-23 15:28:06 +01:00
zap . L ( ) . Error ( "process" , zap . Error ( Error . Wrap ( err ) ) )
2018-11-01 14:03:45 +00:00
}
2019-04-08 18:33:47 +01:00
return nil
} )
2018-11-01 14:03:45 +00:00
}
2019-04-22 16:16:21 +01:00
// process picks items from repair queue and spawns a repair worker
2019-06-04 12:36:27 +01:00
func ( service * Service ) process ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-04-22 16:16:21 +01:00
for {
seg , err := service . queue . Select ( ctx )
2019-07-11 18:26:07 +01:00
zap . L ( ) . Info ( "Dequeued segment from repair queue" , zap . Binary ( "segment" , seg . GetPath ( ) ) )
2018-10-30 20:14:15 +00:00
if err != nil {
2019-04-22 16:16:21 +01:00
if storage . ErrEmptyQueue . Has ( err ) {
return nil
}
return err
2018-10-30 20:14:15 +00:00
}
2019-04-22 16:16:21 +01:00
service . Limiter . Go ( ctx , func ( ) {
2019-06-04 12:36:27 +01:00
err := service . worker ( ctx , seg )
2019-04-22 16:16:21 +01:00
if err != nil {
2019-07-23 15:28:06 +01:00
zap . L ( ) . Error ( "repair worker failed:" , zap . Error ( err ) )
2019-04-22 16:16:21 +01:00
}
} )
}
2018-10-02 20:46:29 +01:00
}
2019-06-04 12:36:27 +01:00
func ( service * Service ) worker ( ctx context . Context , seg * pb . InjuredSegment ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-10 22:27:46 +01:00
workerStartTime := time . Now ( ) . UTC ( )
2019-07-11 18:26:07 +01:00
zap . L ( ) . Info ( "Limiter running repair on segment" , zap . Binary ( "segment" , seg . GetPath ( ) ) )
err = service . repairer . Repair ( ctx , string ( seg . GetPath ( ) ) )
2019-06-04 12:36:27 +01:00
if err != nil {
2019-07-23 15:28:06 +01:00
return Error . New ( "repairing injured segment: %v" , err )
2019-06-04 12:36:27 +01:00
}
2019-07-23 15:28:06 +01:00
2019-07-11 18:26:07 +01:00
zap . L ( ) . Info ( "Deleting segment from repair queue" , zap . Binary ( "segment" , seg . GetPath ( ) ) )
2019-06-04 12:36:27 +01:00
err = service . queue . Delete ( ctx , seg )
if err != nil {
2019-07-23 15:28:06 +01:00
return Error . New ( "deleting repaired segment from the queue: %v" , err )
2019-06-04 12:36:27 +01:00
}
2019-07-10 22:27:46 +01:00
2019-07-23 15:28:06 +01:00
repairedTime := time . Now ( ) . UTC ( )
timeForRepair := repairedTime . Sub ( workerStartTime )
mon . FloatVal ( "time_for_repair" ) . Observe ( timeForRepair . Seconds ( ) )
2019-07-10 22:27:46 +01:00
insertedTime := seg . GetInsertedTime ( )
// do not send metrics if segment was added before the InsertedTime field was added
if ! insertedTime . IsZero ( ) {
timeSinceQueued := workerStartTime . Sub ( insertedTime )
mon . FloatVal ( "time_since_checker_queue" ) . Observe ( timeSinceQueued . Seconds ( ) )
}
2019-06-04 12:36:27 +01:00
return nil
}