satellite/audit: improve error handling

* Don't use rpcstatus.Unknown as an indicator of dial failure; instead,
  GetShare now indicates with a per-share field where a failure happened
  (DialFailure, RequestFailure, NoFailure). Use that information in
  Verify() to determine how to treat the source node.

* Add a test that replaces a storage node with a black hole, so that
  connections there will always time out. Make sure we handle that case
  correctly.

Refs: https://github.com/storj/storj/issues/5632
Change-Id: I513a53520fb48b7187d4c4d7e14e01e2cfc0a721
Author: paul cannon (2023-04-13 11:31:27 -05:00), committed by Storj Robot
Parent: bc30deee11
Commit: c2710cc78d
3 changed files with 200 additions and 93 deletions
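To make the first bullet concrete, here is a minimal, self-contained sketch (not the actual satellite/audit code) of the dispatch pattern this change introduces. The FailurePhase constants mirror the ones added in the diff below; Share, classify, and the node names are simplified stand-ins invented for illustration.

package main

import (
	"errors"
	"fmt"
)

// FailurePhase mirrors the enum added below: it records where a GET_SHARE
// attempt failed, so the caller no longer has to guess from the error alone.
type FailurePhase int

const (
	NoFailure      FailurePhase = 0
	DialFailure    FailurePhase = 1
	RequestFailure FailurePhase = 2
)

// Share is a trimmed-down stand-in for audit.Share.
type Share struct {
	NodeID       string
	Error        error
	FailurePhase FailurePhase
}

var errNotFound = errors.New("piece not found")

// classify shows the shape of the new Verify() logic: the phase decides how
// the node is treated, and the error value is only inspected within a phase.
func classify(s Share) string {
	switch s.FailurePhase {
	case DialFailure:
		return "offline" // could not even dial the node
	case RequestFailure:
		if errors.Is(s.Error, errNotFound) {
			return "audit failure" // node answered, but the piece is missing
		}
		return "contained or unknown" // answered, but the download did not finish cleanly
	default:
		return "audited" // share downloaded; verify it against the stripe
	}
}

func main() {
	fmt.Println(classify(Share{NodeID: "node-a", FailurePhase: NoFailure}))
	fmt.Println(classify(Share{NodeID: "node-b", FailurePhase: DialFailure, Error: errors.New("dial tcp: i/o timeout")}))
	fmt.Println(classify(Share{NodeID: "node-c", FailurePhase: RequestFailure, Error: errNotFound}))
}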

File 1 of 3:

@@ -118,9 +118,9 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
mock := &mockConnector{}
verifier := reformVerifierWithMockConnector(t, testSatellite, mock)
- share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum)
- require.NoError(t, err)
+ share := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum)
+ require.NoError(t, share.Error)
+ require.Equal(t, audit.NoFailure, share.FailurePhase)
// we expect that the cached IP and port was actually dialed
require.Contains(t, mock.addressesDialed, cachedIPAndPort)
@@ -183,9 +183,9 @@ func TestGetSharePrefers(t *testing.T) {
}
verifier := reformVerifierWithMockConnector(t, testSatellite, mock)
- share, err := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum)
- require.NoError(t, err)
+ share := verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, 0, segment.Redundancy.ShareSize, orderNum)
+ require.NoError(t, share.Error)
+ require.Equal(t, audit.NoFailure, share.FailurePhase)
// we expect that the cached IP and port was actually dialed
require.Contains(t, mock.addressesDialed, cachedIPAndPort)

File 2 of 3:

@@ -43,12 +43,25 @@ var (
ErrSegmentModified = errs.Class("segment has been modified")
)
+ // FailurePhase indicates during which phase a GET_SHARE operation failed.
+ type FailurePhase int
+ const (
+ // NoFailure indicates there was no failure during a GET_SHARE operation.
+ NoFailure FailurePhase = 0
+ // DialFailure indicates a GET_SHARE operation failed during Dial.
+ DialFailure FailurePhase = 1
+ // RequestFailure indicates a GET_SHARE operation failed to make its RPC request, or the request failed.
+ RequestFailure FailurePhase = 2
+ )
// Share represents required information about an audited share.
type Share struct {
- Error error
- PieceNum int
- NodeID storj.NodeID
- Data []byte
+ Error error
+ FailurePhase FailurePhase
+ PieceNum int
+ NodeID storj.NodeID
+ Data []byte
}
// Verifier helps verify the correctness of a given stripe.
@@ -175,72 +188,41 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
continue
}
- // TODO: just because an error came from the common/rpc package
- // does not decisively mean that the problem is something to do
- // with dialing. instead of trying to guess what different
- // error classes mean, we should make GetShare inside
- // DownloadShares return more direct information about when
- // the failure happened.
- if rpc.Error.Has(share.Error) {
- if errors.Is(share.Error, context.DeadlineExceeded) || errs.Is(share.Error, context.DeadlineExceeded) {
- // dial timeout
- offlineNodes = append(offlineNodes, share.NodeID)
- verifier.log.Debug("Verify: dial timeout (offline)",
- zap.Stringer("Node ID", share.NodeID),
- zap.String("Segment", segmentInfoString(segment)),
- zap.Error(share.Error))
- continue
- }
- if errs2.IsRPC(share.Error, rpcstatus.Unknown) {
- // dial failed -- offline node
- // TODO: we should never assume what an unknown
- // error means. This should be looking for an explicit
- // indication that dialing failed, not assuming dialing
- // failed because the rpc status is unknown
- offlineNodes = append(offlineNodes, share.NodeID)
- verifier.log.Debug("Verify: dial failed (offline)",
- zap.Stringer("Node ID", share.NodeID),
- zap.String("Segment", segmentInfoString(segment)),
- zap.Error(share.Error))
- continue
- }
- // unknown transport error
- unknownNodes = append(unknownNodes, share.NodeID)
- verifier.log.Info("Verify: unknown transport error (skipped)",
- zap.Stringer("Node ID", share.NodeID),
- zap.String("Segment", segmentInfoString(segment)),
- zap.Error(share.Error),
- zap.String("ErrorType", spew.Sprintf("%#+v", share.Error)))
- continue
- }
- if errs2.IsRPC(share.Error, rpcstatus.NotFound) {
- // missing share
- failedNodes = append(failedNodes, share.NodeID)
- verifier.log.Info("Verify: piece not found (audit failed)",
- zap.Stringer("Node ID", share.NodeID),
- zap.String("Segment", segmentInfoString(segment)),
- zap.Error(share.Error))
- continue
- }
- if errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded) {
- // dial successful, but download timed out
- containedNodes[pieceNum] = share.NodeID
- verifier.log.Info("Verify: download timeout (contained)",
- zap.Stringer("Node ID", share.NodeID),
- zap.String("Segment", segmentInfoString(segment)),
- zap.Error(share.Error))
- continue
- }
- // unknown error
- unknownNodes = append(unknownNodes, share.NodeID)
- verifier.log.Info("Verify: unknown error (skipped)",
+ pieceID := orderLimits[pieceNum].Limit.PieceId
+ errLogger := verifier.log.With(
zap.Stringer("Node ID", share.NodeID),
zap.String("Segment", segmentInfoString(segment)),
zap.Stringer("Piece ID", pieceID),
zap.Error(share.Error),
zap.String("ErrorType", spew.Sprintf("%#+v", share.Error)))
)
switch share.FailurePhase {
case DialFailure:
// dial failed -- offline node
offlineNodes = append(offlineNodes, share.NodeID)
errLogger.Debug("Verify: dial failed (offline)")
continue
case RequestFailure:
if errs2.IsRPC(share.Error, rpcstatus.NotFound) {
// missing share
failedNodes = append(failedNodes, share.NodeID)
errLogger.Info("Verify: piece not found (audit failed)")
continue
}
if errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded) {
// dial successful, but download timed out
containedNodes[pieceNum] = share.NodeID
errLogger.Info("Verify: download timeout (contained)")
continue
}
// unknown error
unknownNodes = append(unknownNodes, share.NodeID)
errLogger.Info("Verify: unknown error (skipped)",
zap.String("ErrorType", spew.Sprintf("%#+v", share.Error)))
}
}
mon.IntVal("verify_shares_downloaded_successfully").Observe(int64(len(sharesToAudit))) //mon:locked
@@ -337,15 +319,7 @@ func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.Addre
}
go func(i int, limit *pb.AddressedOrderLimit) {
- share, err := verifier.GetShare(ctx, limit, piecePrivateKey, ipPort, stripeIndex, shareSize, i)
- if err != nil {
- share = Share{
- Error: err,
- PieceNum: i,
- NodeID: limit.GetLimit().StorageNodeId,
- Data: nil,
- }
- }
+ share := verifier.GetShare(ctx, limit, piecePrivateKey, ipPort, stripeIndex, shareSize, i)
ch <- &share
}(i, limit)
}
@@ -387,8 +361,12 @@ func (verifier *Verifier) IdentifyContainedNodes(ctx context.Context, segment Se
}
// GetShare use piece store client to download shares from nodes.
- func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error) {
- defer mon.Task()(&ctx)(&err)
+ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share) {
+ defer mon.Task()(&ctx)(&share.Error)
+ share.PieceNum = pieceNum
+ share.NodeID = limit.GetLimit().StorageNodeId
+ share.FailurePhase = DialFailure
bandwidthMsgSize := shareSize
@@ -407,6 +385,7 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder
targetNodeID := limit.GetLimit().StorageNodeId
log := verifier.log.Named(targetNodeID.String())
var ps *piecestore.Client
+ var err error
// if cached IP is given, try connecting there first
if cachedIPAndPort != "" {
@@ -428,10 +407,12 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder
}
ps, err = piecestore.Dial(rpcpool.WithForceDial(timedCtx), verifier.dialer, nodeAddr, piecestore.DefaultConfig)
if err != nil {
- return Share{}, Error.Wrap(err)
+ share.Error = Error.Wrap(err)
+ return share
}
}
+ share.FailurePhase = RequestFailure
defer func() {
err := ps.Close()
if err != nil {
@@ -443,22 +424,30 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder
downloader, err := ps.Download(timedCtx, limit.GetLimit(), piecePrivateKey, offset, int64(shareSize))
if err != nil {
- return Share{}, err
+ share.Error = err
+ return share
}
- defer func() { err = errs.Combine(err, downloader.Close()) }()
buf := make([]byte, shareSize)
_, err = io.ReadFull(downloader, buf)
- if err != nil {
- return Share{}, err
+ closeErr := downloader.Close()
+ if err != nil || closeErr != nil {
+ if errors.Is(err, io.ErrClosedPipe) {
+ // in some circumstances, this can be returned from the piecestore
+ // when the peer returned a different error. The downloader gets
+ // marked as being closed, even though we haven't closed it from
+ // this side, and ErrClosedPipe gets returned on the next Read
+ // instead of the actual error. We'll get the real error from
+ // downloader.Close().
+ err = nil
+ }
+ share.Error = errs.Combine(err, closeErr)
+ return share
}
- return Share{
- Error: nil,
- PieceNum: pieceNum,
- NodeID: targetNodeID,
- Data: buf,
- }, nil
+ share.Data = buf
+ share.FailurePhase = NoFailure
+ return share
}
// checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit.

File 3 of 3:
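The third file adds the black-hole test described in the commit message. As a standalone illustration of the underlying mechanism (independent of testplanet and the audit code, with all names invented for the example), here is a rough sketch: the kernel completes the TCP handshake on its own, so a bare dial to the black hole succeeds, but anything that then waits for an application-level reply, such as a TLS or DRPC handshake, stalls until its deadline and surfaces as an i/o timeout.

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// Black hole: accept TCP connections, read whatever arrives, never reply.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			go func() {
				defer conn.Close()
				buf := make([]byte, 1024)
				for {
					if _, err := conn.Read(buf); err != nil {
						return // peer closed the connection or gave up
					}
				}
			}()
		}
	}()

	// A bare TCP dial succeeds: the kernel answers the TCP handshake by itself.
	conn, err := net.Dial("tcp", ln.Addr().String())
	fmt.Println("bare TCP dial error:", err)
	if err == nil {
		_ = conn.Close()
	}

	// Waiting for the server's first byte (standing in for a TLS/DRPC handshake
	// reply) stalls, because the black hole never writes anything back. With a
	// deadline this fails with an i/o timeout -- the condition the new test
	// expects the verifier to classify as DialFailure.
	conn2, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err)
	}
	defer conn2.Close()
	_ = conn2.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
	_, err = conn2.Read(make([]byte, 1))
	fmt.Println("waiting for a reply:", err) // i/o timeout
}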

@@ -7,7 +7,9 @@ import (
"context"
"crypto/rand"
"fmt"
"net"
"os"
"syscall"
"testing"
"time"
@@ -80,6 +82,7 @@ func TestDownloadSharesHappyPath(t *testing.T) {
for _, share := range shares {
assert.NoError(t, share.Error)
+ assert.Equal(t, audit.NoFailure, share.FailurePhase)
}
})
}
@@ -143,8 +146,10 @@ func TestDownloadSharesOfflineNode(t *testing.T) {
assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
assert.False(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error)
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.Unknown), "unexpected error: %+v", share.Error)
+ assert.Equal(t, audit.DialFailure, share.FailurePhase)
} else {
assert.NoError(t, share.Error)
+ assert.Equal(t, audit.NoFailure, share.FailurePhase)
}
}
})
@@ -202,6 +207,7 @@ func TestDownloadSharesMissingPiece(t *testing.T) {
for _, share := range shares {
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.NotFound), "unexpected error: %+v", share.Error)
+ assert.Equal(t, audit.RequestFailure, share.FailurePhase)
}
})
}
@@ -284,10 +290,121 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
for _, share := range shares {
assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
assert.True(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error)
+ assert.Equal(t, audit.DialFailure, share.FailurePhase)
}
})
}
+ // TestDownloadSharesDialIOTimeout checks that i/o timeout dial failures are
+ // handled appropriately.
+ //
+ // This test differs from TestDownloadSharesDialTimeout in that it causes the
+ // timeout error by replacing a storage node with a black hole TCP socket,
+ // causing the failure directly instead of faking it with dialer.DialLatency.
+ func TestDownloadSharesDialIOTimeout(t *testing.T) {
+ var group errgroup.Group
+ // we do this shutdown outside the testplanet scope, so that we can expect
+ // that planet has been shut down before waiting for the black hole goroutines
+ // to finish. (They won't finish until the remote end is closed, which happens
+ // during planet shutdown.)
+ defer func() { assert.NoError(t, group.Wait()) }()
+ testWithRangedLoop(t, testplanet.Config{
+ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+ Reconfigure: testplanet.Reconfigure{
+ // require all nodes for each operation
+ Satellite: testplanet.ReconfigureRS(4, 4, 4, 4),
+ },
+ }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
+ satellite := planet.Satellites[0]
+ audits := satellite.Audit
+ audits.Worker.Loop.Pause()
+ pauseQueueing(satellite)
+ upl := planet.Uplinks[0]
+ testData := testrand.Bytes(8 * memory.KiB)
+ err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
+ require.NoError(t, err)
+ err = runQueueingOnce(ctx, satellite)
+ require.NoError(t, err)
+ queue := audits.VerifyQueue
+ queueSegment, err := queue.Next(ctx)
+ require.NoError(t, err)
+ segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
+ StreamID: queueSegment.StreamID,
+ Position: queueSegment.Position,
+ })
+ require.NoError(t, err)
+ blackHoleNode := planet.StorageNodes[testrand.Intn(len(planet.StorageNodes))]
+ require.NoError(t, planet.StopPeer(blackHoleNode))
+ // create a black hole in place of the storage node: a socket that only reads
+ // bytes and never says anything back. A connection to here using a bare TCP Dial
+ // would succeed, but a TLS Dial will not be able to handshake and will time out
+ // or wait forever.
+ listener, err := net.Listen("tcp", blackHoleNode.Addr())
+ require.NoError(t, err)
+ defer func() { assert.NoError(t, listener.Close()) }()
+ t.Logf("black hole listening on %s", listener.Addr())
+ group.Go(func() error {
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ // this is terrible, but is apparently the standard and correct way to check
+ // for this specific error. See parseCloseError() in net/error_test.go in the
+ // Go stdlib.
+ assert.ErrorContains(t, err, "use of closed network connection")
+ return nil
+ }
+ t.Logf("connection made to black hole port %s", listener.Addr())
+ group.Go(func() (err error) {
+ defer func() { assert.NoError(t, conn.Close()) }()
+ // black hole: just read until the socket is closed on the other end
+ buf := make([]byte, 1024)
+ for {
+ _, err = conn.Read(buf)
+ if err != nil {
+ assert.ErrorIs(t, err, syscall.ECONNRESET)
+ return nil
+ }
+ }
+ })
+ }
+ })
+ randomIndex, err := audit.GetRandomStripe(ctx, segment)
+ require.NoError(t, err)
+ shareSize := segment.Redundancy.ShareSize
+ limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
+ require.NoError(t, err)
+ verifier := satellite.Audit.Verifier
+ shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize)
+ require.NoError(t, err)
+ observed := false
+ for _, share := range shares {
+ if share.NodeID.Compare(blackHoleNode.ID()) == 0 {
+ assert.ErrorIs(t, share.Error, context.DeadlineExceeded)
+ assert.Equal(t, audit.DialFailure, share.FailurePhase)
+ observed = true
+ } else {
+ assert.NoError(t, share.Error)
+ }
+ }
+ assert.Truef(t, observed, "No node in returned shares matched expected node ID")
+ })
+ }
// TestDownloadSharesDownloadTimeout checks that the Share.Error field of the
// shares returned by the DownloadShares method for nodes that are successfully
// dialed, but time out during the download of the share contain an error that:
@@ -365,6 +482,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
require.Len(t, shares, 1)
share := shares[0]
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded), "unexpected error: %+v", share.Error)
+ assert.Equal(t, audit.RequestFailure, share.FailurePhase)
assert.False(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
})
}