From 7ae295dda5aaef6e94020ba34020ad774f42010b Mon Sep 17 00:00:00 2001 From: Omar Sandoval Date: Mon, 9 Jul 2018 17:03:15 -0700 Subject: [PATCH] Add MemoryViewIO This will be used so that ElfFile can take a BufferedIOReader instead of a memoryview. --- drgn/memoryviewio.py | 80 ++++++++++++++++++++++++++++++++++++++ tests/test_memoryviewio.py | 33 ++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 drgn/memoryviewio.py create mode 100644 tests/test_memoryviewio.py diff --git a/drgn/memoryviewio.py b/drgn/memoryviewio.py new file mode 100644 index 00000000..ccdd1510 --- /dev/null +++ b/drgn/memoryviewio.py @@ -0,0 +1,80 @@ +# Copyright 2018 - Omar Sandoval +# SPDX-License-Identifier: GPL-3.0+ + +import io +from typing import Optional + + +# Based on BytesIO from Lib/_pyio.py in CPython. +class MemoryViewIO(io.BufferedIOBase): + def __init__(self, mem: memoryview) -> None: + self._mem = mem + self._pos = 0 + + def close(self) -> None: + del self._mem + super().close() + + # Returns memoryview instead of bytes, which isn't technically correct. + def read(self, size: Optional[int] = -1) -> memoryview: # type: ignore + if self.closed: + raise ValueError("read from closed file") + if size is None: + size = -1 + else: + try: + size_index = size.__index__ + except AttributeError: + raise TypeError(f"{size!r} is not an integer") + else: + size = size_index() + if size < 0: + size = len(self._mem) + newpos = min(len(self._mem), self._pos + size) + m = self._mem[self._pos:newpos] + self._pos = newpos + return m + + def read1(self, size: Optional[int] = -1) -> memoryview: # type: ignore + return self.read(size) + + def seek(self, pos: int, whence: int = 0) -> int: + if self.closed: + raise ValueError("seek on closed file") + try: + pos_index = pos.__index__ + except AttributeError: + raise TypeError(f"{pos!r} is not an integer") + else: + pos = pos_index() + if whence == 0: + if pos < 0: + raise ValueError("negative seek position %r" % (pos,)) + self._pos = pos + elif whence == 1: + self._pos = max(0, self._pos + pos) + elif whence == 2: + self._pos = max(0, len(self._mem) + pos) + else: + raise ValueError("unsupported whence value") + return self._pos + + def tell(self) -> int: + if self.closed: + raise ValueError("tell on closed file") + return self._pos + + def readable(self) -> bool: + if self.closed: + raise ValueError("I/O operation on closed file.") + return True + + def writable(self) -> bool: + if self.closed: + raise ValueError("I/O operation on closed file.") + return False + + def seekable(self) -> bool: + if self.closed: + raise ValueError("I/O operation on closed file.") + return True diff --git a/tests/test_memoryviewio.py b/tests/test_memoryviewio.py new file mode 100644 index 00000000..8aaebcda --- /dev/null +++ b/tests/test_memoryviewio.py @@ -0,0 +1,33 @@ +import unittest + +from drgn.memoryviewio import MemoryViewIO + + +class TestCoreReader(unittest.TestCase): + def test_empty(self): + f = MemoryViewIO(memoryview(b'')) + self.assertEqual(f.read(4), b'') + self.assertEqual(f.tell(), 0) + + def test_read(self): + f = MemoryViewIO(memoryview(b'hello, world!')) + self.assertEqual(f.read(5), b'hello') + self.assertEqual(f.read(5), b', wor') + self.assertEqual(f.read(100), b'ld!') + + def test_seek(self): + f = MemoryViewIO(memoryview(b'hello, world!')) + self.assertEqual(f.seek(7), 7) + self.assertEqual(f.read(5), b'world') + self.assertEqual(f.seek(0, whence=1), 12) + + self.assertEqual(f.seek(-8, whence=1), 4) + self.assertEqual(f.read(3), b'o, ') + + self.assertEqual(f.seek(-3, whence=2), 10) + self.assertEqual(f.read(4), b'ld!') + + def test_close(self): + f = MemoryViewIO(memoryview(b'hello, world!')) + f.close() + self.assertTrue(f.closed)