satellite/repair: delete pieces that failed piece hashes verification from pointer (#3051)
* add test
* add implementation
* remove todo comments
* modify comment
* fix linting
* fix typo
parent 7ada5d4152
commit b37ea864b1
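In short: ECRepairer.Get now returns the pieces that failed piece hash verification during the repair download as []*pb.RemotePiece rather than just a storj.NodeIDList, and SegmentRepairer.Repair uses that both ways: it still derives the node IDs to record audit failures, and it appends the failed pieces themselves to the removal list passed to metainfo.UpdatePieces, so the bad pieces are deleted from the segment pointer. The corruption test is split into TestCorruptDataRepair_Failed and a new TestCorruptDataRepair_Succeed, with the piece-corruption code factored into a shared corruptPieceData helper. The sketch below is a simplified, self-contained illustration of the node-ID bookkeeping only; remotePiece and failedNodeIDsOf are stand-in names, not the satellite's actual types or functions.

package main

import "fmt"

// remotePiece is a stand-in for pb.RemotePiece: a piece number plus the ID of the node storing it.
type remotePiece struct {
	PieceNum int32
	NodeID   string
}

// failedNodeIDsOf mirrors the small loop added to SegmentRepairer.Repair:
// audit failures are still recorded per node, so the repairer pulls the node
// IDs back out of the pieces that failed hash verification during download.
func failedNodeIDsOf(failedPieces []remotePiece) []string {
	var ids []string
	for _, piece := range failedPieces {
		ids = append(ids, piece.NodeID)
	}
	return ids
}

func main() {
	failed := []remotePiece{{PieceNum: 1, NodeID: "node-b"}, {PieceNum: 4, NodeID: "node-e"}}
	fmt.Println(failedNodeIDsOf(failed)) // prints [node-b node-e]
}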
@@ -164,14 +164,14 @@ func TestDataRepair(t *testing.T) {
 	})
 }

-// TestCorruptDataRepair does the following:
+// TestCorruptDataRepair_Failed does the following:
 // - Uploads test data
 // - Kills all but the minimum number of nodes carrying the uploaded segment
 // - On one of the remaining nodes, corrupt the piece data being stored by that node
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 //   the numbers of nodes determined by the upload repair max threshold
 // - Expects that the repair failed and the pointer was not updated
-func TestCorruptDataRepair(t *testing.T) {
+func TestCorruptDataRepair_Failed(t *testing.T) {
 	const RepairMaxExcessRateOptimalThreshold = 0.05

 	testplanet.Run(t, testplanet.Config{
@@ -222,23 +222,23 @@ func TestCorruptDataRepair(t *testing.T) {

 		var corruptedNode *storagenode.Peer
 		var corruptedNodeID storj.NodeID
-		var corruptedPiece storj.PieceID
+		var corruptedPieceID storj.PieceID

 		for i, piece := range remotePieces {
 			originalNodes[piece.NodeId] = true
 			if i >= toKill {
 				// this means the node will be kept alive for repair
 				// choose a node and pieceID to corrupt so repair fails
-				if corruptedNodeID.IsZero() || corruptedPiece.IsZero() {
+				if corruptedNodeID.IsZero() || corruptedPieceID.IsZero() {
 					corruptedNodeID = piece.NodeId
-					corruptedPiece = pointer.GetRemote().RootPieceId.Derive(corruptedNodeID, piece.PieceNum)
+					corruptedPieceID = pointer.GetRemote().RootPieceId.Derive(corruptedNodeID, piece.PieceNum)
 				}
 				continue
 			}
 			nodesToKill[piece.NodeId] = true
 		}
 		require.NotNil(t, corruptedNodeID)
-		require.NotNil(t, corruptedPiece)
+		require.NotNil(t, corruptedPieceID)

 		for _, node := range planet.StorageNodes {
 			if node.ID() == corruptedNodeID {
@@ -253,43 +253,12 @@ func TestCorruptDataRepair(t *testing.T) {
 		}
 		require.NotNil(t, corruptedNode)

-		blobRef := storage.BlobRef{
-			Namespace: satellite.ID().Bytes(),
-			Key:       corruptedPiece.Bytes(),
-		}
-
 		overlay := planet.Satellites[0].Overlay.Service
 		node, err := overlay.Get(ctx, corruptedNodeID)
 		require.NoError(t, err)
 		corruptedNodeReputation := node.Reputation

-		// get currently stored piece data from storagenode
-		reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
-		require.NoError(t, err)
-		pieceSize, err := reader.Size()
-		require.NoError(t, err)
-		require.True(t, pieceSize > 0)
-		pieceData := make([]byte, pieceSize)
-		n, err := io.ReadFull(reader, pieceData)
-		require.NoError(t, err)
-		require.EqualValues(t, n, pieceSize)
-
-		// delete piece data
-		err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
-		require.NoError(t, err)
-
-		// corrupt piece data (not PieceHeader) and write back to storagenode
-		// this means repair downloading should fail during piece hash verification
-		pieceData[pieceSize-1]++ // if we don't do this, this test should fail
-		writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
-		require.NoError(t, err)
-
-		n, err = writer.Write(pieceData)
-		require.NoError(t, err)
-		require.EqualValues(t, n, pieceSize)
-
-		err = writer.Commit(ctx)
-		require.NoError(t, err)
+		corruptPieceData(ctx, t, planet, corruptedNode, corruptedPieceID)

 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
@@ -318,6 +287,130 @@ func TestCorruptDataRepair(t *testing.T) {
 	})
 }

+// TestCorruptDataRepair_Succeed does the following:
+// - Uploads test data
+// - Kills some nodes carrying the uploaded segment but keeps it above the minimum requirement
+// - On one of the remaining nodes, corrupt the piece data being stored by that node
+// - Triggers data repair, which attempts to repair the data from the remaining nodes to
+//   the numbers of nodes determined by the upload repair max threshold
+// - Expects that the repair succeeds and that the pointer no longer contains the corrupted piece
+func TestCorruptDataRepair_Succeed(t *testing.T) {
+	const RepairMaxExcessRateOptimalThreshold = 0.05
+
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 14,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Overlay.Node.OnlineWindow = 0
+				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop discovery service so that we do not get a race condition when we delete nodes from overlay
+		satellite.Discovery.Service.Discovery.Stop()
+		satellite.Discovery.Service.Refresh.Stop()
+		// stop audit to prevent possible interactions i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		// first, upload some remote data
+		err := uplinkPeer.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
+			MinThreshold:     3,
+			RepairThreshold:  5,
+			SuccessThreshold: 7,
+			MaxThreshold:     9,
+		}, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		pointer, path := getRemoteSegment(t, ctx, satellite)
+
+		// calculate how many storagenodes to kill
+		redundancy := pointer.GetRemote().GetRedundancy()
+		remotePieces := pointer.GetRemote().GetRemotePieces()
+		numPieces := len(remotePieces)
+		toKill := numPieces - int(redundancy.RepairThreshold)
+		require.True(t, toKill >= 1)
+
+		// kill nodes and track lost pieces
+		nodesToKill := make(map[storj.NodeID]bool)
+		originalNodes := make(map[storj.NodeID]bool)
+
+		var corruptedNode *storagenode.Peer
+		var corruptedNodeID storj.NodeID
+		var corruptedPieceID storj.PieceID
+		var corruptedPiece *pb.RemotePiece
+
+		for i, piece := range remotePieces {
+			originalNodes[piece.NodeId] = true
+			if i >= toKill {
+				// this means the node will be kept alive for repair
+				// choose a node and pieceID to corrupt so the repair download fails its piece hash verification
+				if corruptedNodeID.IsZero() || corruptedPieceID.IsZero() {
+					corruptedNodeID = piece.NodeId
+					corruptedPieceID = pointer.GetRemote().RootPieceId.Derive(corruptedNodeID, piece.PieceNum)
+					corruptedPiece = piece
+				}
+				continue
+			}
+			nodesToKill[piece.NodeId] = true
+		}
+		require.NotNil(t, corruptedNodeID)
+		require.NotNil(t, corruptedPieceID)
+
+		for _, node := range planet.StorageNodes {
+			if node.ID() == corruptedNodeID {
+				corruptedNode = node
+			}
+			if nodesToKill[node.ID()] {
+				err = planet.StopPeer(node)
+				require.NoError(t, err)
+				_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
+				require.NoError(t, err)
+			}
+		}
+		require.NotNil(t, corruptedNode)
+
+		corruptPieceData(ctx, t, planet, corruptedNode, corruptedPieceID)
+
+		overlay := planet.Satellites[0].Overlay.Service
+		node, err := overlay.Get(ctx, corruptedNodeID)
+		require.NoError(t, err)
+		corruptedNodeReputation := node.Reputation
+
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.Limiter.Wait()
+
+		// repair should update audit status as fail
+		node, err = overlay.Get(ctx, corruptedNodeID)
+		require.NoError(t, err)
+		require.Equal(t, corruptedNodeReputation.AuditCount+1, node.Reputation.AuditCount)
+		require.True(t, corruptedNodeReputation.AuditReputationBeta < node.Reputation.AuditReputationBeta)
+		require.True(t, corruptedNodeReputation.AuditReputationAlpha >= node.Reputation.AuditReputationAlpha)
+
+		// get the new pointer
+		metainfoService := satellite.Metainfo.Service
+		pointer, err = metainfoService.Get(ctx, path)
+		require.NoError(t, err)
+
+		remotePieces = pointer.GetRemote().GetRemotePieces()
+		for _, piece := range remotePieces {
+			require.NotEqual(t, piece.PieceNum, corruptedPiece.PieceNum, "there should be no corrupted piece in pointer")
+		}
+	})
+}
+
 // TestRemoveIrreparableSegmentFromQueue
 // - Upload tests data to 7 nodes
 // - Kill nodes so that repair threshold > online nodes > minimum threshold
@@ -708,3 +801,41 @@ func stopNodeByID(t *testing.T, ctx context.Context, planet *testplanet.Planet,
 		}
 	}
 }
+
+// corruptPieceData manipulates piece data on a storage node.
+func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *storagenode.Peer, corruptedPieceID storj.PieceID) {
+	t.Helper()
+
+	blobRef := storage.BlobRef{
+		Namespace: planet.Satellites[0].ID().Bytes(),
+		Key:       corruptedPieceID.Bytes(),
+	}
+
+	// get currently stored piece data from storagenode
+	reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
+	require.NoError(t, err)
+	pieceSize, err := reader.Size()
+	require.NoError(t, err)
+	require.True(t, pieceSize > 0)
+	pieceData := make([]byte, pieceSize)
+	n, err := io.ReadFull(reader, pieceData)
+	require.NoError(t, err)
+	require.EqualValues(t, n, pieceSize)
+
+	// delete piece data
+	err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
+	require.NoError(t, err)
+
+	// corrupt piece data (not PieceHeader) and write back to storagenode
+	// this means repair downloading should fail during piece hash verification
+	pieceData[pieceSize-1]++ // corrupt the last byte; without this the repair download would succeed and the test would fail
+	writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
+	require.NoError(t, err)
+
+	n, err = writer.Write(pieceData)
+	require.NoError(t, err)
+	require.EqualValues(t, n, pieceSize)
+
+	err = writer.Commit(ctx)
+	require.NoError(t, err)
+}
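Both tests exercise this helper in the same way, on the one surviving node picked for corruption, as shown in the hunks above:

	corruptPieceData(ctx, t, planet, corruptedNode, corruptedPieceID)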
@@ -57,7 +57,7 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n *pb.Node) (*piecesto
 // After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
 // If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
 // If piece hash verification fails, it will return all failed node IDs.
-func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedNodes storj.NodeIDList, err error) {
+func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedPieces []*pb.RemotePiece, err error) {
 	defer mon.Task()(&ctx)(&err)

 	if len(limits) != es.TotalCount() {
@@ -115,9 +115,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 			cond.L.Lock()
 			inProgress--
 			if err != nil {
-				// gather nodes that failed to match piece hash with the original piece hash
+				// gather nodes where the calculated piece hash doesn't match the uplink signed piece hash
 				if ErrPieceHashVerifyFailed.Has(err) {
-					failedNodes = append(failedNodes, limit.GetLimit().StorageNodeId)
+					failedPieces = append(failedPieces, &pb.RemotePiece{
+						PieceNum: int32(currentLimitIndex),
+						NodeId:   limit.GetLimit().StorageNodeId,
+					})
 				} else {
 					ec.log.Debug("Failed to download pieces for repair", zap.Error(err))
 				}
@@ -135,12 +138,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 	limiter.Wait()

 	if successfulPieces < es.RequiredCount() {
-		return nil, failedNodes, Error.New("couldn't download enough pieces, number of successful downloaded pieces (%d) is less than required number (%d)", successfulPieces, es.RequiredCount())
+		return nil, failedPieces, Error.New("couldn't download enough pieces, number of successful downloaded pieces (%d) is less than required number (%d)", successfulPieces, es.RequiredCount())
 	}

 	fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
 	if err != nil {
-		return nil, failedNodes, Error.Wrap(err)
+		return nil, failedPieces, Error.Wrap(err)
 	}

 	esScheme := eestream.NewUnsafeRSScheme(fec, es.ErasureShareSize())
@@ -149,7 +152,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 	ctx, cancel := context.WithCancel(ctx)
 	decodeReader := eestream.DecodeReaders(ctx, cancel, ec.log.Named("decode readers"), pieceReaders, esScheme, expectedSize, 0, false)

-	return decodeReader, failedNodes, nil
+	return decodeReader, failedPieces, nil
 }

 // downloadAndVerifyPiece downloads a piece from a storagenode,
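For context, downloadAndVerifyPiece is where ErrPieceHashVerifyFailed originates: the repairer hashes the bytes it downloaded and compares the result against the piece hash signed by the uplink when the piece was stored. The sketch below is a minimal, self-contained stand-in for that check; verifyPieceHash and errPieceHashVerifyFailed are hypothetical names, and plain SHA-256 stands in for the actual hashing and signature verification.

package main

import (
	"bytes"
	"crypto/sha256"
	"errors"
	"fmt"
)

// errPieceHashVerifyFailed stands in for the repairer's ErrPieceHashVerifyFailed error class.
var errPieceHashVerifyFailed = errors.New("piece hashes don't match")

// verifyPieceHash is a simplified stand-in for the post-download check:
// hash the downloaded payload and compare it to the hash recorded (and
// signed by the uplink) when the piece was originally uploaded.
func verifyPieceHash(downloaded, signedHash []byte) error {
	computed := sha256.Sum256(downloaded)
	if !bytes.Equal(computed[:], signedHash) {
		return errPieceHashVerifyFailed
	}
	return nil
}

func main() {
	piece := []byte("piece payload")
	original := sha256.Sum256(piece)

	piece[len(piece)-1]++ // the same single-byte corruption the test helper applies
	fmt.Println(verifyPieceHash(piece, original[:])) // prints: piece hashes don't match
}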
@@ -172,7 +172,14 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
 	}

 	// Download the segment using just the healthy pieces
-	segmentReader, failedNodeIDs, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, pointer.GetSegmentSize())
+	segmentReader, failedPieces, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, pointer.GetSegmentSize())
+
+	// Populate node IDs that failed piece hash verification
+	var failedNodeIDs storj.NodeIDList
+	for _, piece := range failedPieces {
+		failedNodeIDs = append(failedNodeIDs, piece.NodeId)
+	}
+
 	// update audit status for nodes that failed piece hash verification during downloading
 	failedNum, updateErr := repairer.updateAuditFailStatus(ctx, failedNodeIDs)
 	if updateErr != nil || failedNum > 0 {
@@ -238,6 +245,9 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
 		}
 	}

+	// add pieces that failed piece hash verification to the removal list
+	toRemove = append(toRemove, failedPieces...)
+
 	// Update the segment pointer in the metainfo
 	_, err = repairer.metainfo.UpdatePieces(ctx, path, pointer, repairedPieces, toRemove)
 	return err == nil, err
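The pointer update is what finally deletes the bad pieces: metainfo.UpdatePieces receives both the freshly repaired pieces to add and the toRemove list, which now includes every piece that failed piece hash verification; that is why the _Succeed test above can assert that no remaining piece carries the corrupted piece's PieceNum. The sketch below is a hypothetical illustration of those add/remove semantics; applyPieceUpdate and the remotePiece stand-in are not the metainfo service's actual implementation.

package main

import "fmt"

// remotePiece is the same stand-in for pb.RemotePiece used in the sketch near the top of this page.
type remotePiece struct {
	PieceNum int32
	NodeID   string
}

// applyPieceUpdate illustrates the assumed add/remove semantics: every piece
// whose piece number appears in toRemove is dropped from the pointer's piece
// list, and the repaired pieces are appended in its place.
func applyPieceUpdate(current, repaired, toRemove []remotePiece) []remotePiece {
	removed := make(map[int32]bool)
	for _, piece := range toRemove {
		removed[piece.PieceNum] = true
	}
	var updated []remotePiece
	for _, piece := range current {
		if !removed[piece.PieceNum] {
			updated = append(updated, piece)
		}
	}
	return append(updated, repaired...)
}

func main() {
	current := []remotePiece{{0, "node-a"}, {1, "node-b"}, {2, "node-c"}}
	repaired := []remotePiece{{3, "node-d"}}
	toRemove := []remotePiece{{1, "node-b"}} // piece 1 failed hash verification during the repair download
	fmt.Println(applyPieceUpdate(current, repaired, toRemove)) // prints [{0 node-a} {2 node-c} {3 node-d}]
}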