diff --git a/satellite/audit/getshare_test.go b/satellite/audit/getshare_test.go index a7e0cedf3..3ce3a1e0c 100644 --- a/satellite/audit/getshare_test.go +++ b/satellite/audit/getshare_test.go @@ -118,9 +118,9 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) { mock := &mockConnector{} verifier := reformVerifierWithMockConnector(t, testSatellite, mock) - share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum) - require.NoError(t, err) + share := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum) require.NoError(t, share.Error) + require.Equal(t, audit.NoFailure, share.FailurePhase) // we expect that the cached IP and port was actually dialed require.Contains(t, mock.addressesDialed, cachedIPAndPort) @@ -183,9 +183,9 @@ func TestGetSharePrefers(t *testing.T) { } verifier := reformVerifierWithMockConnector(t, testSatellite, mock) - share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum) - require.NoError(t, err) + share := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum) require.NoError(t, share.Error) + require.Equal(t, audit.NoFailure, share.FailurePhase) // we expect that the cached IP and port was actually dialed require.Contains(t, mock.addressesDialed, cachedIPAndPort) diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go index 218100b31..457e6d7ae 100644 --- a/satellite/audit/verifier.go +++ b/satellite/audit/verifier.go @@ -43,12 +43,25 @@ var ( ErrSegmentModified = errs.Class("segment has been modified") ) +// FailurePhase indicates during which phase a GET_SHARE operation failed. +type FailurePhase int + +const ( + // NoFailure indicates there was no failure during a GET_SHARE operation. + NoFailure FailurePhase = 0 + // DialFailure indicates a GET_SHARE operation failed during Dial. + DialFailure FailurePhase = 1 + // RequestFailure indicates a GET_SHARE operation failed to make its RPC request, or the request failed. + RequestFailure FailurePhase = 2 +) + // Share represents required information about an audited share. type Share struct { - Error error - PieceNum int - NodeID storj.NodeID - Data []byte + Error error + FailurePhase FailurePhase + PieceNum int + NodeID storj.NodeID + Data []byte } // Verifier helps verify the correctness of a given stripe. @@ -175,72 +188,41 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[ continue } - // TODO: just because an error came from the common/rpc package - // does not decisively mean that the problem is something to do - // with dialing. instead of trying to guess what different - // error classes mean, we should make GetShare inside - // DownloadShares return more direct information about when - // the failure happened. - if rpc.Error.Has(share.Error) { - if errors.Is(share.Error, context.DeadlineExceeded) || errs.Is(share.Error, context.DeadlineExceeded) { - // dial timeout - offlineNodes = append(offlineNodes, share.NodeID) - verifier.log.Debug("Verify: dial timeout (offline)", - zap.Stringer("Node ID", share.NodeID), - zap.String("Segment", segmentInfoString(segment)), - zap.Error(share.Error)) - continue - } - if errs2.IsRPC(share.Error, rpcstatus.Unknown) { - // dial failed -- offline node - // TODO: we should never assume what an unknown - // error means. 
This should be looking for an explicit - // indication that dialing failed, not assuming dialing - // failed because the rpc status is unknown - offlineNodes = append(offlineNodes, share.NodeID) - verifier.log.Debug("Verify: dial failed (offline)", - zap.Stringer("Node ID", share.NodeID), - zap.String("Segment", segmentInfoString(segment)), - zap.Error(share.Error)) - continue - } - // unknown transport error - unknownNodes = append(unknownNodes, share.NodeID) - verifier.log.Info("Verify: unknown transport error (skipped)", - zap.Stringer("Node ID", share.NodeID), - zap.String("Segment", segmentInfoString(segment)), - zap.Error(share.Error), - zap.String("ErrorType", spew.Sprintf("%#+v", share.Error))) - continue - } - - if errs2.IsRPC(share.Error, rpcstatus.NotFound) { - // missing share - failedNodes = append(failedNodes, share.NodeID) - verifier.log.Info("Verify: piece not found (audit failed)", - zap.Stringer("Node ID", share.NodeID), - zap.String("Segment", segmentInfoString(segment)), - zap.Error(share.Error)) - continue - } - - if errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded) { - // dial successful, but download timed out - containedNodes[pieceNum] = share.NodeID - verifier.log.Info("Verify: download timeout (contained)", - zap.Stringer("Node ID", share.NodeID), - zap.String("Segment", segmentInfoString(segment)), - zap.Error(share.Error)) - continue - } - - // unknown error - unknownNodes = append(unknownNodes, share.NodeID) - verifier.log.Info("Verify: unknown error (skipped)", + pieceID := orderLimits[pieceNum].Limit.PieceId + errLogger := verifier.log.With( zap.Stringer("Node ID", share.NodeID), zap.String("Segment", segmentInfoString(segment)), + zap.Stringer("Piece ID", pieceID), zap.Error(share.Error), - zap.String("ErrorType", spew.Sprintf("%#+v", share.Error))) + ) + + switch share.FailurePhase { + case DialFailure: + // dial failed -- offline node + offlineNodes = append(offlineNodes, share.NodeID) + errLogger.Debug("Verify: dial failed (offline)") + continue + + case RequestFailure: + if errs2.IsRPC(share.Error, rpcstatus.NotFound) { + // missing share + failedNodes = append(failedNodes, share.NodeID) + errLogger.Info("Verify: piece not found (audit failed)") + continue + } + + if errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded) { + // dial successful, but download timed out + containedNodes[pieceNum] = share.NodeID + errLogger.Info("Verify: download timeout (contained)") + continue + } + + // unknown error + unknownNodes = append(unknownNodes, share.NodeID) + errLogger.Info("Verify: unknown error (skipped)", + zap.String("ErrorType", spew.Sprintf("%#+v", share.Error))) + } } mon.IntVal("verify_shares_downloaded_successfully").Observe(int64(len(sharesToAudit))) //mon:locked @@ -337,15 +319,7 @@ func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.Addre } go func(i int, limit *pb.AddressedOrderLimit) { - share, err := verifier.GetShare(ctx, limit, piecePrivateKey, ipPort, stripeIndex, shareSize, i) - if err != nil { - share = Share{ - Error: err, - PieceNum: i, - NodeID: limit.GetLimit().StorageNodeId, - Data: nil, - } - } + share := verifier.GetShare(ctx, limit, piecePrivateKey, ipPort, stripeIndex, shareSize, i) ch <- &share }(i, limit) } @@ -387,8 +361,12 @@ func (verifier *Verifier) IdentifyContainedNodes(ctx context.Context, segment Se } // GetShare use piece store client to download shares from nodes. 
-func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error) { - defer mon.Task()(&ctx)(&err) +func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share) { + defer mon.Task()(&ctx)(&share.Error) + + share.PieceNum = pieceNum + share.NodeID = limit.GetLimit().StorageNodeId + share.FailurePhase = DialFailure bandwidthMsgSize := shareSize @@ -407,6 +385,7 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder targetNodeID := limit.GetLimit().StorageNodeId log := verifier.log.Named(targetNodeID.String()) var ps *piecestore.Client + var err error // if cached IP is given, try connecting there first if cachedIPAndPort != "" { @@ -428,10 +407,12 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder } ps, err = piecestore.Dial(rpcpool.WithForceDial(timedCtx), verifier.dialer, nodeAddr, piecestore.DefaultConfig) if err != nil { - return Share{}, Error.Wrap(err) + share.Error = Error.Wrap(err) + return share } } + share.FailurePhase = RequestFailure defer func() { err := ps.Close() if err != nil { @@ -443,22 +424,30 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder downloader, err := ps.Download(timedCtx, limit.GetLimit(), piecePrivateKey, offset, int64(shareSize)) if err != nil { - return Share{}, err + share.Error = err + return share } - defer func() { err = errs.Combine(err, downloader.Close()) }() buf := make([]byte, shareSize) _, err = io.ReadFull(downloader, buf) - if err != nil { - return Share{}, err + closeErr := downloader.Close() + if err != nil || closeErr != nil { + if errors.Is(err, io.ErrClosedPipe) { + // in some circumstances, this can be returned from the piecestore + // when the peer returned a different error. The downloader gets + // marked as being closed, even though we haven't closed it from + // this side, and ErrClosedPipe gets returned on the next Read + // instead of the actual error. We'll get the real error from + // downloader.Close(). + err = nil + } + share.Error = errs.Combine(err, closeErr) + return share } + share.Data = buf + share.FailurePhase = NoFailure - return Share{ - Error: nil, - PieceNum: pieceNum, - NodeID: targetNodeID, - Data: buf, - }, nil + return share } // checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit. 
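Note (illustration, not part of the patch): with GetShare now returning a single Share value, the error and the phase it occurred in travel together, so callers branch on FailurePhase instead of guessing intent from rpc error classes. A minimal sketch of the new calling convention, assuming an *audit.Verifier, an order limit, and a piece private key from the surrounding code; the helper name classifyShare is hypothetical:

package audittest // hypothetical illustration package, not part of this change

import (
	"context"

	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/storj/satellite/audit"
)

// classifyShare shows how a caller consumes the new GetShare signature: there
// is no second return value; the outcome is read from FailurePhase and Error.
func classifyShare(ctx context.Context, verifier *audit.Verifier, limit *pb.AddressedOrderLimit, key storj.PiecePrivateKey, shareSize int32) string {
	share := verifier.GetShare(ctx, limit, key, "" /* no cached address */, 0, shareSize, 0)
	switch share.FailurePhase {
	case audit.NoFailure:
		_ = share.Data // the downloaded share bytes
		return "ok"
	case audit.DialFailure:
		return "offline" // share.Error happened before any RPC was sent
	default: // audit.RequestFailure
		return "request failed" // dial succeeded; inspect share.Error (e.g. rpcstatus codes)
	}
}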
diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index a95db0cdc..9c56683ce 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -7,7 +7,9 @@ import ( "context" "crypto/rand" "fmt" + "net" "os" + "syscall" "testing" "time" @@ -80,6 +82,7 @@ func TestDownloadSharesHappyPath(t *testing.T) { for _, share := range shares { assert.NoError(t, share.Error) + assert.Equal(t, audit.NoFailure, share.FailurePhase) } }) } @@ -143,8 +146,10 @@ func TestDownloadSharesOfflineNode(t *testing.T) { assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error) assert.False(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error) assert.True(t, errs2.IsRPC(share.Error, rpcstatus.Unknown), "unexpected error: %+v", share.Error) + assert.Equal(t, audit.DialFailure, share.FailurePhase) } else { assert.NoError(t, share.Error) + assert.Equal(t, audit.NoFailure, share.FailurePhase) } } }) @@ -202,6 +207,7 @@ func TestDownloadSharesMissingPiece(t *testing.T) { for _, share := range shares { assert.True(t, errs2.IsRPC(share.Error, rpcstatus.NotFound), "unexpected error: %+v", share.Error) + assert.Equal(t, audit.RequestFailure, share.FailurePhase) } }) } @@ -284,10 +290,121 @@ func TestDownloadSharesDialTimeout(t *testing.T) { for _, share := range shares { assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error) assert.True(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error) + assert.Equal(t, audit.DialFailure, share.FailurePhase) } }) } +// TestDownloadSharesDialIOTimeout checks that i/o timeout dial failures are +// handled appropriately. +// +// This test differs from TestDownloadSharesDialTimeout in that it causes the +// timeout error by replacing a storage node with a black hole TCP socket, +// causing the failure directly instead of faking it with dialer.DialLatency. +func TestDownloadSharesDialIOTimeout(t *testing.T) { + var group errgroup.Group + // we do this shutdown outside the testplanet scope, so that we can expect + // that planet has been shut down before waiting for the black hole goroutines + // to finish. (They won't finish until the remote end is closed, which happens + // during planet shutdown.) 
+ defer func() { assert.NoError(t, group.Wait()) }() + + testWithRangedLoop(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + // require all nodes for each operation + Satellite: testplanet.ReconfigureRS(4, 4, 4, 4), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) { + satellite := planet.Satellites[0] + audits := satellite.Audit + + audits.Worker.Loop.Pause() + pauseQueueing(satellite) + + upl := planet.Uplinks[0] + testData := testrand.Bytes(8 * memory.KiB) + + err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData) + require.NoError(t, err) + + err = runQueueingOnce(ctx, satellite) + require.NoError(t, err) + + queue := audits.VerifyQueue + queueSegment, err := queue.Next(ctx) + require.NoError(t, err) + + segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + }) + require.NoError(t, err) + + blackHoleNode := planet.StorageNodes[testrand.Intn(len(planet.StorageNodes))] + require.NoError(t, planet.StopPeer(blackHoleNode)) + + // create a black hole in place of the storage node: a socket that only reads + // bytes and never says anything back. A connection to here using a bare TCP Dial + // would succeed, but a TLS Dial will not be able to handshake and will time out + // or wait forever. + listener, err := net.Listen("tcp", blackHoleNode.Addr()) + require.NoError(t, err) + defer func() { assert.NoError(t, listener.Close()) }() + t.Logf("black hole listening on %s", listener.Addr()) + + group.Go(func() error { + for { + conn, err := listener.Accept() + if err != nil { + // this is terrible, but is apparently the standard and correct way to check + // for this specific error. See parseCloseError() in net/error_test.go in the + // Go stdlib. 
+ assert.ErrorContains(t, err, "use of closed network connection") + return nil + } + t.Logf("connection made to black hole port %s", listener.Addr()) + group.Go(func() (err error) { + defer func() { assert.NoError(t, conn.Close()) }() + + // black hole: just read until the socket is closed on the other end + buf := make([]byte, 1024) + for { + _, err = conn.Read(buf) + if err != nil { + assert.ErrorIs(t, err, syscall.ECONNRESET) + return nil + } + } + }) + } + }) + + randomIndex, err := audit.GetRandomStripe(ctx, segment) + require.NoError(t, err) + shareSize := segment.Redundancy.ShareSize + + limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) + require.NoError(t, err) + + verifier := satellite.Audit.Verifier + shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize) + require.NoError(t, err) + + observed := false + for _, share := range shares { + if share.NodeID.Compare(blackHoleNode.ID()) == 0 { + assert.ErrorIs(t, share.Error, context.DeadlineExceeded) + assert.Equal(t, audit.DialFailure, share.FailurePhase) + observed = true + } else { + assert.NoError(t, share.Error) + } + } + assert.Truef(t, observed, "No node in returned shares matched expected node ID") + }) +} + // TestDownloadSharesDownloadTimeout checks that the Share.Error field of the // shares returned by the DownloadShares method for nodes that are successfully // dialed, but time out during the download of the share contain an error that: @@ -365,6 +482,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { require.Len(t, shares, 1) share := shares[0] assert.True(t, errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded), "unexpected error: %+v", share.Error) + assert.Equal(t, audit.RequestFailure, share.FailurePhase) assert.False(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error) }) }
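Note (illustration, not part of the patch): TestDownloadSharesDialIOTimeout above relies on a "black hole" TCP listener: it accepts connections and reads forever without ever replying, so a bare TCP dial succeeds but a TLS handshake (or any read) hangs until a deadline fires. A self-contained sketch of that pattern outside testplanet; all names here are hypothetical:

package blackhole // hypothetical package for illustration only

import (
	"io"
	"net"
	"testing"
	"time"
)

// TestBlackHolePattern demonstrates the technique in isolation: connect to a
// listener that never writes, and observe that only the read times out.
func TestBlackHolePattern(t *testing.T) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer func() { _ = listener.Close() }()

	go func() {
		for {
			conn, err := listener.Accept()
			if err != nil {
				return // listener closed; stop accepting
			}
			go func() {
				defer func() { _ = conn.Close() }()
				// black hole: swallow everything the peer sends, never answer
				_, _ = io.Copy(io.Discard, conn)
			}()
		}
	}()

	// the plain TCP dial succeeds immediately...
	conn, err := net.Dial("tcp", listener.Addr().String())
	if err != nil {
		t.Fatal(err)
	}
	defer func() { _ = conn.Close() }()

	// ...but waiting for any response times out, which is the condition the
	// audit test expects GetShare to report as a DialFailure.
	_ = conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
	buf := make([]byte, 1)
	if _, err := conn.Read(buf); err == nil {
		t.Fatal("expected the read to time out; the black hole should never reply")
	}
}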