satellite/repair/repairer: update audit status as failed after failing piece hash verification (#2997)
* update audit status as failed for nodes that failed piece hash verification
* remove comment
* fix lint error
* add test
* fix format
* use named return value for Get
* add comments
* add more better comment
* format
This commit is contained in:
parent 46ada7b6b1
commit 95aa33c964
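In short, this change makes ECRepairer.Get report which storage nodes failed piece hash verification, and has SegmentRepairer.Repair record those nodes as audit failures in the overlay without letting that bookkeeping interfere with the repair itself. Below is a self-contained toy model of that pattern, not the storj types; every name in it is illustrative:

package main

import (
    "errors"
    "fmt"
)

var errHashMismatch = errors.New("piece hashes don't match")

// get models ECRepairer.Get: it returns downloaded data, the IDs of peers whose
// pieces failed hash verification, and the overall download error. failedNodes
// is populated even when the download as a whole fails.
func get(pieces map[string][]byte, want byte) (data []byte, failedNodes []string, err error) {
    for node, piece := range pieces {
        if len(piece) == 0 || piece[0] != want {
            // gather peers whose piece failed "hash verification"
            failedNodes = append(failedNodes, node)
            continue
        }
        data = append(data, piece...)
    }
    if len(data) == 0 {
        return nil, failedNodes, errHashMismatch
    }
    return data, failedNodes, nil
}

func main() {
    pieces := map[string][]byte{"node-a": {1, 2}, "node-b": {9, 9}}
    data, failed, err := get(pieces, 1)

    // Record audit failures first; a failure here must not abort the repair.
    for _, node := range failed {
        fmt.Println("audit failed:", node)
    }
    if err != nil {
        fmt.Println("download failed:", err)
        return
    }
    fmt.Println("downloaded", len(data), "bytes")
}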
@@ -256,6 +256,11 @@ func TestCorruptDataRepair(t *testing.T) {
         Key: corruptedPiece.Bytes(),
     }
 
+    overlay := planet.Satellites[0].Overlay.Service
+    node, err := overlay.Get(ctx, corruptedNodeID)
+    require.NoError(t, err)
+    corruptedNodeReputation := node.Reputation
+
     // get currently stored piece data from storagenode
     reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
     require.NoError(t, err)
@@ -271,8 +276,9 @@ func TestCorruptDataRepair(t *testing.T) {
     err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
     require.NoError(t, err)
 
-    // corrupt data and write back to storagenode
-    pieceData[0]++ // if we don't do this, this test should fail
+    // corrupt piece data (not PieceHeader) and write back to storagenode
+    // this means repair downloading should fail during piece hash verification
+    pieceData[pieceSize-1]++ // if we don't do this, this test should fail
     writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
     require.NoError(t, err)
 
@@ -291,6 +297,13 @@ func TestCorruptDataRepair(t *testing.T) {
     satellite.Repair.Repairer.Loop.Pause()
     satellite.Repair.Repairer.Limiter.Wait()
 
+    // repair should update audit status as fail
+    node, err = overlay.Get(ctx, corruptedNodeID)
+    require.NoError(t, err)
+    require.Equal(t, corruptedNodeReputation.AuditCount+1, node.Reputation.AuditCount)
+    require.True(t, corruptedNodeReputation.AuditReputationBeta < node.Reputation.AuditReputationBeta)
+    require.True(t, corruptedNodeReputation.AuditReputationAlpha >= node.Reputation.AuditReputationAlpha)
+
     // repair should fail, so segment should contain all the original nodes
     metainfoService := satellite.Metainfo.Service
     pointer, err = metainfoService.Get(ctx, path)
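The new assertions rely on the satellite's alpha/beta audit reputation: a failed audit is expected to raise the node's AuditReputationBeta while leaving AuditReputationAlpha no higher than before. The following is a simplified, self-contained illustration of a beta-style reputation update with that property; the actual formula and its lambda/weight parameters live in the satellite overlay code and may differ:

package main

import "fmt"

// update is a simplified audit reputation step in the spirit of the satellite's
// alpha/beta model: lambda is a forgetting factor, w the audit weight, and v is
// +1 for a successful audit, -1 for a failed one. Illustration only; the
// production formula and parameters may differ.
func update(alpha, beta, lambda, w, v float64) (float64, float64) {
    alpha = lambda*alpha + w*(1+v)/2
    beta = lambda*beta + w*(1-v)/2
    return alpha, beta
}

func main() {
    alpha, beta := 10.0, 0.0 // a node with a clean audit history

    // One failed audit (v = -1): beta grows, alpha does not.
    alpha, beta = update(alpha, beta, 1.0, 1.0, -1)
    fmt.Printf("alpha=%.1f beta=%.1f score=%.3f\n", alpha, beta, alpha/(alpha+beta))
}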
@@ -28,6 +28,9 @@ import (
     "storj.io/storj/uplink/piecestore"
 )
 
+// ErrPieceHashVerifyFailed is the errs class when a piece hash downloaded from storagenode fails to match the original hash.
+var ErrPieceHashVerifyFailed = errs.Class("piece hashes don't match")
+
 // ECRepairer allows the repairer to download, verify, and upload pieces from storagenodes.
 type ECRepairer struct {
     log *zap.Logger
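ErrPieceHashVerifyFailed is a zeebo/errs error class. Wrapping an error in the class lets the download loop later recognize this specific failure with the class's Has check, which is how failed nodes get collected. A minimal sketch of that wrap/check pattern; the class name comes from the diff, the rest is illustrative:

package main

import (
    "fmt"

    "github.com/zeebo/errs"
)

// ErrPieceHashVerifyFailed mirrors the class declared in the diff.
var ErrPieceHashVerifyFailed = errs.Class("piece hashes don't match")

// verify wraps a failure in the class so callers can classify it later.
func verify(ok bool) error {
    if !ok {
        return ErrPieceHashVerifyFailed.Wrap(fmt.Errorf("expected abc, got def"))
    }
    return nil
}

func main() {
    err := verify(false)
    // The repair download loop uses exactly this kind of check to collect failed nodes.
    if ErrPieceHashVerifyFailed.Has(err) {
        fmt.Println("hash verification failed:", err)
    }
}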
@@ -53,17 +56,18 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n *pb.Node) (*piecesto
 // It attempts to download from the minimum required number based on the redundancy scheme.
 // After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
 // If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
-func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, err error) {
+// If piece hash verification fails, it will return all failed node IDs.
+func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedNodes storj.NodeIDList, err error) {
     defer mon.Task()(&ctx)(&err)
 
     if len(limits) != es.TotalCount() {
-        return nil, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
+        return nil, nil, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
     }
 
     nonNilLimits := nonNilCount(limits)
 
     if nonNilLimits < es.RequiredCount() {
-        return nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
+        return nil, nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
     }
 
     pieceSize := eestream.CalcPieceSize(dataSize, es)
@@ -111,7 +115,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
             cond.L.Lock()
             inProgress--
             if err != nil {
-                ec.log.Debug("Failed to download pieces for repair.", zap.Error(err))
+                // gather nodes that failed to match piece hash with the original piece hash
+                if ErrPieceHashVerifyFailed.Has(err) {
+                    failedNodes = append(failedNodes, limit.GetLimit().StorageNodeId)
+                } else {
+                    ec.log.Debug("Failed to download pieces for repair", zap.Error(err))
+                }
                 return
             }
 
@@ -126,12 +135,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
     limiter.Wait()
 
     if successfulPieces < es.RequiredCount() {
-        return nil, Error.New("couldn't download enough pieces, number of successful downloaded pieces (%d) is less than required number (%d)", successfulPieces, es.RequiredCount())
+        return nil, failedNodes, Error.New("couldn't download enough pieces, number of successful downloaded pieces (%d) is less than required number (%d)", successfulPieces, es.RequiredCount())
     }
 
     fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
     if err != nil {
-        return nil, Error.Wrap(err)
+        return nil, failedNodes, Error.Wrap(err)
     }
 
     esScheme := eestream.NewUnsafeRSScheme(fec, es.ErasureShareSize())
@@ -140,7 +149,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
     ctx, cancel := context.WithCancel(ctx)
     decodeReader := eestream.DecodeReaders(ctx, cancel, ec.log.Named("decode readers"), pieceReaders, esScheme, expectedSize, 0, false)
 
-    return decodeReader, nil
+    return decodeReader, failedNodes, nil
 }
 
 // downloadAndVerifyPiece downloads a piece from a storagenode,
@@ -188,7 +197,7 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
     // verify the hashes from storage node
     calculatedHash := pkcrypto.SHA256Hash(pieceBytes)
     if err := verifyPieceHash(ctx, originalLimit, hash, calculatedHash); err != nil {
-        return nil, err
+        return nil, ErrPieceHashVerifyFailed.Wrap(err)
     }
 
     return pieceBytes, nil
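For context, downloadAndVerifyPiece recomputes the hash of the downloaded bytes (pkcrypto.SHA256Hash) and compares it with the hash returned by the storage node; with this change a mismatch is wrapped in ErrPieceHashVerifyFailed. A simplified, stdlib-only sketch of that comparison, with verifyPieceData as a hypothetical stand-in and omitting the signature and order-limit checks that verifyPieceHash also performs:

package main

import (
    "bytes"
    "crypto/sha256"
    "errors"
    "fmt"
)

var errHashMismatch = errors.New("piece hashes don't match")

// verifyPieceData is a simplified stand-in for the repairer's hash check: it
// recomputes SHA-256 over the downloaded bytes and compares it with the hash
// the storage node reported. Signature and order-limit checks are omitted.
func verifyPieceData(pieceBytes, reportedHash []byte) error {
    calculated := sha256.Sum256(pieceBytes)
    if !bytes.Equal(calculated[:], reportedHash) {
        return errHashMismatch
    }
    return nil
}

func main() {
    piece := []byte("piece data")
    good := sha256.Sum256(piece)

    fmt.Println("intact piece:", verifyPieceData(piece, good[:]))

    piece[len(piece)-1]++ // corrupt the last byte, like the updated test does
    fmt.Println("corrupted piece:", verifyPieceData(piece, good[:]))
}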
@@ -67,6 +67,7 @@ func NewSegmentRepairer(
 
 // Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes
 // note that shouldDelete is used even in the case where err is not null
+// note that it will update audit status as failed for nodes that failed piece hash verification during repair downloading
 func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (shouldDelete bool, err error) {
     defer mon.Task()(&ctx, path)(&err)
 
@@ -171,7 +172,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
     }
 
     // Download the segment using just the healthy pieces
-    segmentReader, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, pointer.GetSegmentSize())
+    segmentReader, failedNodeIDs, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, pointer.GetSegmentSize())
+    // update audit status for nodes that failed piece hash verification during downloading
+    failedNum, updateErr := repairer.updateAuditFailStatus(ctx, failedNodeIDs)
+    if updateErr != nil || failedNum > 0 {
+        // failed updates should not affect repair, therefore we will not return the error
+        repairer.log.Debug("failed to update audit fail status", zap.Int("Failed Update Number", failedNum), zap.Error(updateErr))
+    }
     if err != nil {
         // .Get() seems to only fail from input validation, so it would keep failing
         return true, Error.Wrap(err)
@@ -236,6 +243,24 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
     return err == nil, err
 }
 
+func (repairer *SegmentRepairer) updateAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failedNum int, err error) {
+    updateRequests := make([]*overlay.UpdateRequest, len(failedAuditNodeIDs))
+    for i, nodeID := range failedAuditNodeIDs {
+        updateRequests[i] = &overlay.UpdateRequest{
+            NodeID:       nodeID,
+            IsUp:         true,
+            AuditSuccess: false,
+        }
+    }
+    if len(updateRequests) > 0 {
+        failed, err := repairer.overlay.BatchUpdateStats(ctx, updateRequests)
+        if err != nil || len(failed) > 0 {
+            return len(failed), errs.Combine(Error.New("failed to update some audit fail statuses in overlay"), err)
+        }
+    }
+    return 0, nil
+}
+
 // sliceToSet converts the given slice to a set
 func sliceToSet(slice []int32) map[int32]bool {
     set := make(map[int32]bool, len(slice))
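updateAuditFailStatus turns each failed node into an overlay.UpdateRequest marking the node as up but the audit as failed, and submits them in a single BatchUpdateStats call; Repair deliberately logs rather than returns any error from this step. A toy, stdlib-only model of that contract, using simple stand-ins for the overlay types:

package main

import (
    "errors"
    "fmt"
)

// updateRequest is a stand-in for overlay.UpdateRequest.
type updateRequest struct {
    NodeID       string
    IsUp         bool
    AuditSuccess bool
}

// batchUpdateStats is a stand-in for overlay.BatchUpdateStats: it returns the
// IDs it could not update, plus an error describing why.
func batchUpdateStats(reqs []updateRequest) (failed []string, err error) {
    for _, r := range reqs {
        if r.NodeID == "node-b" { // pretend this row could not be written
            failed = append(failed, r.NodeID)
        }
    }
    if len(failed) > 0 {
        err = errors.New("failed to update some audit fail statuses")
    }
    return failed, err
}

// updateAuditFailStatus mirrors the helper added in the diff: one request per
// failed node (node up, audit failed), submitted as a single batch.
func updateAuditFailStatus(failedAuditNodeIDs []string) (failedNum int, err error) {
    updateRequests := make([]updateRequest, len(failedAuditNodeIDs))
    for i, id := range failedAuditNodeIDs {
        updateRequests[i] = updateRequest{NodeID: id, IsUp: true, AuditSuccess: false}
    }
    if len(updateRequests) == 0 {
        return 0, nil
    }
    failed, err := batchUpdateStats(updateRequests)
    return len(failed), err
}

func main() {
    // The caller logs this outcome instead of failing the repair.
    failedNum, err := updateAuditFailStatus([]string{"node-a", "node-b"})
    fmt.Println("updates that did not apply:", failedNum, "err:", err)
}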