satellite/repair: fix flakiness in tests
Several tests using `(*ECRepairer).Get()` have begun to exhibit flaky results. The tests expect to see failures in certain cases, but the failures are not present. The apparent cause is that, sometimes, the fastest good nodes are able to satisfy the repairer (providing RequiredCount pieces) before the repairer can identify the problem scenario we have laid out.

In this commit, we add an argument to `(*ECRepairer).Get()` which specifies how many failure results are expected. In normal/production conditions, this parameter will be 0, meaning Get need not wait for any errors and should report only those that arrived while waiting for RequiredCount pieces (the existing behavior). In these tests, however, we can request that Get() wait for enough results to see the errors we expect.

Refs: https://github.com/storj/storj/issues/5593
Change-Id: I2920edb6b5a344491786aab794d1be6372c07cf8
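To make the race concrete, here is a minimal toy model of the problem and the fix. This is not the real ECRepairer (which coordinates per-piece goroutines under a sync.Cond); `result`, `collect`, and the timings are invented for illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type result struct{ err error }

// collect mimics the new stopping rule: return only once we have both
// requiredCount successes and at least minFailures observed errors.
func collect(results <-chan result, total, requiredCount, minFailures int) (successes, failures int) {
	for i := 0; i < total; i++ {
		r := <-results
		if r.err != nil {
			failures++
		} else {
			successes++
		}
		if successes >= requiredCount && failures >= minFailures {
			return successes, failures
		}
	}
	return successes, failures
}

func main() {
	// Two fast good nodes, one slow bad node, and a repair needing 2 pieces.
	feed := func() <-chan result {
		ch := make(chan result, 3)
		ch <- result{}
		ch <- result{}
		go func() {
			time.Sleep(10 * time.Millisecond)
			ch <- result{err: errors.New("piece corrupted")}
		}()
		return ch
	}

	s, f := collect(feed(), 3, 2, 0)
	fmt.Printf("minFailures=0: %d successes, %d failures (the failure a test expects may be missed)\n", s, f)

	s, f = collect(feed(), 3, 2, 1)
	fmt.Printf("minFailures=1: %d successes, %d failures (the error is reliably observed)\n", s, f)
}
```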
@@ -2480,7 +2480,7 @@ func TestECRepairerGet(t *testing.T) {
 	getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 	require.NoError(t, err)
 
-	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(piecesReport.Offline))
 	require.Equal(t, 0, len(piecesReport.Failed))
@@ -2547,7 +2547,7 @@ func TestECRepairerGetCorrupted(t *testing.T) {
 	getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 	require.NoError(t, err)
 
-	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(piecesReport.Offline))
 	require.Equal(t, 1, len(piecesReport.Failed))
@@ -2616,7 +2616,7 @@ func TestECRepairerGetMissingPiece(t *testing.T) {
 	getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 	require.NoError(t, err)
 
-	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(piecesReport.Offline))
 	require.Equal(t, 1, len(piecesReport.Failed))
@@ -2682,7 +2682,7 @@ func TestECRepairerGetOffline(t *testing.T) {
 	getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 	require.NoError(t, err)
 
-	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 	require.NoError(t, err)
 	require.Equal(t, 1, len(piecesReport.Offline))
 	require.Equal(t, 0, len(piecesReport.Failed))
@@ -2752,7 +2752,7 @@ func TestECRepairerGetUnknown(t *testing.T) {
 	getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 	require.NoError(t, err)
 
-	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(piecesReport.Offline))
 	require.Equal(t, 0, len(piecesReport.Failed))
@@ -2837,7 +2837,7 @@ func TestECRepairerGetFailure(t *testing.T) {
 	getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 	require.NoError(t, err)
 
-	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
 	require.Error(t, err)
 	require.Equal(t, 1, len(piecesReport.Offline))
 	require.Equal(t, 1, len(piecesReport.Failed))
@@ -2898,7 +2898,7 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
 	redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 	require.NoError(t, err)
 
-	readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
+	readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
 	require.NoError(t, err)
 	require.Len(t, pieces.Failed, 0)
 	require.NotNil(t, readCloser)
@@ -2983,7 +2983,7 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 	redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 	require.NoError(t, err)
 
-	readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
+	readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
 	require.NoError(t, err)
 	require.Len(t, pieces.Failed, 0)
 	require.NotNil(t, readCloser)
@@ -67,11 +67,14 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
 }
 
 // Get downloads pieces from storagenodes using the provided order limits, and decodes those pieces into a segment.
-// It attempts to download from the minimum required number based on the redundancy scheme.
+// It attempts to download from the minimum required number based on the redundancy scheme. It will further wait
+// for additional error/failure results up to minFailures, for testing purposes. Under normal conditions,
+// minFailures will be 0.
+//
 // After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
 // If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
 // If piece hash verification fails, it will return all failed node IDs.
-func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ FetchResultReport, err error) {
+func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64, minFailures int) (_ io.ReadCloser, _ FetchResultReport, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	if len(limits) != es.TotalCount() {
@@ -80,12 +83,13 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 
 	nonNilLimits := nonNilCount(limits)
 
-	if nonNilLimits < es.RequiredCount() {
-		return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
+	if nonNilLimits < es.RequiredCount()+minFailures {
+		return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than requested result count (%d)", nonNilCount(limits), es.RequiredCount()+minFailures)
 	}
 
 	pieceSize := eestream.CalcPieceSize(dataSize, es)
 
+	errorCount := 0
 	var successfulPieces, inProgress int
 	unusedLimits := nonNilLimits
 	pieceReaders := make(map[int]io.ReadCloser)
@@ -106,12 +110,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 		defer cond.L.Unlock()
 
 		for {
-			if successfulPieces >= es.RequiredCount() {
-				// already downloaded minimum number of pieces
+			if successfulPieces >= es.RequiredCount() && errorCount >= minFailures {
+				// already downloaded required number of pieces
 				cond.Broadcast()
 				return
 			}
-			if successfulPieces+inProgress+unusedLimits < es.RequiredCount() {
+			if successfulPieces+inProgress+unusedLimits < es.RequiredCount() || errorCount+inProgress+unusedLimits < minFailures {
 				// not enough available limits left to get required number of pieces
 				cond.Broadcast()
 				return
@@ -162,6 +166,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				zap.Stringer("Piece ID", limit.Limit.PieceId),
 				zap.String("reason", err.Error()))
 			pieces.Failed = append(pieces.Failed, PieceFetchResult{Piece: piece, Err: err})
+			errorCount++
 			return
 		}
 
@@ -173,6 +178,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				zap.Stringer("Piece ID", limit.Limit.PieceId),
 				zap.Error(err))
 			pieces.Failed = append(pieces.Failed, PieceFetchResult{Piece: piece, Err: err})
+			errorCount++
 
 		case audit.PieceAuditOffline:
 			ec.log.Debug("Failed to download piece for repair: dial timeout (offline)",
@@ -180,6 +186,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				zap.Stringer("Piece ID", limit.Limit.PieceId),
 				zap.Error(err))
 			pieces.Offline = append(pieces.Offline, PieceFetchResult{Piece: piece, Err: err})
+			errorCount++
 
 		case audit.PieceAuditContained:
 			ec.log.Info("Failed to download piece for repair: download timeout (contained)",
@@ -187,6 +194,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				zap.Stringer("Piece ID", limit.Limit.PieceId),
 				zap.Error(err))
 			pieces.Contained = append(pieces.Contained, PieceFetchResult{Piece: piece, Err: err})
+			errorCount++
 
 		case audit.PieceAuditUnknown:
 			ec.log.Info("Failed to download piece for repair: unknown transport error (skipped)",
@@ -194,6 +202,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				zap.Stringer("Piece ID", limit.Limit.PieceId),
 				zap.Error(err))
 			pieces.Unknown = append(pieces.Unknown, PieceFetchResult{Piece: piece, Err: err})
+			errorCount++
 		}
 
 		return
@@ -216,6 +225,9 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 			piecesRequired: int32(es.RequiredCount()),
 		}
 	}
+	if errorCount < minFailures {
+		return nil, pieces, Error.New("expected %d failures, but only observed %d", minFailures, errorCount)
+	}
 
 	fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
 	if err != nil {
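Taken together, the hunks above change the dispatch loop's two exit conditions. Below is a standalone sketch of just those predicates, extracted into hypothetical helper functions for illustration; the real loop evaluates them inline under a sync.Cond and also manages in-flight downloads and per-piece readers.

```go
package main

import "fmt"

// done: the loop may stop once it has enough pieces to reconstruct the
// segment AND, for tests, enough observed failures (minFailures is 0 in
// production, so the second clause is then vacuous).
func done(successfulPieces, errorCount, requiredCount, minFailures int) bool {
	return successfulPieces >= requiredCount && errorCount >= minFailures
}

// hopeless: the loop must give up if, even assuming every in-flight and
// unused limit resolves the right way, a threshold can no longer be met.
func hopeless(successfulPieces, errorCount, inProgress, unusedLimits, requiredCount, minFailures int) bool {
	return successfulPieces+inProgress+unusedLimits < requiredCount ||
		errorCount+inProgress+unusedLimits < minFailures
}

func main() {
	// requiredCount=2, minFailures=1: two fast successes alone do not stop
	// the loop; it keeps waiting until a failure is also observed.
	fmt.Println(done(2, 0, 2, 1))           // false: no failure seen yet
	fmt.Println(done(2, 1, 2, 1))           // true
	fmt.Println(hopeless(2, 0, 0, 0, 2, 1)) // true: nothing left that could still fail
}
```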
@@ -321,7 +321,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// Download the segment using just the healthy pieces
-	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
 
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
 	mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked
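One consequence of the tightened precondition in the `@@ -80,12 +83,13 @@` hunk: a caller asking for minFailures extra results must also supply that many extra order limits. A small sketch of the arithmetic, with hypothetical numbers (29-of-80 is illustrative, not taken from the tests):

```go
package main

import "fmt"

func main() {
	// With a hypothetical 29-of-80 redundancy scheme, a test passing
	// minFailures=1 must supply at least 30 non-nil order limits,
	// or Get fails fast with the "requested result count" error.
	requiredCount, minFailures := 29, 1
	for _, nonNilLimits := range []int{29, 30} {
		if nonNilLimits < requiredCount+minFailures {
			fmt.Printf("%d limits: rejected (requested result count is %d)\n", nonNilLimits, requiredCount+minFailures)
		} else {
			fmt.Printf("%d limits: accepted\n", nonNilLimits)
		}
	}
}
```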