satellite/repair: fix flakiness in tests

Several tests using `(*ECRepairer).Get()` have begun to exhibit flaky
results. The tests expect to see failures in certain cases, but the
failures do not always appear. The apparent cause is that, sometimes,
the fastest good nodes satisfy the repairer (providing RequiredCount
pieces) before it has a chance to observe the problem scenario we have
laid out.

In this commit, we add a minFailures argument to `(*ECRepairer).Get()`
that specifies how many failure results are expected. In
normal/production conditions, this parameter will be 0, meaning Get
need not wait for any errors and only reports those that arrive while
waiting for RequiredCount pieces (the existing behavior). In these
tests, however, we can ask Get() to wait for enough results to observe
the errors we are expecting.

Refs: https://github.com/storj/storj/issues/5593
Change-Id: I2920edb6b5a344491786aab794d1be6372c07cf8
Author: paul cannon
Date: 2023-02-15 15:47:47 -06:00
Committed by: Storj Robot
Parent: ab2e793555
Commit: 6bb6479690
3 changed files with 28 additions and 16 deletions


@@ -2480,7 +2480,7 @@ func TestECRepairerGet(t *testing.T) {
getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
-_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
require.NoError(t, err)
require.Equal(t, 0, len(piecesReport.Offline))
require.Equal(t, 0, len(piecesReport.Failed))
@@ -2547,7 +2547,7 @@ func TestECRepairerGetCorrupted(t *testing.T) {
getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
-_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
require.NoError(t, err)
require.Equal(t, 0, len(piecesReport.Offline))
require.Equal(t, 1, len(piecesReport.Failed))
@@ -2616,7 +2616,7 @@ func TestECRepairerGetMissingPiece(t *testing.T) {
getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
-_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
require.NoError(t, err)
require.Equal(t, 0, len(piecesReport.Offline))
require.Equal(t, 1, len(piecesReport.Failed))
@@ -2682,7 +2682,7 @@ func TestECRepairerGetOffline(t *testing.T) {
getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
-_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
require.NoError(t, err)
require.Equal(t, 1, len(piecesReport.Offline))
require.Equal(t, 0, len(piecesReport.Failed))
@@ -2752,7 +2752,7 @@ func TestECRepairerGetUnknown(t *testing.T) {
getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
-_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
require.NoError(t, err)
require.Equal(t, 0, len(piecesReport.Offline))
require.Equal(t, 0, len(piecesReport.Failed))
@@ -2837,7 +2837,7 @@ func TestECRepairerGetFailure(t *testing.T) {
getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
-_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
require.Error(t, err)
require.Equal(t, 1, len(piecesReport.Offline))
require.Equal(t, 1, len(piecesReport.Failed))
@@ -2898,7 +2898,7 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
require.NoError(t, err)
-readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
+readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
require.NoError(t, err)
require.Len(t, pieces.Failed, 0)
require.NotNil(t, readCloser)
@@ -2983,7 +2983,7 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
require.NoError(t, err)
-readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
+readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
require.NoError(t, err)
require.Len(t, pieces.Failed, 0)
require.NotNil(t, readCloser)


@@ -67,11 +67,14 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
}
// Get downloads pieces from storagenodes using the provided order limits, and decodes those pieces into a segment.
-// It attempts to download from the minimum required number based on the redundancy scheme.
+// It attempts to download from the minimum required number based on the redundancy scheme. It will further wait
+// for additional error/failure results up to minFailures, for testing purposes. Under normal conditions,
+// minFailures will be 0.
//
// After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
// If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
// If piece hash verification fails, it will return all failed node IDs.
-func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ FetchResultReport, err error) {
+func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64, minFailures int) (_ io.ReadCloser, _ FetchResultReport, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != es.TotalCount() {
@@ -80,12 +83,13 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
nonNilLimits := nonNilCount(limits)
-if nonNilLimits < es.RequiredCount() {
-return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
+if nonNilLimits < es.RequiredCount()+minFailures {
+return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than requested result count (%d)", nonNilCount(limits), es.RequiredCount()+minFailures)
}
pieceSize := eestream.CalcPieceSize(dataSize, es)
+errorCount := 0
var successfulPieces, inProgress int
unusedLimits := nonNilLimits
pieceReaders := make(map[int]io.ReadCloser)
@@ -106,12 +110,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
defer cond.L.Unlock()
for {
-if successfulPieces >= es.RequiredCount() {
-// already downloaded minimum number of pieces
+if successfulPieces >= es.RequiredCount() && errorCount >= minFailures {
+// already downloaded required number of pieces
cond.Broadcast()
return
}
-if successfulPieces+inProgress+unusedLimits < es.RequiredCount() {
+if successfulPieces+inProgress+unusedLimits < es.RequiredCount() || errorCount+inProgress+unusedLimits < minFailures {
// not enough available limits left to get required number of pieces
cond.Broadcast()
return
@@ -162,6 +166,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
zap.Stringer("Piece ID", limit.Limit.PieceId),
zap.String("reason", err.Error()))
pieces.Failed = append(pieces.Failed, PieceFetchResult{Piece: piece, Err: err})
+errorCount++
return
}
@@ -173,6 +178,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
zap.Stringer("Piece ID", limit.Limit.PieceId),
zap.Error(err))
pieces.Failed = append(pieces.Failed, PieceFetchResult{Piece: piece, Err: err})
+errorCount++
case audit.PieceAuditOffline:
ec.log.Debug("Failed to download piece for repair: dial timeout (offline)",
@@ -180,6 +186,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
zap.Stringer("Piece ID", limit.Limit.PieceId),
zap.Error(err))
pieces.Offline = append(pieces.Offline, PieceFetchResult{Piece: piece, Err: err})
+errorCount++
case audit.PieceAuditContained:
ec.log.Info("Failed to download piece for repair: download timeout (contained)",
@@ -187,6 +194,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
zap.Stringer("Piece ID", limit.Limit.PieceId),
zap.Error(err))
pieces.Contained = append(pieces.Contained, PieceFetchResult{Piece: piece, Err: err})
+errorCount++
case audit.PieceAuditUnknown:
ec.log.Info("Failed to download piece for repair: unknown transport error (skipped)",
@@ -194,6 +202,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
zap.Stringer("Piece ID", limit.Limit.PieceId),
zap.Error(err))
pieces.Unknown = append(pieces.Unknown, PieceFetchResult{Piece: piece, Err: err})
+errorCount++
}
return
@@ -216,6 +225,9 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
piecesRequired: int32(es.RequiredCount()),
}
}
+if errorCount < minFailures {
+return nil, pieces, Error.New("expected %d failures, but only observed %d", minFailures, errorCount)
+}
fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
if err != nil {


@@ -321,7 +321,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
}
// Download the segment using just the healthy pieces
-segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked