satellite/repair: fix flaky TestFailedDataRepair and friends
The following tests should be made less flaky by this change:

- TestFailedDataRepair
- TestOfflineNodeDataRepair
- TestUnknownErrorDataRepair
- TestMissingPieceDataRepair_Succeed
- TestMissingPieceDataRepair
- TestCorruptDataRepair_Succeed
- TestCorruptDataRepair_Failed

This follows on to the change in commit 6bb64796. Nearly all tests in the
repair suite used to rely on events happening in a certain order. After some
of our performance work, those events no longer happen in the expected order
every time, which caused much flakiness. The fix in 6bb64796 was sufficient
for the tests operating directly on an `*ECRepairer` instance, but not for the
tests that exercise the repairer by way of the repair queue and the repair
worker. Those tests needed a different way to indicate the number of expected
failures; this change provides that.

Refs: https://github.com/storj/storj/issues/5736
Refs: https://github.com/storj/storj/issues/5718
Refs: https://github.com/storj/storj/issues/5715
Refs: https://github.com/storj/storj/issues/5609

Change-Id: Iddcf5be3a3ace7ad35fddb513ab53dd3f2f0eb0e
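As an illustration (not part of this commit), here is a condensed sketch of how a repair-queue test can express expected failures after this change. The testplanet setup, the upload, and the node-breaking steps are elided; only TestingSetMinFailures and the checker-loop calls are taken from the diff, everything else is assumed boilerplate:

    package repair_test

    import (
    	"testing"

    	"storj.io/common/testcontext"
    	"storj.io/storj/private/testplanet"
    )

    func TestExpectOneFailureSketch(t *testing.T) {
    	testplanet.Run(t, testplanet.Config{
    		SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
    	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
    		satellite := planet.Satellites[0]

    		// ... upload a segment and make exactly one of its pieces fail ...

    		// Instead of passing minFailures to ECRepairer.Get (only possible
    		// when the test holds an *ECRepairer directly), tell the repairer
    		// service how many failures to expect and wait for.
    		satellite.Repair.Repairer.TestingSetMinFailures(1)

    		// Queue the segment for repair; the repair worker then runs with
    		// the expected-failure count already set.
    		satellite.Repair.Checker.Loop.Restart()
    		satellite.Repair.Checker.Loop.TriggerWait()
    		satellite.Repair.Checker.Loop.Pause()
    	})
    }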
parent bafd3826f0
commit 9e6955cc17
@@ -549,6 +549,7 @@ func TestFailedDataRepair(t *testing.T) {
 			nodesReputation[piece.StorageNode] = *info
 		}
 
+		satellite.Repair.Repairer.TestingSetMinFailures(2) // expecting one erroring node, one offline node
 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Checker.Loop.Pause()
@@ -663,6 +664,7 @@ func TestOfflineNodeDataRepair(t *testing.T) {
 			nodesReputation[piece.StorageNode] = *info
 		}
 
+		satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one offline node
 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Checker.Loop.Pause()
@@ -787,6 +789,7 @@ func TestUnknownErrorDataRepair(t *testing.T) {
 			nodesReputation[piece.StorageNode] = *info
 		}
 
+		satellite.Repair.Repairer.TestingSetMinFailures(1) // expecting one bad node
 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Checker.Loop.Pause()
@@ -907,6 +910,7 @@ func TestMissingPieceDataRepair_Succeed(t *testing.T) {
 			nodesReputation[piece.StorageNode] = *info
 		}
 
+		satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one node to have a missing piece
 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Checker.Loop.Pause()
@@ -1029,6 +1033,7 @@ func TestMissingPieceDataRepair(t *testing.T) {
 			successful = pieces.Successful
 		}
 
+		satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one missing piece
 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Checker.Loop.Pause()
@@ -1140,6 +1145,7 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
 			nodesReputation[piece.StorageNode] = *info
 		}
 
+		satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one node with bad data
 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Checker.Loop.Pause()
@@ -1261,6 +1267,7 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
 			successful = report.Successful
 		}
 
+		satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one corrupted piece
 		satellite.Repair.Checker.Loop.Restart()
 		satellite.Repair.Checker.Loop.TriggerWait()
 		satellite.Repair.Checker.Loop.Pause()
@@ -2480,7 +2487,7 @@ func TestECRepairerGet(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 0, len(piecesReport.Failed))
@@ -2547,7 +2554,8 @@ func TestECRepairerGetCorrupted(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
+		ecRepairer.TestingSetMinFailures(1)
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 1, len(piecesReport.Failed))
@@ -2616,7 +2624,8 @@ func TestECRepairerGetMissingPiece(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
+		ecRepairer.TestingSetMinFailures(1)
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 1, len(piecesReport.Failed))
@@ -2682,7 +2691,8 @@ func TestECRepairerGetOffline(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
+		ecRepairer.TestingSetMinFailures(1)
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Equal(t, 1, len(piecesReport.Offline))
 		require.Equal(t, 0, len(piecesReport.Failed))
@@ -2752,7 +2762,8 @@ func TestECRepairerGetUnknown(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
+		ecRepairer.TestingSetMinFailures(1)
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 0, len(piecesReport.Failed))
@@ -2837,7 +2848,7 @@ func TestECRepairerGetFailure(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 		require.Error(t, err)
 		require.Equal(t, 1, len(piecesReport.Offline))
 		require.Equal(t, 1, len(piecesReport.Failed))
@@ -2898,7 +2909,7 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
 		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 		require.NoError(t, err)
 
-		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
+		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Len(t, pieces.Failed, 0)
 		require.NotNil(t, readCloser)
@@ -2983,7 +2994,7 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 		require.NoError(t, err)
 
-		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
+		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Len(t, pieces.Failed, 0)
 		require.NotNil(t, readCloser)
@@ -48,6 +48,9 @@ type ECRepairer struct {
 	satelliteSignee signing.Signee
 	downloadTimeout time.Duration
 	inmemory bool
+
+	// used only in tests, where we expect failures and want to wait for them
+	minFailures int
 }
 
 // NewECRepairer creates a new repairer for interfacing with storagenodes.
@@ -66,6 +69,12 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
 	return client, ErrDialFailed.Wrap(err)
 }
 
+// TestingSetMinFailures sets the minFailures attribute, which tells the Repair machinery that we _expect_
+// there to be failures and that we should wait for them if necessary. This is only used in tests.
+func (ec *ECRepairer) TestingSetMinFailures(minFailures int) {
+	ec.minFailures = minFailures
+}
+
 // Get downloads pieces from storagenodes using the provided order limits, and decodes those pieces into a segment.
 // It attempts to download from the minimum required number based on the redundancy scheme. It will further wait
 // for additional error/failure results up to minFailures, for testing purposes. Under normal conditions,
@@ -74,7 +83,7 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
 // After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
 // If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
 // If piece hash verification fails, it will return all failed node IDs.
-func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64, minFailures int) (_ io.ReadCloser, _ FetchResultReport, err error) {
+func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ FetchResultReport, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	if len(limits) != es.TotalCount() {
@@ -83,8 +92,8 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 
 	nonNilLimits := nonNilCount(limits)
 
-	if nonNilLimits < es.RequiredCount()+minFailures {
-		return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than requested result count (%d)", nonNilCount(limits), es.RequiredCount()+minFailures)
+	if nonNilLimits < es.RequiredCount()+ec.minFailures {
+		return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than requested result count (%d)", nonNilCount(limits), es.RequiredCount()+ec.minFailures)
 	}
 
 	mon.IntVal("ECRepairer_Get_nonNilLimits").Observe(int64(nonNilLimits))
@@ -112,20 +121,20 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 		defer cond.L.Unlock()
 
 		for {
-			if successfulPieces >= es.RequiredCount() && errorCount >= minFailures {
+			if successfulPieces >= es.RequiredCount() && errorCount >= ec.minFailures {
 				// already downloaded required number of pieces
 				cond.Broadcast()
 				return
 			}
-			if successfulPieces+inProgress+unusedLimits < es.RequiredCount() || errorCount+inProgress+unusedLimits < minFailures {
+			if successfulPieces+inProgress+unusedLimits < es.RequiredCount() || errorCount+inProgress+unusedLimits < ec.minFailures {
 				// not enough available limits left to get required number of pieces
 				cond.Broadcast()
 				return
 			}
 
-			if successfulPieces+inProgress >= es.RequiredCount() && errorCount+inProgress >= minFailures {
+			if successfulPieces+inProgress >= es.RequiredCount() && errorCount+inProgress >= ec.minFailures {
 				// we know that inProgress > 0 here, since we didn't return on the
-				// "successfulPieces >= es.RequiredCount() && errorCount >= minFailures" check earlier.
+				// "successfulPieces >= es.RequiredCount() && errorCount >= ec.minFailures" check earlier.
 				// There may be enough downloads in progress to meet all of our needs, so we won't
 				// start any more immediately. Instead, wait until all needs are met (in which case
 				// cond.Broadcast() will be called) or until one of the inProgress workers exits
@@ -234,8 +243,8 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 			piecesRequired: int32(es.RequiredCount()),
 		}
 	}
-	if errorCount < minFailures {
-		return nil, pieces, Error.New("expected %d failures, but only observed %d", minFailures, errorCount)
+	if errorCount < ec.minFailures {
+		return nil, pieces, Error.New("expected %d failures, but only observed %d", ec.minFailures, errorCount)
 	}
 
 	fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
@@ -83,6 +83,12 @@ func (service *Service) WaitForPendingRepairs() {
 	service.JobLimiter.Release(int64(service.config.MaxRepair))
 }
 
+// TestingSetMinFailures sets the minFailures attribute, which tells the Repair machinery that we _expect_
+// there to be failures and that we should wait for them if necessary. This is only used in tests.
+func (service *Service) TestingSetMinFailures(minFailures int) {
+	service.repairer.ec.TestingSetMinFailures(minFailures)
+}
+
 // Run runs the repairer service.
 func (service *Service) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -321,7 +321,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// Download the segment using just the healthy pieces
-	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
+	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
 	mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked
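For readers unfamiliar with the wait logic in the Get hunks above, here is a minimal, self-contained model of the gating condition that Get now derives from ec.minFailures. This is a sketch with assumed names, not storj code: the collector only finishes once both the required number of successes and the expected number of failures have been observed.

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	const required, minFailures = 3, 1

    	var mu sync.Mutex
    	cond := sync.NewCond(&mu)
    	successes, failures := 0, 0

    	report := func(ok bool) {
    		mu.Lock()
    		defer mu.Unlock()
    		if ok {
    			successes++
    		} else {
    			failures++
    		}
    		cond.Broadcast()
    	}

    	// Simulated piece downloads: three succeed, one fails.
    	for _, ok := range []bool{true, false, true, true} {
    		go report(ok)
    	}

    	mu.Lock()
    	// Same shape as the check in ECRepairer.Get: wait until there are enough
    	// successful pieces AND at least the expected number of failures.
    	for !(successes >= required && failures >= minFailures) {
    		cond.Wait()
    	}
    	mu.Unlock()
    	fmt.Printf("done: %d successes, %d failures\n", successes, failures)
    }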