tests: add tests for new net and tcp helpers

Because the vmtest kernels aren't currently built with networking
support, we need to skip them if TCP isn't supported.
This commit is contained in:
Omar Sandoval 2020-01-02 16:22:38 -08:00
parent bb7a8eedcf
commit 370bf6f16a
3 changed files with 56 additions and 0 deletions

View File

@ -1,7 +1,9 @@
import ctypes
import errno
import os
import re
import signal
import socket
import time
import unittest
@ -97,3 +99,14 @@ def umount(target, flags=0):
if _umount2(os.fsencode(target), flags) == -1:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno), target)
def create_socket(*args, **kwds):
try:
return socket.socket(*args, **kwds)
except OSError as e:
if e.errno in (errno.ENOSYS, errno.EAFNOSUPPORT,
errno.ESOCKTNOSUPPORT):
raise unittest.SkipTest('kernel does not support TCP')
else:
raise

View File

@ -0,0 +1,15 @@
import os
from drgn import cast
from drgn.helpers.linux.fs import fget
from drgn.helpers.linux.net import sk_fullsock
from drgn.helpers.linux.pid import find_task
from tests.helpers.linux import LinuxHelperTestCase, create_socket
class TestNet(LinuxHelperTestCase):
def test_sk_fullsock(self):
with create_socket() as sock:
file = fget(find_task(self.prog, os.getpid()), sock.fileno())
sk = cast('struct socket *', file.private_data).sk.read_()
self.assertTrue(sk_fullsock(sk))

View File

@ -0,0 +1,28 @@
import os
import socket
from drgn import cast
from drgn.helpers.linux.fs import fget
from drgn.helpers.linux.pid import find_task
from drgn.helpers.linux.tcp import get_tcp_states, sk_tcpstate
from tests.helpers.linux import LinuxHelperTestCase, create_socket
class TestTcp(LinuxHelperTestCase):
def test_tcp_states(self):
with create_socket() as sock:
task = find_task(self.prog, os.getpid())
file = fget(task, sock.fileno())
sk = cast('struct socket *', file.private_data).sk
TcpStates = get_tcp_states(self.prog)
self.assertEqual(sk_tcpstate(sk), TcpStates.CLOSE)
sock.bind(('localhost', 0))
sock.listen()
self.assertEqual(sk_tcpstate(sk), TcpStates.LISTEN)
with socket.create_connection(sock.getsockname()), \
sock.accept()[0] as sock2:
file = fget(task, sock2.fileno())
sk = cast('struct socket *', file.private_data).sk
self.assertEqual(sk_tcpstate(sk), TcpStates.ESTABLISHED)