repair segment reassess it missing pieces just before repair (#1939)

* repair segment reaccess it missing pieces just before repair to see if it actually needs repair
This commit is contained in:
aligeti 2019-05-16 09:49:10 -04:00 committed by GitHub
parent 0531d11434
commit 60cf1dafb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 26 deletions

View File

@ -121,7 +121,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
continue continue
} }
missingPieces, err := checker.getMissingPieces(ctx, pieces) missingPieces, err := checker.overlay.GetMissingPieces(ctx, pieces)
if err != nil { if err != nil {
return Error.New("error getting missing pieces %s", err) return Error.New("error getting missing pieces %s", err)
} }
@ -184,26 +184,6 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
return nil return nil
} }
func (checker *Checker) getMissingPieces(ctx context.Context, pieces []*pb.RemotePiece) (missingPieces []int32, err error) {
var nodeIDs storj.NodeIDList
for _, p := range pieces {
nodeIDs = append(nodeIDs, p.NodeId)
}
badNodeIDs, err := checker.overlay.KnownUnreliableOrOffline(ctx, nodeIDs)
if err != nil {
return nil, Error.New("error getting nodes %s", err)
}
for _, p := range pieces {
for _, nodeID := range badNodeIDs {
if nodeID == p.NodeId {
missingPieces = append(missingPieces, p.GetPieceNum())
}
}
}
return missingPieces, nil
}
// checks for a string in slice // checks for a string in slice
func contains(a []string, x string) bool { func contains(a []string, x string) bool {
for _, n := range a { for _, n := range a {

View File

@ -50,7 +50,7 @@ func (c Config) GetSegmentRepairer(ctx context.Context, tc transport.Client, met
// SegmentRepairer is a repairer for segments // SegmentRepairer is a repairer for segments
type SegmentRepairer interface { type SegmentRepairer interface {
Repair(ctx context.Context, path storj.Path, lostPieces []int32) (err error) Repair(ctx context.Context, path storj.Path) (err error)
} }
// Service contains the information needed to run the repair service // Service contains the information needed to run the repair service
@ -124,7 +124,7 @@ func (service *Service) process(ctx context.Context) error {
} }
service.Limiter.Go(ctx, func() { service.Limiter.Go(ctx, func() {
err := service.repairer.Repair(ctx, seg.GetPath(), seg.GetLostPieces()) err := service.repairer.Repair(ctx, seg.GetPath())
if err != nil { if err != nil {
zap.L().Error("repair failed", zap.Error(err)) zap.L().Error("repair failed", zap.Error(err))
} }

View File

@ -306,3 +306,24 @@ func (cache *Cache) ConnSuccess(ctx context.Context, node *pb.Node) {
zap.L().Debug("error updating node connection info", zap.Error(err)) zap.L().Debug("error updating node connection info", zap.Error(err))
} }
} }
// GetMissingPieces returns the list of offline nodes
func (cache *Cache) GetMissingPieces(ctx context.Context, pieces []*pb.RemotePiece) (missingPieces []int32, err error) {
var nodeIDs storj.NodeIDList
for _, p := range pieces {
nodeIDs = append(nodeIDs, p.NodeId)
}
badNodeIDs, err := cache.KnownUnreliableOrOffline(ctx, nodeIDs)
if err != nil {
return nil, Error.New("error getting nodes %s", err)
}
for _, p := range pieces {
for _, nodeID := range badNodeIDs {
if nodeID == p.NodeId {
missingPieces = append(missingPieces, p.GetPieceNum())
}
}
}
return missingPieces, nil
}

View File

@ -42,7 +42,7 @@ func NewSegmentRepairer(metainfo *metainfo.Service, orders *orders.Service, cach
} }
// Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes // Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes
func (repairer *Repairer) Repair(ctx context.Context, path storj.Path, lostPieces []int32) (err error) { func (repairer *Repairer) Repair(ctx context.Context, path storj.Path) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
// Read the segment pointer from the metainfo // Read the segment pointer from the metainfo
@ -65,7 +65,24 @@ func (repairer *Repairer) Repair(ctx context.Context, path storj.Path, lostPiece
var excludeNodeIDs storj.NodeIDList var excludeNodeIDs storj.NodeIDList
var healthyPieces []*pb.RemotePiece var healthyPieces []*pb.RemotePiece
lostPiecesSet := sliceToSet(lostPieces) pieces := pointer.GetRemote().GetRemotePieces()
missingPieces, err := repairer.cache.GetMissingPieces(ctx, pieces)
if err != nil {
return Error.New("error getting missing pieces %s", err)
}
numHealthy := len(pieces) - len(missingPieces)
// irreparable piece
if int32(numHealthy) < pointer.Remote.Redundancy.MinReq {
return Error.New("piece cannot be repaired")
}
// repair not needed
if (int32(numHealthy) >= pointer.Remote.Redundancy.MinReq) && (int32(numHealthy) > pointer.Remote.Redundancy.RepairThreshold) {
return nil
}
lostPiecesSet := sliceToSet(missingPieces)
// Populate healthyPieces with all pieces from the pointer except those correlating to indices in lostPieces // Populate healthyPieces with all pieces from the pointer except those correlating to indices in lostPieces
for _, piece := range pointer.GetRemote().GetRemotePieces() { for _, piece := range pointer.GetRemote().GetRemotePieces() {

View File

@ -100,7 +100,7 @@ func TestSegmentStoreRepair(t *testing.T) {
repairer := segments.NewSegmentRepairer(metainfo, os, oc, ec, satellite.Identity, time.Minute) repairer := segments.NewSegmentRepairer(metainfo, os, oc, ec, satellite.Identity, time.Minute)
assert.NotNil(t, repairer) assert.NotNil(t, repairer)
err = repairer.Repair(ctx, path, lostPieces) err = repairer.Repair(ctx, path)
assert.NoError(t, err) assert.NoError(t, err)
// kill one of the nodes kept alive to ensure repair worked // kill one of the nodes kept alive to ensure repair worked