// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package repairer

import (
	"context"
	"math"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/rpc"
	"storj.io/storj/pkg/signing"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/satellite/overlay"
	"storj.io/storj/storage"
	"storj.io/storj/uplink/eestream"
)
// IrreparableError is the errs class of irreparable segment errors
var IrreparableError = errs.Class("irreparable error")

// SegmentRepairer for segments
type SegmentRepairer struct {
	log      *zap.Logger
	metainfo *metainfo.Service
	orders   *orders.Service
	overlay  *overlay.Service
	ec       *ECRepairer
	timeout  time.Duration

	// multiplierOptimalThreshold is the value that, multiplied by the optimal
	// threshold, gives the maximum number of nodes to upload repaired pieces to
	multiplierOptimalThreshold float64

	// repairOverride is the value handed over from the checker to override the repair threshold
	repairOverride int
}
// NewSegmentRepairer creates a new instance of SegmentRepairer.
//
// excessOptimalThreshold is the percentage to apply over the optimal
// threshold to determine the maximum limit of nodes to upload repaired pieces;
// when negative, 0 is applied.
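//
// For example (illustrative numbers, not the defaults): with an optimal
// threshold of 30 and excessOptimalThreshold of 0.05, new nodes are requested
// so that the total number of nodes holding pieces does not exceed
// ceil(30 * 1.05) = 32.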
func NewSegmentRepairer(
	log *zap.Logger, metainfo *metainfo.Service, orders *orders.Service,
	overlay *overlay.Service, dialer rpc.Dialer, timeout time.Duration,
	excessOptimalThreshold float64, repairOverride int, satelliteSignee signing.Signee,
) *SegmentRepairer {

	if excessOptimalThreshold < 0 {
		excessOptimalThreshold = 0
	}

	return &SegmentRepairer{
		log:                        log,
		metainfo:                   metainfo,
		orders:                     orders,
		overlay:                    overlay,
		ec:                         NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee),
		timeout:                    timeout,
		multiplierOptimalThreshold: 1 + excessOptimalThreshold,
		repairOverride:             repairOverride,
	}
}
// Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes
// note that shouldDelete is used even in the case where err is not nil
// note that it will update audit status as failed for nodes that failed piece hash verification during repair downloading
func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (shouldDelete bool, err error) {
	defer mon.Task()(&ctx, path)(&err)

	// Read the segment pointer from the metainfo
	pointer, err := repairer.metainfo.Get(ctx, path)
	if err != nil {
		if storage.ErrKeyNotFound.Has(err) {
			mon.Meter("repair_unnecessary").Mark(1)
			repairer.log.Debug("segment was deleted", zap.Binary("Segment", []byte(path)))
			return true, nil
		}
		return false, Error.Wrap(err)
	}

	if pointer.GetType() != pb.Pointer_REMOTE {
		return true, Error.New("cannot repair inline segment")
	}

	mon.Meter("repair_attempts").Mark(1)
	mon.IntVal("repair_segment_size").Observe(pointer.GetSegmentSize())

	redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
	if err != nil {
		return true, Error.Wrap(err)
	}

	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
	expiration := pointer.GetExpirationDate()

	var excludeNodeIDs storj.NodeIDList
	var healthyPieces, unhealthyPieces []*pb.RemotePiece
	healthyMap := make(map[int32]bool)
	pieces := pointer.GetRemote().GetRemotePieces()
	missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces)
	if err != nil {
		return false, Error.New("error getting missing pieces %s", err)
	}

	numHealthy := len(pieces) - len(missingPieces)
	// irreparable segment
	if int32(numHealthy) < pointer.Remote.Redundancy.MinReq {
		mon.Meter("repair_nodes_unavailable").Mark(1)
		return true, Error.Wrap(IrreparableError.New("segment cannot be repaired: only %d healthy pieces, %d required", numHealthy, pointer.Remote.Redundancy.MinReq+1))
	}
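
	// A non-zero override handed over from the checker takes precedence over
	// the repair threshold stored in the pointer's redundancy scheme.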
	repairThreshold := pointer.Remote.Redundancy.RepairThreshold
	if repairer.repairOverride != 0 {
		repairThreshold = int32(repairer.repairOverride)
	}

	// repair not needed
	if int32(numHealthy) > repairThreshold {
		mon.Meter("repair_unnecessary").Mark(1)
		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold))
		return true, nil
	}

	healthyRatioBeforeRepair := 0.0
	if pointer.Remote.Redundancy.Total != 0 {
		healthyRatioBeforeRepair = float64(numHealthy) / float64(pointer.Remote.Redundancy.Total)
	}
	mon.FloatVal("healthy_ratio_before_repair").Observe(healthyRatioBeforeRepair)

	lostPiecesSet := sliceToSet(missingPieces)

	// Populate healthyPieces with all pieces from the pointer except those correlating to indices in lostPieces
	for _, piece := range pieces {
		excludeNodeIDs = append(excludeNodeIDs, piece.NodeId)
		if !lostPiecesSet[piece.GetPieceNum()] {
			healthyPieces = append(healthyPieces, piece)
			healthyMap[piece.GetPieceNum()] = true
		} else {
			unhealthyPieces = append(unhealthyPieces, piece)
		}
	}

	bucketID, err := createBucketID(path)
	if err != nil {
		return true, Error.Wrap(err)
	}

	// Create the order limits for the GET_REPAIR action
	getOrderLimits, getPrivateKey, err := repairer.orders.CreateGetRepairOrderLimits(ctx, bucketID, pointer, healthyPieces)
	if err != nil {
		return false, Error.Wrap(err)
	}
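
	// Determine how many new nodes to request: enough to bring the total piece
	// count up to the optimal threshold scaled by the configured multiplier,
	// minus the pieces that are already healthy.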
	var requestCount int
	{
		totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) *
			repairer.multiplierOptimalThreshold,
		)
		requestCount = int(totalNeeded) - len(healthyPieces)
	}

	// Request Overlay for n-h new storage nodes
	request := overlay.FindStorageNodesRequest{
		RequestedCount: requestCount,
		FreeBandwidth:  pieceSize,
		FreeDisk:       pieceSize,
		ExcludedNodes:  excludeNodeIDs,
	}
	newNodes, err := repairer.overlay.FindStorageNodes(ctx, request)
	if err != nil {
		return false, Error.Wrap(err)
	}

	// Create the order limits for the PUT_REPAIR action
	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, bucketID, pointer, getOrderLimits, newNodes)
	if err != nil {
		return false, Error.Wrap(err)
	}

	// Download the segment using just the healthy pieces
	segmentReader, failedPieces, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, pointer.GetSegmentSize(), path)

	// Populate node IDs that failed piece hashes verification
	var failedNodeIDs storj.NodeIDList
	for _, piece := range failedPieces {
		failedNodeIDs = append(failedNodeIDs, piece.NodeId)
	}
	// update audit status for nodes that failed piece hash verification during downloading
	failedNum, updateErr := repairer.updateAuditFailStatus(ctx, failedNodeIDs)
	if updateErr != nil || failedNum > 0 {
		// failed updates should not affect repair, therefore we will not return the error
		repairer.log.Debug("failed to update audit fail status", zap.Int("Failed Update Number", failedNum), zap.Error(updateErr))
	}

	if err != nil {
		// .Get() seems to only fail from input validation, so it would keep failing
		return true, Error.Wrap(err)
	}

	defer func() { err = errs.Combine(err, segmentReader.Close()) }()

	// Upload the repaired pieces
	successfulNodes, hashes, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, expiration, repairer.timeout, path)
	if err != nil {
		return false, Error.Wrap(err)
	}

	// Add the successfully uploaded pieces to repairedPieces
	var repairedPieces []*pb.RemotePiece
	repairedMap := make(map[int32]bool)
	for i, node := range successfulNodes {
		if node == nil {
			continue
		}
		piece := pb.RemotePiece{
			PieceNum: int32(i),
			NodeId:   node.Id,
			Hash:     hashes[i],
		}
		repairedPieces = append(repairedPieces, &piece)
		repairedMap[int32(i)] = true
	}

	healthyAfterRepair := int32(len(healthyPieces) + len(repairedPieces))
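
	// Meter the outcome: failed if the segment is still at or below the repair
	// threshold, partial if below the success threshold, success otherwise.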
	switch {
	case healthyAfterRepair <= pointer.Remote.Redundancy.RepairThreshold:
		mon.Meter("repair_failed").Mark(1)
	case healthyAfterRepair < pointer.Remote.Redundancy.SuccessThreshold:
		mon.Meter("repair_partial").Mark(1)
	default:
		mon.Meter("repair_success").Mark(1)
	}

	healthyRatioAfterRepair := 0.0
	if pointer.Remote.Redundancy.Total != 0 {
		healthyRatioAfterRepair = float64(healthyAfterRepair) / float64(pointer.Remote.Redundancy.Total)
	}
	mon.FloatVal("healthy_ratio_after_repair").Observe(healthyRatioAfterRepair)

	var toRemove []*pb.RemotePiece
	if healthyAfterRepair >= pointer.Remote.Redundancy.SuccessThreshold {
		// if full repair, remove all unhealthy pieces
		toRemove = unhealthyPieces
	} else {
		// if partial repair, leave unrepaired unhealthy pieces in the pointer
		for _, piece := range unhealthyPieces {
			if repairedMap[piece.GetPieceNum()] {
				// add only repaired pieces in the slice; unrepaired
				// unhealthy pieces are not removed from the pointer
				toRemove = append(toRemove, piece)
			}
		}
	}

	// add pieces that failed piece hashes verification to the removal list
	toRemove = append(toRemove, failedPieces...)
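
	// Track how long the segment went without repair, measured from its
	// creation or from its most recent repair, whichever is later.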
	var segmentAge time.Duration
	if pointer.CreationDate.Before(pointer.LastRepaired) {
		segmentAge = time.Since(pointer.LastRepaired)
	} else {
		segmentAge = time.Since(pointer.CreationDate)
	}

	pointer.LastRepaired = time.Now().UTC()
	pointer.RepairCount++

	// Update the segment pointer in the metainfo
	_, err = repairer.metainfo.UpdatePieces(ctx, path, pointer, repairedPieces, toRemove)
	if err != nil {
		return false, err
	}

	mon.IntVal("segment_time_until_repair").Observe(int64(segmentAge.Seconds()))
	mon.IntVal("segment_repair_count").Observe(int64(pointer.RepairCount))

	return true, nil
}
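
// updateAuditFailStatus reports nodes that failed piece hash verification to
// the overlay as online but failing their audit, returning the number of
// updates that could not be applied.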
func (repairer *SegmentRepairer) updateAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failedNum int, err error) {
	updateRequests := make([]*overlay.UpdateRequest, len(failedAuditNodeIDs))
	for i, nodeID := range failedAuditNodeIDs {
		updateRequests[i] = &overlay.UpdateRequest{
			NodeID:       nodeID,
			IsUp:         true,
			AuditSuccess: false,
		}
	}
	if len(updateRequests) > 0 {
		failed, err := repairer.overlay.BatchUpdateStats(ctx, updateRequests)
		if err != nil || len(failed) > 0 {
			return len(failed), errs.Combine(Error.New("failed to update some audit fail statuses in overlay"), err)
		}
	}
	return 0, nil
}

// sliceToSet converts the given slice to a set
func sliceToSet(slice []int32) map[int32]bool {
	set := make(map[int32]bool, len(slice))
	for _, value := range slice {
		set[value] = true
	}
	return set
}
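
// createBucketID derives the bucket ID from a segment path by joining the
// path's first and third components (the project ID and the bucket name,
// skipping the segment index in between).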
func createBucketID(path storj.Path) ([]byte, error) {
	comps := storj.SplitPath(path)
	if len(comps) < 3 {
		return nil, Error.New("no bucket component in path: %s", path)
	}
	return []byte(storj.JoinPaths(comps[0], comps[2])), nil
}