satellite/repair: put irreparable segments in irreparableDB
Previously, we were simply discarding rows from the repair queue when they couldn't be repaired (either because the overlay said too many nodes were down, or because we failed to download enough pieces). Now, such segments will be put into the irreparableDB for further and (hopefully) more focused attention. This change also better differentiates some error cases from Repair() for monitoring purposes.

Change-Id: I82a52a6da50c948ddd651048e2a39cb4b1e6df5c
parent 178dbb4683
commit 79553059cb
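To summarize the new flow (a condensed sketch, not the satellite's actual implementation): the repairer now reports "not enough pieces" through a dedicated irreparableError type instead of a generic error class, and the repair worker reacts to that type by recording the segment in the irreparable DB before deleting it from the repair queue. In the sketch below, irreparableSegment and irreparableDB are simplified stand-ins for pb.IrreparableSegment and the satellite's irreparable.DB; the field names and the shouldDelete handling mirror the diff that follows.

package repairsketch

import (
	"errors"
	"fmt"
	"time"
)

// irreparableError mirrors the new error type in the repairer: it records how many
// pieces were available versus how many are required to reconstruct the segment.
type irreparableError struct {
	path            string
	piecesAvailable int32
	piecesRequired  int32
}

func (e *irreparableError) Error() string {
	return fmt.Sprintf("%d available pieces < %d required", e.piecesAvailable, e.piecesRequired)
}

// irreparableSegment is a simplified stand-in for pb.IrreparableSegment.
type irreparableSegment struct {
	Path               string
	LostPieces         int32
	LastRepairAttempt  int64
	RepairAttemptCount int64
}

// irreparableDB is a simplified stand-in for the satellite's irreparable.DB.
type irreparableDB interface {
	IncrementRepairAttempts(seg *irreparableSegment) error
}

// handleRepairResult condenses the new worker logic: an irreparable segment is first
// recorded in the irreparable DB, and the repair queue entry is removed only if that
// recording (or a normal repair) succeeded.
func handleRepairResult(path string, shouldDelete bool, repairErr error, irrDB irreparableDB, deleteFromQueue func(string) error) error {
	var irrErr *irreparableError
	if errors.As(repairErr, &irrErr) {
		seg := &irreparableSegment{
			Path:               path,
			LostPieces:         irrErr.piecesRequired - irrErr.piecesAvailable,
			LastRepairAttempt:  time.Now().Unix(),
			RepairAttemptCount: 1,
		}
		if err := irrDB.IncrementRepairAttempts(seg); err != nil {
			// Could not record the segment as irreparable: keep it in the repair queue.
			return err
		}
	}
	if shouldDelete {
		// Either the repair succeeded or the segment was handed off to the irreparable DB.
		return deleteFromQueue(path)
	}
	return repairErr
}

Note that, as in the real worker, a failure to write to the irreparable DB leaves the item in the repair queue so the segment is not silently dropped.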
@@ -66,8 +66,8 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
		db.RepairQueue(),
		db.Buckets(),
		db.OverlayCache(),
-		db.Orders(),
		rollupsWriteCache,
+		db.Irreparable(),
		version.Build,
		&runCfg.Config,
	)
@@ -80,7 +80,9 @@ storj.io/storj/satellite/repair/repairer."repair_segment_pieces_successful" IntV
storj.io/storj/satellite/repair/repairer."repair_segment_pieces_total" IntVal
storj.io/storj/satellite/repair/repairer."repair_segment_size" IntVal
storj.io/storj/satellite/repair/repairer."repair_success" Meter
+storj.io/storj/satellite/repair/repairer."repair_too_many_nodes_failed" Meter
storj.io/storj/satellite/repair/repairer."repair_unnecessary" Meter
+storj.io/storj/satellite/repair/repairer."segment_deleted_before_repair" Meter
storj.io/storj/satellite/repair/repairer."segment_repair_count" IntVal
storj.io/storj/satellite/repair/repairer."segment_time_until_repair" IntVal
storj.io/storj/satellite/repair/repairer."time_for_repair" FloatVal
@@ -237,6 +237,16 @@ func (planet *Planet) StopPeer(peer Peer) error {
// Size returns number of nodes in the network
func (planet *Planet) Size() int { return len(planet.uplinks) + len(planet.peers) }

+// FindNode is a helper to retrieve a storage node record by its node ID.
+func (planet *Planet) FindNode(nodeID storj.NodeID) *storagenode.Peer {
+	for _, node := range planet.StorageNodes {
+		if node.ID() == nodeID {
+			return node
+		}
+	}
+	return nil
+}
+
// Shutdown shuts down all the nodes and deletes temporary directories.
func (planet *Planet) Shutdown() error {
	if !planet.started {
@@ -626,7 +626,7 @@ func (planet *Planet) newRepairer(count int, identity *identity.FullIdentity, db
	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
	planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})

-	return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Orders(), rollupsWriteCache, versionInfo, &config)
+	return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), rollupsWriteCache, db.Irreparable(), versionInfo, &config)
}

type rollupsWriteCacheCloser struct {
@@ -472,14 +472,15 @@ func TestRemoveDeletedSegmentFromQueue(t *testing.T) {
	})
}

-// TestRemoveIrreparableSegmentFromQueue
+// TestIrreparableSegmentAccordingToOverlay
// - Upload tests data to 7 nodes
-// - Kill nodes so that repair threshold > online nodes > minimum threshold
+// - Disqualify nodes so that repair threshold > online nodes > minimum threshold
// - Call checker to add segment to the repair queue
-// - Kill nodes so that online nodes < minimum threshold
+// - Disqualify nodes so that online nodes < minimum threshold
// - Run the repairer
// - Verify segment is no longer in the repair queue and segment should be the same
-func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
+// - Verify segment is now in the irreparable db instead
+func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 10,
@@ -495,6 +496,7 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
		satellite.Audit.Worker.Loop.Stop()

		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Checker.IrreparableLoop.Pause()
		satellite.Repair.Repairer.Loop.Pause()

		testData := testrand.Bytes(8 * memory.KiB)
@@ -502,25 +504,14 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
		require.NoError(t, err)

-		pointer, _ := getRemoteSegment(t, ctx, satellite)
-
-		// kill nodes and track lost pieces
-		nodesToDQ := make(map[storj.NodeID]bool)
-
-		// Kill 3 nodes so that pointer has 4 left (less than repair threshold)
-		toKill := 3
+		pointer, encryptedPath := getRemoteSegment(t, ctx, satellite)
+
+		// dq 3 nodes so that pointer has 4 left (less than repair threshold)
+		toDQ := 3
		remotePieces := pointer.GetRemote().GetRemotePieces()

-		for i, piece := range remotePieces {
-			if i >= toKill {
-				continue
-			}
-			nodesToDQ[piece.NodeId] = true
-		}
-
-		for nodeID := range nodesToDQ {
-			err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID)
+		for i := 0; i < toDQ; i++ {
+			err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].NodeId)
			require.NoError(t, err)
		}

@@ -529,7 +520,7 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
		satellite.Repair.Checker.Loop.TriggerWait()
		satellite.Repair.Checker.Loop.Pause()

-		// Kill nodes so that online nodes < minimum threshold
+		// Disqualify nodes so that online nodes < minimum threshold
		// This will make the segment irreparable
		for _, piece := range remotePieces {
			err := satellite.DB.OverlayCache().DisqualifyNode(ctx, piece.NodeId)
@@ -542,16 +533,142 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
		require.NoError(t, err)
		require.Equal(t, count, 1)

+		// Verify that the segment is not in the irreparable db
+		irreparableSegment, err := satellite.DB.Irreparable().Get(ctx, []byte(encryptedPath))
+		require.Error(t, err)
+		require.Nil(t, irreparableSegment)
+
		// Run the repairer
+		beforeRepair := time.Now().Truncate(time.Second)
		satellite.Repair.Repairer.Loop.Restart()
		satellite.Repair.Repairer.Loop.TriggerWait()
		satellite.Repair.Repairer.Loop.Pause()
		satellite.Repair.Repairer.WaitForPendingRepairs()
+		afterRepair := time.Now().Truncate(time.Second)

		// Verify that the segment was removed
		count, err = satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, count, 0)

+		// Verify that the segment _is_ in the irreparable db
+		irreparableSegment, err = satellite.DB.Irreparable().Get(ctx, []byte(encryptedPath))
+		require.NoError(t, err)
+		require.Equal(t, encryptedPath, string(irreparableSegment.Path))
+		lastAttemptTime := time.Unix(irreparableSegment.LastRepairAttempt, 0)
+		require.Falsef(t, lastAttemptTime.Before(beforeRepair), "%s is before %s", lastAttemptTime, beforeRepair)
+		require.Falsef(t, lastAttemptTime.After(afterRepair), "%s is after %s", lastAttemptTime, afterRepair)
	})
}
+
+func updateNodeCheckIn(ctx context.Context, overlayDB overlay.DB, node *storagenode.Peer, isUp bool, timestamp time.Time) error {
+	local := node.Local()
+	checkInInfo := overlay.NodeCheckInInfo{
+		NodeID:   node.ID(),
+		Address:  local.Address,
+		LastIP:   local.LastIp,
+		IsUp:     isUp,
+		Operator: &local.Operator,
+		Capacity: &local.Capacity,
+		Version:  &local.Version,
+	}
+	return overlayDB.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-24*time.Hour), overlay.NodeSelectionConfig{})
+}
+
+// TestIrreparableSegmentNodesOffline
+// - Upload tests data to 7 nodes
+// - Disqualify nodes so that repair threshold > online nodes > minimum threshold
+// - Call checker to add segment to the repair queue
+// - Kill (as opposed to disqualifying) nodes so that online nodes < minimum threshold
+// - Run the repairer
+// - Verify segment is no longer in the repair queue and segment should be the same
+// - Verify segment is now in the irreparable db instead
+func TestIrreparableSegmentNodesOffline(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 10,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.ReconfigureRS(3, 5, 7, 7),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		// first, upload some remote data
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Stop()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Checker.IrreparableLoop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		testData := testrand.Bytes(8 * memory.KiB)
+
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		pointer, encryptedPath := getRemoteSegment(t, ctx, satellite)
+
+		// kill 3 nodes and mark them as offline so that pointer has 4 left from overlay
+		// perspective (less than repair threshold)
+		toMarkOffline := 3
+		remotePieces := pointer.GetRemote().GetRemotePieces()
+
+		for i := 0; i < toMarkOffline; i++ {
+			node := planet.FindNode(remotePieces[i].NodeId)
+			stopNodeByID(t, ctx, planet, node.ID())
+			err = updateNodeCheckIn(ctx, satellite.DB.OverlayCache(), node, false, time.Now().Add(-24*time.Hour))
+			require.NoError(t, err)
+		}
+
+		// trigger checker to add segment to repair queue
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+
+		// Verify that the segment is on the repair queue
+		count, err := satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, count, 1)
+
+		// Kill 2 extra nodes so that the number of available pieces is less than the minimum
+		for i := toMarkOffline; i < toMarkOffline+2; i++ {
+			stopNodeByID(t, ctx, planet, remotePieces[i].NodeId)
+		}
+
+		// Mark nodes as online again so that online nodes > minimum threshold
+		// This will make the repair worker attempt to download the pieces
+		for i := 0; i < toMarkOffline; i++ {
+			node := planet.FindNode(remotePieces[i].NodeId)
+			err := updateNodeCheckIn(ctx, satellite.DB.OverlayCache(), node, true, time.Now())
+			require.NoError(t, err)
+		}
+
+		// Verify that the segment is not in the irreparable db
+		irreparableSegment, err := satellite.DB.Irreparable().Get(ctx, []byte(encryptedPath))
+		require.Error(t, err)
+		require.Nil(t, irreparableSegment)
+
+		// Run the repairer
+		beforeRepair := time.Now().Truncate(time.Second)
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+		afterRepair := time.Now().Truncate(time.Second)
+
+		// Verify that the segment was removed from the repair queue
+		count, err = satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, count, 0)
+
+		// Verify that the segment _is_ in the irreparable db
+		irreparableSegment, err = satellite.DB.Irreparable().Get(ctx, []byte(encryptedPath))
+		require.NoError(t, err)
+		require.Equal(t, encryptedPath, string(irreparableSegment.Path))
+		lastAttemptTime := time.Unix(irreparableSegment.LastRepairAttempt, 0)
+		require.Falsef(t, lastAttemptTime.Before(beforeRepair), "%s is before %s", lastAttemptTime, beforeRepair)
+		require.Falsef(t, lastAttemptTime.After(afterRepair), "%s is after %s", lastAttemptTime, afterRepair)
+	})
+}
@@ -143,7 +143,11 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,

	if successfulPieces < es.RequiredCount() {
		mon.Meter("download_failed_not_enough_pieces_repair").Mark(1) //locked
-		return nil, failedPieces, Error.New("couldn't download enough pieces for segment: %s, number of successful downloaded pieces (%d) is less than required number (%d)", path, successfulPieces, es.RequiredCount())
+		return nil, failedPieces, &irreparableError{
+			path:            path,
+			piecesAvailable: int32(successfulPieces),
+			piecesRequired:  int32(es.RequiredCount()),
+		}
	}

	fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
@@ -15,6 +15,7 @@ import (
	"storj.io/common/memory"
	"storj.io/common/pb"
	"storj.io/common/sync2"
+	"storj.io/storj/satellite/repair/irreparable"
	"storj.io/storj/satellite/repair/queue"
	"storj.io/storj/storage"
)
@@ -46,10 +47,11 @@ type Service struct {
	JobLimiter *semaphore.Weighted
	Loop       *sync2.Cycle
	repairer   *SegmentRepairer
+	irrDB      irreparable.DB
}

// NewService creates repairing service
-func NewService(log *zap.Logger, queue queue.RepairQueue, config *Config, repairer *SegmentRepairer) *Service {
+func NewService(log *zap.Logger, queue queue.RepairQueue, config *Config, repairer *SegmentRepairer, irrDB irreparable.DB) *Service {
	return &Service{
		log:    log,
		queue:  queue,
@@ -57,6 +59,7 @@ func NewService(log *zap.Logger, queue queue.RepairQueue, config *Config, repair
		JobLimiter: semaphore.NewWeighted(int64(config.MaxRepair)),
		Loop:       sync2.NewCycle(config.Interval),
		repairer:   repairer,
+		irrDB:      irrDB,
	}
}
@@ -156,21 +159,38 @@ func (service *Service) worker(ctx context.Context, seg *pb.InjuredSegment) (err
	// note that shouldDelete is used even in the case where err is not null
	shouldDelete, err := service.repairer.Repair(ctx, string(seg.GetPath()))
-	if shouldDelete {
-		if IrreparableError.Has(err) {
-			service.log.Error("deleting irreparable segment from the queue:",
-				zap.Error(service.queue.Delete(ctx, seg)),
-				zap.Binary("Segment", seg.GetPath()),
-			)
-		} else {
-			service.log.Info("deleting segment from repair queue", zap.Binary("Segment", seg.GetPath()))
+	if irreparableErr, ok := err.(*irreparableError); ok {
+		service.log.Error("segment could not be repaired! adding to irreparableDB for more attention",
+			zap.Error(err),
+			zap.Binary("segment", seg.GetPath()))
+		segmentInfo := &pb.IrreparableSegment{
+			Path:               seg.GetPath(),
+			SegmentDetail:      irreparableErr.segmentInfo,
+			LostPieces:         irreparableErr.piecesRequired - irreparableErr.piecesAvailable,
+			LastRepairAttempt:  time.Now().Unix(),
+			RepairAttemptCount: int64(1),
+		}
+		if err := service.irrDB.IncrementRepairAttempts(ctx, segmentInfo); err != nil {
+			service.log.Error("failed to add segment to irreparableDB! will leave in repair queue", zap.Error(err))
+			shouldDelete = false
+		}
+	} else if err != nil {
+		service.log.Error("unexpected error repairing segment!",
+			zap.Error(err),
+			zap.Binary("segment", seg.GetPath()))
+	} else {
+		service.log.Info("removing repaired segment from repair queue",
+			zap.Binary("Segment", seg.GetPath()))
+	}
+	if shouldDelete {
		delErr := service.queue.Delete(ctx, seg)
		if delErr != nil {
-			err = errs.Combine(err, Error.New("deleting repaired segment from the queue: %v", delErr))
+			err = errs.Combine(err, Error.New("failed to remove segment from queue: %v", delErr))
		}
	}
-	}
	if err != nil {
-		return Error.New("repairing injured segment: %v", err)
+		return Error.Wrap(err)
	}

	repairedTime := time.Now().UTC()
@@ -5,6 +5,7 @@ package repairer

import (
	"context"
+	"fmt"
	"math"
	"time"
@@ -21,8 +22,29 @@ import (
	"storj.io/uplink/private/eestream"
)

-// IrreparableError is the errs class of irreparable segment errors
-var IrreparableError = errs.Class("irreparable error")
+var (
+	metainfoGetError       = errs.Class("metainfo db get error")
+	metainfoPutError       = errs.Class("metainfo db put error")
+	invalidRepairError     = errs.Class("invalid repair")
+	overlayQueryError      = errs.Class("overlay query failure")
+	orderLimitFailureError = errs.Class("order limits failure")
+	repairReconstructError = errs.Class("repair reconstruction failure")
+	repairPutError         = errs.Class("repair could not store repaired pieces")
+)
+
+// irreparableError identifies situations where a segment could not be repaired due to reasons
+// which are hopefully transient (e.g. too many pieces unavailable). The segment should be added
+// to the irreparableDB.
+type irreparableError struct {
+	path            storj.Path
+	piecesAvailable int32
+	piecesRequired  int32
+	segmentInfo     *pb.Pointer
+}
+
+func (ie *irreparableError) Error() string {
+	return fmt.Sprintf("%d available pieces < %d required", ie.piecesAvailable, ie.piecesRequired)
+}

// SegmentRepairer for segments
type SegmentRepairer struct {
@@ -82,14 +104,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
	if err != nil {
		if storj.ErrObjectNotFound.Has(err) {
-			mon.Meter("repair_unnecessary").Mark(1) //locked
+			mon.Meter("segment_deleted_before_repair").Mark(1) //locked
+			repairer.log.Debug("segment was deleted", zap.Binary("Segment", []byte(path)))
			return true, nil
		}
-		return false, Error.Wrap(err)
+		return false, metainfoGetError.Wrap(err)
	}

	if pointer.GetType() != pb.Pointer_REMOTE {
-		return true, Error.New("cannot repair inline segment")
+		return true, invalidRepairError.New("cannot repair inline segment")
	}

	mon.Meter("repair_attempts").Mark(1) //locked
@@ -97,7 +120,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s

	redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
	if err != nil {
-		return true, Error.Wrap(err)
+		return true, invalidRepairError.New("invalid redundancy strategy: %w", err)
	}

	var excludeNodeIDs storj.NodeIDList
@@ -106,14 +129,19 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
	pieces := pointer.GetRemote().GetRemotePieces()
	missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces)
	if err != nil {
-		return false, Error.New("error getting missing pieces %s", err)
+		return false, overlayQueryError.New("error identifying missing pieces: %w", err)
	}

	numHealthy := len(pieces) - len(missingPieces)
	// irreparable piece
	if int32(numHealthy) < pointer.Remote.Redundancy.MinReq {
		mon.Meter("repair_nodes_unavailable").Mark(1) //locked
-		return true, Error.Wrap(IrreparableError.New("segment cannot be repaired: only %d healthy pieces, %d required", numHealthy, pointer.Remote.Redundancy.MinReq+1))
+		return true, &irreparableError{
+			path:            path,
+			piecesAvailable: int32(numHealthy),
+			piecesRequired:  pointer.Remote.Redundancy.MinReq,
+			segmentInfo:     pointer,
+		}
	}

	repairThreshold := pointer.Remote.Redundancy.RepairThreshold
@@ -149,13 +177,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s

	bucketID, err := createBucketID(path)
	if err != nil {
-		return true, Error.Wrap(err)
+		return true, invalidRepairError.New("invalid path; cannot repair segment: %w", err)
	}

	// Create the order limits for the GET_REPAIR action
	getOrderLimits, getPrivateKey, err := repairer.orders.CreateGetRepairOrderLimits(ctx, bucketID, pointer, healthyPieces)
	if err != nil {
-		return false, Error.Wrap(err)
+		return false, orderLimitFailureError.New("could not create GET_REPAIR order limits: %w", err)
	}

	var requestCount int
@@ -173,13 +201,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
	}
	newNodes, err := repairer.overlay.FindStorageNodes(ctx, request)
	if err != nil {
-		return false, Error.Wrap(err)
+		return false, overlayQueryError.Wrap(err)
	}

	// Create the order limits for the PUT_REPAIR action
	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, bucketID, pointer, getOrderLimits, newNodes)
	if err != nil {
-		return false, Error.Wrap(err)
+		return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
	}

	// Download the segment using just the healthy pieces
@@ -198,15 +226,23 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
		repairer.log.Debug("failed to update audit fail status", zap.Int("Failed Update Number", failedNum), zap.Error(err))
	}
	if err != nil {
-		// .Get() seems to only fail from input validation, so it would keep failing
-		return true, Error.Wrap(err)
+		// If Get failed because of input validation, then it will keep failing. But if it
+		// gave us irreparableError, then we failed to download enough pieces and must try
+		// to wait for nodes to come back online.
+		if irreparableErr, ok := err.(*irreparableError); ok {
+			mon.Meter("repair_too_many_nodes_failed").Mark(1) //locked
+			irreparableErr.segmentInfo = pointer
+			return true, irreparableErr
+		}
+		// The segment's redundancy strategy is invalid, or else there was an internal error.
+		return true, repairReconstructError.New("segment could not be reconstructed: %w", err)
	}
	defer func() { err = errs.Combine(err, segmentReader.Close()) }()

	// Upload the repaired pieces
	successfulNodes, hashes, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, repairer.timeout, path)
	if err != nil {
-		return false, Error.Wrap(err)
+		return false, repairPutError.Wrap(err)
	}

	// Add the successfully uploaded pieces to repairedPieces
@@ -228,6 +264,11 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
	healthyAfterRepair := int32(len(healthyPieces) + len(repairedPieces))
	switch {
	case healthyAfterRepair <= pointer.Remote.Redundancy.RepairThreshold:
+		// Important: this indicates a failure to PUT enough pieces to the network to pass
+		// the repair threshold, and _not_ a failure to reconstruct the segment. But we
+		// put at least one piece, else ec.Repair() would have returned an error. So the
+		// repair "succeeded" in that the segment is now healthier than it was, but it is
+		// not as healthy as we want it to be.
		mon.Meter("repair_failed").Mark(1) //locked
	case healthyAfterRepair < pointer.Remote.Redundancy.SuccessThreshold:
		mon.Meter("repair_partial").Mark(1) //locked
@@ -272,7 +313,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
	// Update the segment pointer in the metainfo
	_, err = repairer.metainfo.UpdatePieces(ctx, path, pointer, repairedPieces, toRemove)
	if err != nil {
-		return false, err
+		return false, metainfoPutError.Wrap(err)
	}

	mon.IntVal("segment_time_until_repair").Observe(int64(segmentAge.Seconds())) //locked
@@ -27,6 +27,7 @@ import (
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/repair/irreparable"
	"storj.io/storj/satellite/repair/queue"
	"storj.io/storj/satellite/repair/repairer"
)
@@ -68,8 +69,8 @@ type Repairer struct {
func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
	pointerDB metainfo.PointerDB,
	revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
-	bucketsDB metainfo.BucketsDB, overlayCache overlay.DB, ordersDB orders.DB,
-	rollupsWriteCache *orders.RollupsWriteCache,
+	bucketsDB metainfo.BucketsDB, overlayCache overlay.DB,
+	rollupsWriteCache *orders.RollupsWriteCache, irrDB irreparable.DB,
	versionInfo version.Info, config *Config) (*Repairer, error) {
	peer := &Repairer{
		Log: log,
@@ -175,7 +176,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
		config.Repairer.DownloadTimeout,
		signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
	)
-	peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer)
+	peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer, irrDB)

	peer.Services.Add(lifecycle.Item{
		Name: "repair",
@@ -62,14 +62,14 @@ func (db *irreparableDB) Get(ctx context.Context, segmentPath []byte) (resp *pb.
	defer mon.Task()(&ctx)(&err)
	dbxInfo, err := db.db.Get_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentPath))
	if err != nil {
-		return &pb.IrreparableSegment{}, Error.Wrap(err)
+		return nil, Error.Wrap(err)
	}

	p := &pb.Pointer{}

	err = proto.Unmarshal(dbxInfo.Segmentdetail, p)
	if err != nil {
-		return &pb.IrreparableSegment{}, err
+		return nil, Error.Wrap(err)
	}

	return &pb.IrreparableSegment{