From 95aa33c96458b7a7634f1c40440e3b3385bfdefd Mon Sep 17 00:00:00 2001 From: Yingrong Zhao Date: Fri, 13 Sep 2019 12:21:20 -0400 Subject: [PATCH] satellite/repair/repairer: update audit status as failed after failing piece hash verification (#2997) * update audit status as failed for nodes that failed piece hash verification * remove comment * fix lint error * add test * fix format * use named return value for Get * add comments * add more better comment * format --- satellite/repair/repair_test.go | 17 +++++++++++++++-- satellite/repair/repairer/ec.go | 25 +++++++++++++++++-------- satellite/repair/repairer/segments.go | 27 ++++++++++++++++++++++++++- 3 files changed, 58 insertions(+), 11 deletions(-) diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index dcac41988..7152eab62 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -256,6 +256,11 @@ func TestCorruptDataRepair(t *testing.T) { Key: corruptedPiece.Bytes(), } + overlay := planet.Satellites[0].Overlay.Service + node, err := overlay.Get(ctx, corruptedNodeID) + require.NoError(t, err) + corruptedNodeReputation := node.Reputation + // get currently stored piece data from storagenode reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef) require.NoError(t, err) @@ -271,8 +276,9 @@ func TestCorruptDataRepair(t *testing.T) { err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef) require.NoError(t, err) - // corrupt data and write back to storagenode - pieceData[0]++ // if we don't do this, this test should fail + // corrupt piece data (not PieceHeader) and write back to storagenode + // this means repair downloading should fail during piece hash verification + pieceData[pieceSize-1]++ // if we don't do this, this test should fail writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize) require.NoError(t, err) @@ -291,6 +297,13 @@ func TestCorruptDataRepair(t *testing.T) { satellite.Repair.Repairer.Loop.Pause() satellite.Repair.Repairer.Limiter.Wait() + // repair should update audit status as fail + node, err = overlay.Get(ctx, corruptedNodeID) + require.NoError(t, err) + require.Equal(t, corruptedNodeReputation.AuditCount+1, node.Reputation.AuditCount) + require.True(t, corruptedNodeReputation.AuditReputationBeta < node.Reputation.AuditReputationBeta) + require.True(t, corruptedNodeReputation.AuditReputationAlpha >= node.Reputation.AuditReputationAlpha) + // repair should fail, so segment should contain all the original nodes metainfoService := satellite.Metainfo.Service pointer, err = metainfoService.Get(ctx, path) diff --git a/satellite/repair/repairer/ec.go b/satellite/repair/repairer/ec.go index f4c3ed938..f40909b85 100644 --- a/satellite/repair/repairer/ec.go +++ b/satellite/repair/repairer/ec.go @@ -28,6 +28,9 @@ import ( "storj.io/storj/uplink/piecestore" ) +// ErrPieceHashVerifyFailed is the errs class when a piece hash downloaded from storagenode fails to match the original hash. +var ErrPieceHashVerifyFailed = errs.Class("piece hashes don't match") + // ECRepairer allows the repairer to download, verify, and upload pieces from storagenodes. type ECRepairer struct { log *zap.Logger @@ -53,17 +56,18 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n *pb.Node) (*piecesto // It attempts to download from the minimum required number based on the redundancy scheme. // After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece. 
// If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits. -func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, err error) { +// If piece hash verification fails, it will return all failed node IDs. +func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedNodes storj.NodeIDList, err error) { defer mon.Task()(&ctx)(&err) if len(limits) != es.TotalCount() { - return nil, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount()) + return nil, nil, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount()) } nonNilLimits := nonNilCount(limits) if nonNilLimits < es.RequiredCount() { - return nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount()) + return nil, nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount()) } pieceSize := eestream.CalcPieceSize(dataSize, es) @@ -111,7 +115,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cond.L.Lock() inProgress-- if err != nil { - ec.log.Debug("Failed to download pieces for repair.", zap.Error(err)) + // gather nodes that failed to match piece hash with the original piece hash + if ErrPieceHashVerifyFailed.Has(err) { + failedNodes = append(failedNodes, limit.GetLimit().StorageNodeId) + } else { + ec.log.Debug("Failed to download pieces for repair", zap.Error(err)) + } return } @@ -126,12 +135,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, limiter.Wait() if successfulPieces < es.RequiredCount() { - return nil, Error.New("couldn't download enough pieces, number of successful downloaded pieces (%d) is less than required number (%d)", successfulPieces, es.RequiredCount()) + return nil, failedNodes, Error.New("couldn't download enough pieces, number of successful downloaded pieces (%d) is less than required number (%d)", successfulPieces, es.RequiredCount()) } fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount()) if err != nil { - return nil, Error.Wrap(err) + return nil, failedNodes, Error.Wrap(err) } esScheme := eestream.NewUnsafeRSScheme(fec, es.ErasureShareSize()) @@ -140,7 +149,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, ctx, cancel := context.WithCancel(ctx) decodeReader := eestream.DecodeReaders(ctx, cancel, ec.log.Named("decode readers"), pieceReaders, esScheme, expectedSize, 0, false) - return decodeReader, nil + return decodeReader, failedNodes, nil } // downloadAndVerifyPiece downloads a piece from a storagenode, @@ -188,7 +197,7 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr // verify the hashes from storage node calculatedHash := pkcrypto.SHA256Hash(pieceBytes) if err := verifyPieceHash(ctx, originalLimit, hash, calculatedHash); err != nil { - return nil, err + return nil, ErrPieceHashVerifyFailed.Wrap(err) } return pieceBytes, nil diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go index 251d3ed89..93f975f58 100644 --- 
a/satellite/repair/repairer/segments.go +++ b/satellite/repair/repairer/segments.go @@ -67,6 +67,7 @@ func NewSegmentRepairer( // Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes // note that shouldDelete is used even in the case where err is not null +// note that it will update audit status as failed for nodes that failed piece hash verification during repair downloading func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (shouldDelete bool, err error) { defer mon.Task()(&ctx, path)(&err) @@ -171,7 +172,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s } // Download the segment using just the healthy pieces - segmentReader, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, pointer.GetSegmentSize()) + segmentReader, failedNodeIDs, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, pointer.GetSegmentSize()) + // update audit status for nodes that failed piece hash verification during downloading + failedNum, updateErr := repairer.updateAuditFailStatus(ctx, failedNodeIDs) + if updateErr != nil || failedNum > 0 { + // failed updates should not affect repair, therefore we will not return the error + repairer.log.Debug("failed to update audit fail status", zap.Int("Failed Update Number", failedNum), zap.Error(err)) + } if err != nil { // .Get() seems to only fail from input validation, so it would keep failing return true, Error.Wrap(err) @@ -236,6 +243,24 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s return err == nil, err } +func (repairer *SegmentRepairer) updateAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failedNum int, err error) { + updateRequests := make([]*overlay.UpdateRequest, len(failedAuditNodeIDs)) + for i, nodeID := range failedAuditNodeIDs { + updateRequests[i] = &overlay.UpdateRequest{ + NodeID: nodeID, + IsUp: true, + AuditSuccess: false, + } + } + if len(updateRequests) > 0 { + failed, err := repairer.overlay.BatchUpdateStats(ctx, updateRequests) + if err != nil || len(failed) > 0 { + return len(failed), errs.Combine(Error.New("failed to update some audit fail statuses in overlay"), err) + } + } + return 0, nil +} + // sliceToSet converts the given slice to a set func sliceToSet(slice []int32) map[int32]bool { set := make(map[int32]bool, len(slice))
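The pivot of this change is the error classification added to ec.go: downloadAndVerifyPiece wraps a hash mismatch in the ErrPieceHashVerifyFailed class, and Get uses Has to separate those failures (whose node IDs are returned to the caller) from ordinary download errors, which are only logged. Below is a minimal, self-contained sketch of that pattern using github.com/zeebo/errs; the downloadPiece helper and the plain string node IDs are hypothetical stand-ins for downloadAndVerifyPiece and storj.NodeID, not code from this patch.

package main

import (
	"errors"
	"fmt"

	"github.com/zeebo/errs"
)

// A dedicated error class marks hash-verification failures, as in ec.go.
var ErrPieceHashVerifyFailed = errs.Class("piece hashes don't match")

// downloadPiece is a hypothetical stand-in for downloadAndVerifyPiece.
func downloadPiece(corrupt bool) ([]byte, error) {
	if corrupt {
		// Wrap the mismatch error in the class so callers can tell it apart
		// from transport or order-limit errors.
		return nil, ErrPieceHashVerifyFailed.Wrap(errors.New("hash mismatch"))
	}
	return []byte("piece data"), nil
}

func main() {
	var failedNodes []string // ec.Get collects a storj.NodeIDList the same way

	for node, corrupt := range map[string]bool{"node-a": false, "node-b": true} {
		if _, err := downloadPiece(corrupt); err != nil {
			if ErrPieceHashVerifyFailed.Has(err) {
				// Hash mismatch: remember the node so its audit status can be failed.
				failedNodes = append(failedNodes, node)
			} else {
				// Any other failure is only logged, mirroring ec.Get.
				fmt.Println("download failed:", err)
			}
		}
	}

	fmt.Println("nodes failing piece hash verification:", failedNodes)
}

Using a distinct error class (rather than inspecting error strings) is what lets segments.go penalize only the nodes that served bad data, while nodes that were merely unreachable or slow are left alone.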
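The new assertions in repair_test.go (AuditCount increments, AuditReputationBeta strictly increases, AuditReputationAlpha does not increase) follow from the beta-reputation model the overlay applies to audit results via BatchUpdateStats. The sketch below is a hedged reconstruction of that update rule, not the satellite's code; updateReputation, lambda, w, and the starting alpha/beta values are illustrative assumptions, chosen only to show why a failed audit can raise beta while alpha stays the same or drops.

package main

import "fmt"

// updateReputation reconstructs the assumed beta-reputation update: lambda is a
// forgetting factor, w the weight of one audit. A failure adds weight to beta only.
func updateReputation(isSuccess bool, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) {
	v := -1.0
	if isSuccess {
		v = 1.0
	}
	newAlpha = lambda*alpha + w*(1+v)/2
	newBeta = lambda*beta + w*(1-v)/2
	return newAlpha, newBeta
}

func main() {
	// Illustrative parameters; the satellite's configured values may differ.
	const lambda, w = 0.95, 1.0
	alpha, beta := 10.0, 0.5 // a node in good standing

	// A repair download that fails piece hash verification is recorded as a failed audit.
	newAlpha, newBeta := updateReputation(false, alpha, beta, lambda, w)

	fmt.Printf("alpha: %.3f -> %.3f (does not increase)\n", alpha, newAlpha)
	fmt.Printf("beta:  %.3f -> %.3f (strictly increases)\n", beta, newBeta)
}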