From 6bb6479690f962c2094ff34ced06a12ac0b22cb5 Mon Sep 17 00:00:00 2001
From: paul cannon
Date: Wed, 15 Feb 2023 15:47:47 -0600
Subject: [PATCH] satellite/repair: fix flakiness in tests

Several tests using `(*ECRepairer).Get()` have begun to exhibit flaky
results. The tests expect to see failures in certain cases, but the
failures do not appear. The apparent cause is that, sometimes, the
fastest good nodes are able to satisfy the repairer (providing
RequiredCount pieces) before the repairer is able to identify the
problem scenario we have laid out.

In this commit, we add an argument to `(*ECRepairer).Get()` which
specifies how many failure results are expected. In normal/production
conditions, this parameter will be 0, meaning Get need not wait for any
errors and should only report those that arrived while waiting for
RequiredCount pieces (the existing behavior). But in these tests, we can
request that Get() wait for enough results to see the errors we are
expecting.

Refs: https://github.com/storj/storj/issues/5593
Change-Id: I2920edb6b5a344491786aab794d1be6372c07cf8
---
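A sketch of the intended call pattern, for orientation. Argument names
are taken from the tests below; the surrounding setup (order limits,
redundancy strategy, and so on) is assumed to be as in repair_test.go:

    // Production path: pass 0 for minFailures. Get returns as soon as
    // RequiredCount pieces have downloaded, reporting only whatever
    // errors happened to arrive in the meantime (the existing behavior).
    _, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits,
        cachedIPsAndPorts, getPrivateKey, redundancy,
        int64(segment.EncryptedSize), 0)

    // Test path: one piece has been deliberately corrupted, so pass 1.
    // Get keeps collecting results until at least one failure has been
    // observed, making piecesReport.Failed deterministic even when the
    // fastest good nodes would otherwise win the race.
    _, piecesReport, err = ecRepairer.Get(ctx, getOrderLimits,
        cachedIPsAndPorts, getPrivateKey, redundancy,
        int64(segment.EncryptedSize), 1)
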
 satellite/repair/repair_test.go       | 16 ++++++++--------
 satellite/repair/repairer/ec.go       | 26 +++++++++++++++++++-------
 satellite/repair/repairer/segments.go |  2 +-
 3 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 5d63d10f8..08c3b4438 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -2480,7 +2480,7 @@ func TestECRepairerGet(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 0, len(piecesReport.Failed))
@@ -2547,7 +2547,7 @@ func TestECRepairerGetCorrupted(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 1, len(piecesReport.Failed))
@@ -2616,7 +2616,7 @@ func TestECRepairerGetMissingPiece(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 1, len(piecesReport.Failed))
@@ -2682,7 +2682,7 @@ func TestECRepairerGetOffline(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 		require.NoError(t, err)
 		require.Equal(t, 1, len(piecesReport.Offline))
 		require.Equal(t, 0, len(piecesReport.Failed))
@@ -2752,7 +2752,7 @@ func TestECRepairerGetUnknown(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 1)
 		require.NoError(t, err)
 		require.Equal(t, 0, len(piecesReport.Offline))
 		require.Equal(t, 0, len(piecesReport.Failed))
@@ -2837,7 +2837,7 @@ func TestECRepairerGetFailure(t *testing.T) {
 		getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+		_, piecesReport, err := ecRepairer.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
 		require.Error(t, err)
 		require.Equal(t, 1, len(piecesReport.Offline))
 		require.Equal(t, 1, len(piecesReport.Failed))
@@ -2898,7 +2898,7 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
 		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 		require.NoError(t, err)
 
-		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
+		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
 		require.NoError(t, err)
 		require.Len(t, pieces.Failed, 0)
 		require.NotNil(t, readCloser)
@@ -2983,7 +2983,7 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 		require.NoError(t, err)
 
-		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
+		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize), 0)
 		require.NoError(t, err)
 		require.Len(t, pieces.Failed, 0)
 		require.NotNil(t, readCloser)
diff --git a/satellite/repair/repairer/ec.go b/satellite/repair/repairer/ec.go
index 84c285b3f..50c978d9b 100644
--- a/satellite/repair/repairer/ec.go
+++ b/satellite/repair/repairer/ec.go
@@ -67,11 +67,14 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
 }
 
 // Get downloads pieces from storagenodes using the provided order limits, and decodes those pieces into a segment.
-// It attempts to download from the minimum required number based on the redundancy scheme.
+// It attempts to download from the minimum required number based on the redundancy scheme. It will further wait
+// for additional error/failure results up to minFailures, for testing purposes. Under normal conditions,
+// minFailures will be 0.
+//
 // After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
 // If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
 // If piece hash verification fails, it will return all failed node IDs.
-func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ FetchResultReport, err error) {
+func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64, minFailures int) (_ io.ReadCloser, _ FetchResultReport, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	if len(limits) != es.TotalCount() {
@@ -80,12 +83,13 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 
 	nonNilLimits := nonNilCount(limits)
 
-	if nonNilLimits < es.RequiredCount() {
-		return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
+	if nonNilLimits < es.RequiredCount()+minFailures {
+		return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than requested result count (%d)", nonNilCount(limits), es.RequiredCount()+minFailures)
 	}
 
 	pieceSize := eestream.CalcPieceSize(dataSize, es)
 
+	errorCount := 0
 	var successfulPieces, inProgress int
 	unusedLimits := nonNilLimits
 	pieceReaders := make(map[int]io.ReadCloser)
@@ -106,12 +110,12 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 	defer cond.L.Unlock()
 
 	for {
-		if successfulPieces >= es.RequiredCount() {
-			// already downloaded minimum number of pieces
+		if successfulPieces >= es.RequiredCount() && errorCount >= minFailures {
+			// already downloaded required number of pieces
 			cond.Broadcast()
 			return
 		}
-		if successfulPieces+inProgress+unusedLimits < es.RequiredCount() {
+		if successfulPieces+inProgress+unusedLimits < es.RequiredCount() || errorCount+inProgress+unusedLimits < minFailures {
 			// not enough available limits left to get required number of pieces
 			cond.Broadcast()
 			return
@@ -162,6 +166,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				zap.Stringer("Piece ID", limit.Limit.PieceId),
 				zap.String("reason", err.Error()))
 			pieces.Failed = append(pieces.Failed, PieceFetchResult{Piece: piece, Err: err})
+			errorCount++
 			return
 		}
 
@@ -173,6 +178,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 					zap.Stringer("Piece ID", limit.Limit.PieceId),
 					zap.Error(err))
 				pieces.Failed = append(pieces.Failed, PieceFetchResult{Piece: piece, Err: err})
+				errorCount++
 
 			case audit.PieceAuditOffline:
 				ec.log.Debug("Failed to download piece for repair: dial timeout (offline)",
@@ -180,6 +186,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 					zap.Stringer("Piece ID", limit.Limit.PieceId),
 					zap.Error(err))
 				pieces.Offline = append(pieces.Offline, PieceFetchResult{Piece: piece, Err: err})
+				errorCount++
 
 			case audit.PieceAuditContained:
 				ec.log.Info("Failed to download piece for repair: download timeout (contained)",
@@ -187,6 +194,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 					zap.Stringer("Piece ID", limit.Limit.PieceId),
 					zap.Error(err))
 				pieces.Contained = append(pieces.Contained, PieceFetchResult{Piece: piece, Err: err})
+				errorCount++
 
 			case audit.PieceAuditUnknown:
 				ec.log.Info("Failed to download piece for repair: unknown transport error (skipped)",
@@ -194,6 +202,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 					zap.Stringer("Piece ID", limit.Limit.PieceId),
 					zap.Error(err))
 				pieces.Unknown = append(pieces.Unknown, PieceFetchResult{Piece: piece, Err: err})
+				errorCount++
 			}
 
 			return
@@ -216,6 +225,9 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 			piecesRequired: int32(es.RequiredCount()),
 		}
 	}
+	if errorCount < minFailures {
+		return nil, pieces, Error.New("expected %d failures, but only observed %d", minFailures, errorCount)
+	}
 
 	fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
 	if err != nil {
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 456a42f3d..23a196aec 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -321,7 +321,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// Download the segment using just the healthy pieces
-	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize), 0)
 
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
 	mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked
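
For orientation, a condensed sketch of the two exit conditions the
download loop in ec.go now enforces. This is an illustration only: the
helper names `done` and `hopeless` are hypothetical, and the real code
keeps these counters as shared state guarded by a sync.Cond rather than
passing them as parameters.

    // done reports whether the loop may stop successfully: enough pieces
    // have been downloaded AND (for tests) enough failures observed.
    func done(successfulPieces, errorCount, requiredCount, minFailures int) bool {
        return successfulPieces >= requiredCount && errorCount >= minFailures
    }

    // hopeless reports whether the loop should give up: even if every
    // in-progress and unused limit produced a success (first clause) or
    // a failure (second clause), the respective target could not be met.
    func hopeless(successfulPieces, errorCount, inProgress, unusedLimits, requiredCount, minFailures int) bool {
        return successfulPieces+inProgress+unusedLimits < requiredCount ||
            errorCount+inProgress+unusedLimits < minFailures
    }

With minFailures == 0, done reduces to the old RequiredCount check and
the second clause of hopeless can never be true, so production behavior
is unchanged, as the commit message states.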