diff --git a/satellite/gracefulexit/endpoint.go b/satellite/gracefulexit/endpoint.go
index 53d3d0d1c..1fb4868c2 100644
--- a/satellite/gracefulexit/endpoint.go
+++ b/satellite/gracefulexit/endpoint.go
@@ -755,7 +755,63 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
 	errorCode := int(pb.TransferFailed_Error_value[message.Failed.Error.String()])
 
-	// TODO if error code is NOT_FOUND, the node no longer has the piece. remove the queue item and the pointer
+	// If the error code is NOT_FOUND, the node no longer has the piece.
+	// Remove the queue item and remove the node from the pointer.
+	// If the pointer is not piece hash verified, do not count this as a failure.
+	if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND {
+		endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("path", transfer.path), zap.Int32("piece num", transfer.pieceNum))
+		pointer, err := endpoint.metainfo.Get(ctx, string(transfer.path))
+		if err != nil {
+			return Error.Wrap(err)
+		}
+		remote := pointer.GetRemote()
+		if remote == nil {
+			err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
+			if err != nil {
+				return Error.Wrap(err)
+			}
+			pending.delete(pieceID)
+			return nil
+		}
+		pieces := remote.GetRemotePieces()
+
+		var nodePiece *pb.RemotePiece
+		for _, piece := range pieces {
+			if piece.NodeId == nodeID && piece.PieceNum == transfer.pieceNum {
+				nodePiece = piece
+			}
+		}
+		if nodePiece == nil {
+			err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
+			if err != nil {
+				return Error.Wrap(err)
+			}
+			pending.delete(pieceID)
+			return nil
+		}
+
+		_, err = endpoint.metainfo.UpdatePieces(ctx, string(transfer.path), pointer, nil, []*pb.RemotePiece{nodePiece})
+		if err != nil {
+			return Error.Wrap(err)
+		}
+
+		// If the pointer was piece hash verified, we know this node definitely should have the piece.
+		// Otherwise, no penalty.
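+		// (A pointer that is not piece hash verified may list piece info that
+		// was never confirmed, so we cannot be sure the node ever had the piece.)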
+		if pointer.PieceHashesVerified {
+			err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
+			if err != nil {
+				return Error.Wrap(err)
+			}
+		}
+
+		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
+		if err != nil {
+			return Error.Wrap(err)
+		}
+		pending.delete(pieceID)
+
+		return nil
+	}
 
 	transferQueueItem.LastFailedAt = &now
 	transferQueueItem.FailedCount = &failedCount
diff --git a/satellite/gracefulexit/endpoint_test.go b/satellite/gracefulexit/endpoint_test.go
index e52d8db80..88b74e379 100644
--- a/satellite/gracefulexit/endpoint_test.go
+++ b/satellite/gracefulexit/endpoint_test.go
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/require"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
@@ -30,6 +31,7 @@ import (
 	"storj.io/storj/pkg/storj"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/storage"
 	"storj.io/storj/storagenode"
 	"storj.io/storj/storagenode/gracefulexit"
 	"storj.io/storj/storagenode/pieces"
@@ -832,6 +834,163 @@ func TestExitDisabled(t *testing.T) {
 	})
 }
 
+func TestFailureNotFoundPieceHashVerified(t *testing.T) {
+	testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
+		response, err := processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_TransferPiece:
+			require.NotNil(t, m)
+
+			message := &pb.StorageNodeMessage{
+				Message: &pb.StorageNodeMessage_Failed{
+					Failed: &pb.TransferFailed{
+						OriginalPieceId: m.TransferPiece.OriginalPieceId,
+						Error:           pb.TransferFailed_NOT_FOUND,
+					},
+				},
+			}
+			err = processClient.Send(message)
+			require.NoError(t, err)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		response, err = processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_ExitFailed:
+			require.NotNil(t, m)
+			require.NotNil(t, m.ExitFailed)
+			require.Equal(t, m.ExitFailed.Reason, pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		// check that node is no longer in the pointer
+		keys, err := satellite.Metainfo.Database.List(ctx, nil, -1)
+		require.NoError(t, err)
+
+		var pointer *pb.Pointer
+		for _, key := range keys {
+			p, err := satellite.Metainfo.Service.Get(ctx, string(key))
+			require.NoError(t, err)
+
+			if p.GetRemote() != nil {
+				pointer = p
+				break
+			}
+		}
+		require.NotNil(t, pointer)
+		for _, piece := range pointer.GetRemote().GetRemotePieces() {
+			require.NotEqual(t, piece.NodeId, exitingNode.ID())
+		}
+
+		// check that the exit has failed and we have the correct transferred/failed values
+		progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
+		require.NoError(t, err)
+
+		require.Equal(t, int64(0), progress.PiecesTransferred)
+		require.Equal(t, int64(1), progress.PiecesFailed)
+	})
+
+}
+
+func TestFailureNotFoundPieceHashUnverified(t *testing.T) {
+	testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
+		// retrieve remote segment
+		keys, err := satellite.Metainfo.Database.List(ctx, nil, -1)
+		require.NoError(t, err)
+
+		var oldPointer *pb.Pointer
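+		// remember the segment's key so its pointer can be swapped below for a
+		// copy with PieceHashesVerified set to false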
+		var path []byte
+		for _, key := range keys {
+			p, err := satellite.Metainfo.Service.Get(ctx, string(key))
+			require.NoError(t, err)
+
+			if p.GetRemote() != nil {
+				oldPointer = p
+				path = key
+				break
+			}
+		}
+
+		// replace pointer with non-piece-hash-verified pointer
+		require.NotNil(t, oldPointer)
+		oldPointerBytes, err := proto.Marshal(oldPointer)
+		require.NoError(t, err)
+		newPointer := &pb.Pointer{}
+		err = proto.Unmarshal(oldPointerBytes, newPointer)
+		require.NoError(t, err)
+		newPointer.PieceHashesVerified = false
+		newPointerBytes, err := proto.Marshal(newPointer)
+		require.NoError(t, err)
+		err = satellite.Metainfo.Service.DB.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes)
+		require.NoError(t, err)
+
+		// begin processing graceful exit messages
+		response, err := processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_TransferPiece:
+			require.NotNil(t, m)
+
+			message := &pb.StorageNodeMessage{
+				Message: &pb.StorageNodeMessage_Failed{
+					Failed: &pb.TransferFailed{
+						OriginalPieceId: m.TransferPiece.OriginalPieceId,
+						Error:           pb.TransferFailed_NOT_FOUND,
+					},
+				},
+			}
+			err = processClient.Send(message)
+			require.NoError(t, err)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		response, err = processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_ExitCompleted:
+			require.NotNil(t, m)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		// check that node is no longer in the pointer
+		keys, err = satellite.Metainfo.Database.List(ctx, nil, -1)
+		require.NoError(t, err)
+
+		var pointer *pb.Pointer
+		for _, key := range keys {
+			p, err := satellite.Metainfo.Service.Get(ctx, string(key))
+			require.NoError(t, err)
+
+			if p.GetRemote() != nil {
+				pointer = p
+				break
+			}
+		}
+		require.NotNil(t, pointer)
+		for _, piece := range pointer.GetRemote().GetRemotePieces() {
+			require.NotEqual(t, piece.NodeId, exitingNode.ID())
+		}
+
+		// check that the exit has completed and we have the correct transferred/failed values
+		progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
+		require.NoError(t, err)
+
+		require.Equal(t, int64(0), progress.PiecesTransferred)
+		require.Equal(t, int64(0), progress.PiecesFailed)
+	})
+
+}
+
 func testTransfers(t *testing.T, objects int, verifier func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
 	successThreshold := 4
 	testplanet.Run(t, testplanet.Config{