corereader: take fd and list of segments instead of path

Now, we can get rid of the ELF parsing implementation in CoreReader.
Instead, we parse in ElfFile and pass the parsed information to
CoreReader.
This commit is contained in:
Omar Sandoval 2018-07-09 19:12:33 -07:00
parent d376b3c557
commit 800ee3ec36
8 changed files with 262 additions and 417 deletions

View File

@ -16,6 +16,7 @@ from typing import Any, Dict, List, Tuple, Union
import drgn
from drgn.corereader import CoreReader
from drgn.dwarfindex import DwarfIndex
from drgn.elf import ElfFile, ET_CORE, PT_LOAD
from drgn.kernelvariableindex import KernelVariableIndex
from drgn.program import Program, ProgramObject
from drgn.type import Type
@ -125,11 +126,20 @@ def main() -> None:
if args.core is None:
args.core = '/proc/kcore'
with CoreReader(args.core) as core_reader:
with open(args.core, 'rb') as core_file:
core_elf_file = ElfFile(core_file)
if core_elf_file.ehdr.e_type != ET_CORE:
sys.exit('ELF file is not a core dump')
# p_offset, p_vaddr, p_paddr, p_filesz, p_memsz
segments = [phdr[2:7] for phdr in core_elf_file.phdrs
if phdr.p_type == PT_LOAD]
core_reader = CoreReader(core_file.fileno(), segments)
if os.path.abspath(args.core) == '/proc/kcore':
vmcoreinfo_data = read_vmcoreinfo_from_sysfs(core_reader)
else:
for name, _, vmcoreinfo_data in core_reader.notes():
for name, _, vmcoreinfo_data in core_elf_file.notes():
if name == b'VMCOREINFO':
break
else:

View File

@ -14,37 +14,21 @@
#define min(a, b) ((a) < (b) ? (a) : (b))
static PyObject *ElfFormatError;
struct segment {
uint64_t offset;
uint64_t vaddr;
uint64_t paddr;
uint64_t filesz;
uint64_t memsz;
};
typedef struct {
PyObject_HEAD
int fd;
int phnum;
Elf64_Phdr *phdrs;
int num_segments;
struct segment *segments;
} CoreReader;
static int read_all(int fd, void *buf, size_t count)
{
char *p = buf;
while (count) {
ssize_t ret;
ret = read(fd, p, count);
if (ret == -1) {
if (errno == EINTR)
continue;
return -1;
} else if (ret == 0) {
errno = ENODATA;
return -1;
}
p += ret;
count -= ret;
}
return 0;
}
static int pread_all(int fd, void *buf, size_t count, off_t offset)
{
char *p = buf;
@ -68,125 +52,83 @@ static int pread_all(int fd, void *buf, size_t count, off_t offset)
return 0;
}
static void close_reader(CoreReader *self)
{
free(self->phdrs);
self->phdrs = NULL;
if (self->fd != -1) {
close(self->fd);
self->fd = -1;
}
}
static void CoreReader_dealloc(CoreReader *self)
{
close_reader(self);
free(self->segments);
Py_TYPE(self)->tp_free((PyObject *)self);
}
static int CoreReader_init(CoreReader *self, PyObject *args, PyObject *kwds)
{
static char *keywords[] = {"path", NULL};
PyObject *path_obj, *path;
Elf64_Phdr *phdrs = NULL;
Elf64_Ehdr ehdr;
static const char *errmsg = "segment must be (offset, vaddr, paddr, filesz, memsz)";
static char *keywords[] = {"fd", "segments", NULL};
int fd;
PyObject *segments_list;
struct segment *segments;
int num_segments, i;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O:CoreReader", keywords,
&path_obj))
if (!PyArg_ParseTupleAndKeywords(args, kwds, "iO!:CoreReader", keywords,
&fd, &PyList_Type, &segments_list))
return -1;
if (!PyUnicode_FSConverter(path_obj, &path))
return -1;
fd = open(PyBytes_AsString(path), O_RDONLY);
if (fd == -1) {
PyErr_SetFromErrnoWithFilenameObject(PyExc_OSError, path_obj);
Py_DECREF(path);
return -1;
}
Py_DECREF(path);
if (read_all(fd, ehdr.e_ident, EI_NIDENT) == -1) {
if (errno == ENODATA)
PyErr_SetString(ElfFormatError, "not an ELF file");
else
PyErr_SetFromErrno(PyExc_OSError);
goto err;
}
if (ehdr.e_ident[EI_MAG0] != ELFMAG0 ||
ehdr.e_ident[EI_MAG1] != ELFMAG1 ||
ehdr.e_ident[EI_MAG2] != ELFMAG2 ||
ehdr.e_ident[EI_MAG3] != ELFMAG3) {
PyErr_SetString(ElfFormatError, "not an ELF file");
goto err;
}
if (ehdr.e_ident[EI_VERSION] != EV_CURRENT) {
PyErr_Format(ElfFormatError, "ELF version %u is not EV_CURRENT",
(unsigned int)ehdr.e_ident[EI_VERSION]);
if (PyList_GET_SIZE(segments_list) > INT_MAX) {
PyErr_SetString(PyExc_OverflowError, "too many segments");
return -1;
}
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
if (ehdr.e_ident[EI_DATA] != ELFDATA2LSB) {
#else
if (ehdr.e_ident[EI_DATA] != ELFDATA2MSB) {
#endif
PyErr_SetString(PyExc_NotImplementedError,
"ELF file endianness does not match machine");
return -1;
}
if (read_all(fd, (char *)&ehdr + EI_NIDENT,
sizeof(Elf64_Ehdr) - EI_NIDENT) == -1) {
if (errno == ENODATA)
PyErr_SetString(ElfFormatError, "ELF header is truncated");
else
PyErr_SetFromErrno(PyExc_OSError);
goto err;
}
/* Don't need to worry about overflow because e_phnum is 16 bits. */
phdrs = malloc(ehdr.e_phnum * sizeof(*phdrs));
if (!phdrs) {
num_segments = PyList_GET_SIZE(segments_list);
segments = calloc(num_segments, sizeof(*segments));
if (!segments) {
PyErr_NoMemory();
goto err;
return -1;
}
if ((off_t)ehdr.e_phoff < 0) {
PyErr_SetString(ElfFormatError,
"ELF program header table is beyond EOF");
goto err;
for (i = 0; i < num_segments; i++) {
struct segment *segment = &segments[i];
PyObject *segment_obj;
segment_obj = PySequence_Fast(PyList_GET_ITEM(segments_list, i),
errmsg);
if (!segment_obj)
goto err;
if (PySequence_Fast_GET_SIZE(segment_obj) != 5) {
PyErr_SetString(PyExc_ValueError, errmsg);
Py_DECREF(segment_obj);
goto err;
}
#define GET_MEMBER(var, idx) do { \
PyObject *tmp_obj; \
unsigned long long tmp; \
\
tmp_obj = PySequence_Fast_GET_ITEM(segment_obj, idx); \
tmp = PyLong_AsUnsignedLongLong(tmp_obj); \
if (tmp == (unsigned long long)-1 && PyErr_Occurred()) { \
Py_DECREF(segment_obj); \
goto err; \
} \
var = tmp; \
} while (0);
GET_MEMBER(segment->offset, 0);
GET_MEMBER(segment->vaddr, 1);
GET_MEMBER(segment->paddr, 2);
GET_MEMBER(segment->filesz, 3);
GET_MEMBER(segment->memsz, 4);
#undef GET_MEMBER
Py_DECREF(segment_obj);
}
if (lseek(fd, ehdr.e_phoff, SEEK_SET) == -1) {
PyErr_SetFromErrno(PyExc_OSError);
goto err;
}
if (read_all(fd, phdrs, ehdr.e_phnum * sizeof(*phdrs)) == -1) {
if (errno == ENODATA)
PyErr_SetString(ElfFormatError,
"ELF program header table is beyond EOF");
else
PyErr_SetFromErrno(PyExc_OSError);
goto err;
}
free(self->phdrs);
close(self->fd);
free(self->segments);
self->fd = fd;
self->phdrs = phdrs;
self->phnum = ehdr.e_phnum;
self->segments = segments;
self->num_segments = num_segments;
return 0;
err:
free(phdrs);
close(fd);
free(segments);
return -1;
}
@ -201,131 +143,13 @@ static CoreReader *CoreReader_new(PyTypeObject *subtype, PyObject *args,
return reader;
}
static PyObject *CoreReader_close(CoreReader *self)
{
close_reader(self);
Py_RETURN_NONE;
}
static PyObject *CoreReader_enter(PyObject *self)
{
Py_INCREF(self);
return self;
}
static PyObject *CoreReader_exit(CoreReader *self, PyObject *args,
PyObject *kwds)
{
static char *keywords[] = {"exc_type", "exc_value", "traceback", NULL};
PyObject *exc_type, *exc_value, *traceback;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "OOO:__exit__", keywords,
&exc_type, &exc_value, &traceback))
return NULL;
return CoreReader_close(self);
}
static PyObject *read_note(CoreReader *self, size_t n, off_t *off)
{
PyObject *bytes;
size_t skip;
if (!n)
return PyBytes_FromStringAndSize(NULL, 0);
bytes = PyBytes_FromStringAndSize(NULL, n - 1);
if (!bytes)
return NULL;
if (read_all(self->fd, PyBytes_AS_STRING(bytes), n - 1) == -1) {
Py_DECREF(bytes);
return NULL;
}
*off += n - 1;
/* One for the NUL byte, then align to 4 bytes. */
skip = 1 + (n % 4 ? 4 - n % 4 : 0);
if (lseek(self->fd, skip, SEEK_CUR) == -1) {
PyErr_SetFromErrno(PyExc_OSError);
Py_DECREF(bytes);
return NULL;
}
*off += skip;
return bytes;
}
static PyObject *CoreReader_notes(CoreReader *self)
{
PyObject *list;
int i;
list = PyList_New(0);
if (!list)
return NULL;
for (i = 0; i < self->phnum; i++) {
Elf64_Phdr *phdr = &self->phdrs[i];
off_t off;
if (phdr->p_type != PT_NOTE)
continue;
if (lseek(self->fd, phdr->p_offset, SEEK_SET) == -1) {
PyErr_SetFromErrno(PyExc_OSError);
goto err;
}
off = phdr->p_offset;
while (off - phdr->p_offset + sizeof(Elf64_Nhdr) <
phdr->p_filesz) {
PyObject *name, *desc, *item;
Elf64_Nhdr nhdr;
if (read_all(self->fd, &nhdr, sizeof(nhdr)) == -1) {
PyErr_SetFromErrno(PyExc_OSError);
goto err;
}
off += sizeof(nhdr);
name = read_note(self, nhdr.n_namesz, &off);
if (!name)
goto err;
desc = read_note(self, nhdr.n_descsz, &off);
if (!desc) {
Py_DECREF(name);
goto err;
}
item = Py_BuildValue("OIO", name,
(unsigned int)nhdr.n_type, desc);
Py_DECREF(desc);
Py_DECREF(name);
if (!item)
goto err;
if (PyList_Append(list, item) == -1) {
Py_DECREF(item);
goto err;
}
Py_DECREF(item);
}
}
return list;
err:
Py_DECREF(list);
return NULL;
}
static int read_core(CoreReader *self, void *buf, uint64_t address,
uint64_t count, int physical)
{
char *p = buf;
while (count) {
Elf64_Phdr *phdr;
struct segment *segment;
uint64_t segment_address;
uint64_t segment_offset;
off_t read_offset;
@ -336,16 +160,14 @@ static int read_core(CoreReader *self, void *buf, uint64_t address,
* The most recently used segments are at the end of the list,
* so search backwards.
*/
for (i = self->phnum - 1; i >= 0; i--) {
phdr = &self->phdrs[i];
if (phdr->p_type != PT_LOAD)
continue;
segment_address = (physical ? phdr->p_paddr :
phdr->p_vaddr);
for (i = self->num_segments - 1; i >= 0; i--) {
segment = &self->segments[i];
segment_address = (physical ? segment->paddr :
segment->vaddr);
if (segment_address == (uint64_t)-1)
continue;
if (segment_address <= address &&
address < segment_address + phdr->p_memsz)
address < segment_address + segment->memsz)
break;
}
if (i < 0) {
@ -356,26 +178,27 @@ static int read_core(CoreReader *self, void *buf, uint64_t address,
}
/* Move the used segment to the end of the list. */
if (i != self->phnum - 1) {
Elf64_Phdr tmp = *phdr;
if (i != self->num_segments - 1) {
struct segment tmp = *segment;
memmove(&self->phdrs[i], &self->phdrs[i + 1],
(self->phnum - i - 1) * sizeof(*self->phdrs));
phdr = &self->phdrs[self->phnum - 1];
*phdr = tmp;
memmove(&self->segments[i], &self->segments[i + 1],
(self->num_segments - i - 1) *
sizeof(*self->segments));
segment = &self->segments[self->num_segments - 1];
*segment = tmp;
}
segment_offset = address - segment_address;
if (segment_offset < phdr->p_filesz)
read_count = min(phdr->p_filesz - segment_offset, count);
if (segment_offset < segment->filesz)
read_count = min(segment->filesz - segment_offset, count);
else
read_count = 0;
if (segment_offset + read_count < phdr->p_memsz)
zero_count = min(phdr->p_memsz - segment_offset - read_count,
if (segment_offset + read_count < segment->memsz)
zero_count = min(segment->memsz - segment_offset - read_count,
count - read_count);
else
zero_count = 0;
read_offset = phdr->p_offset + segment_offset;
read_offset = segment->offset + segment_offset;
if (pread_all(self->fd, p, read_count, read_offset) == -1) {
PyErr_SetFromErrno(PyExc_OSError);
return -1;
@ -464,22 +287,9 @@ CoreReader_READ(long_double, long double, PyFloat_FromDouble)
"physical -- whether address is a physical memory address"}
#define CoreReader_DOC \
"CoreReader(path) -> new core file reader"
"CoreReader(fd, segments) -> new core file reader"
static PyMethodDef CoreReader_methods[] = {
{"close", (PyCFunction)CoreReader_close,
METH_NOARGS,
"close()\n\n"
"Close a core file reader."},
{"__enter__", (PyCFunction)CoreReader_enter,
METH_NOARGS},
{"__exit__", (PyCFunction)CoreReader_exit,
METH_VARARGS | METH_KEYWORDS},
{"notes", (PyCFunction)CoreReader_notes,
METH_NOARGS,
"notes()\n\n"
"Return a list of ELF notes in this core file as a list of (name, type, data)\n"
"tuples, where name and data are bytes and type is an int."},
{"read", (PyCFunction)CoreReader_read,
METH_VARARGS | METH_KEYWORDS,
"read(address, size, physical=False)\n\n"
@ -557,26 +367,8 @@ static struct PyModuleDef corereadermodule = {
PyMODINIT_FUNC
PyInit_corereader(void)
{
PyObject *name;
PyObject *m;
name = PyUnicode_FromString("drgn.elf");
if (!name)
return NULL;
m = PyImport_Import(name);
Py_DECREF(name);
if (!m)
return NULL;
ElfFormatError = PyObject_GetAttrString(m, "ElfFormatError");
if (!ElfFormatError) {
Py_DECREF(m);
return NULL;
}
Py_DECREF(m);
m = PyModule_Create(&corereadermodule);
if (!m)
return NULL;

View File

@ -1,9 +1,10 @@
from os import PathLike
from typing import Any, List, Tuple, Union
from typing import Any, List, Sequence, Tuple, Union
class CoreReader:
def __init__(self, path: Union[str, bytes, PathLike]) -> None: ...
def __init__(self, fd: int,
segments: Sequence[Tuple[int, int, int, int, int]]) -> None: ...
def close(self) -> None: ...
def __enter__(self) -> CoreReader: ...
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: ...

View File

@ -91,6 +91,26 @@ SHT_LOPROC = 0x70000000
SHT_HIPROC = 0x7fffffff
SHT_LOUSER = 0x80000000
SHT_HIUSER = 0x8fffffff
PT_NULL = 0
PT_LOAD = 1
PT_DYNAMIC = 2
PT_INTERP = 3
PT_NOTE = 4
PT_SHLIB = 5
PT_PHDR = 6
PT_TLS = 7
PT_NUM = 8
PT_LOOS = 0x60000000
PT_GNU_EH_FRAME = 0x6474e550
PT_GNU_STACK = 0x6474e551
PT_GNU_RELRO = 0x6474e552
PT_LOSUNW = 0x6ffffffa
PT_SUNWBSS = 0x6ffffffa
PT_SUNWSTACK = 0x6ffffffb
PT_HISUNW = 0x6fffffff
PT_HIOS = 0x6fffffff
PT_LOPROC = 0x70000000
PT_HIPROC = 0x7fffffff
SHN_MIPS_ACOMMON = 0xff00
SHN_MIPS_TEXT = 0xff01
SHN_MIPS_DATA = 0xff02
@ -135,16 +155,43 @@ SHT_MIPS_WHIRL = 0x70000026
SHT_MIPS_EH_REGION = 0x70000027
SHT_MIPS_XLATE_OLD = 0x70000028
SHT_MIPS_PDR_EXCEPTION = 0x70000029
PT_MIPS_REGINFO = 0x70000000
PT_MIPS_RTPROC = 0x70000001
PT_MIPS_OPTIONS = 0x70000002
PT_MIPS_ABIFLAGS = 0x70000003
SHN_PARISC_ANSI_COMMON = 0xff00
SHN_PARISC_HUGE_COMMON = 0xff01
SHT_PARISC_EXT = 0x70000000
SHT_PARISC_UNWIND = 0x70000001
SHT_PARISC_DOC = 0x70000002
PT_HP_TLS = (PT_LOOS + 0x0)
PT_HP_CORE_NONE = (PT_LOOS + 0x1)
PT_HP_CORE_VERSION = (PT_LOOS + 0x2)
PT_HP_CORE_KERNEL = (PT_LOOS + 0x3)
PT_HP_CORE_COMM = (PT_LOOS + 0x4)
PT_HP_CORE_PROC = (PT_LOOS + 0x5)
PT_HP_CORE_LOADABLE = (PT_LOOS + 0x6)
PT_HP_CORE_STACK = (PT_LOOS + 0x7)
PT_HP_CORE_SHM = (PT_LOOS + 0x8)
PT_HP_CORE_MMF = (PT_LOOS + 0x9)
PT_HP_PARALLEL = (PT_LOOS + 0x10)
PT_HP_FASTBIND = (PT_LOOS + 0x11)
PT_HP_OPT_ANNOT = (PT_LOOS + 0x12)
PT_HP_HSL_ANNOT = (PT_LOOS + 0x13)
PT_HP_STACK = (PT_LOOS + 0x14)
PT_PARISC_ARCHEXT = 0x70000000
PT_PARISC_UNWIND = 0x70000001
SHT_ALPHA_DEBUG = 0x70000001
SHT_ALPHA_REGINFO = 0x70000002
PT_ARM_EXIDX = (PT_LOPROC + 1)
SHT_ARM_EXIDX = (SHT_LOPROC + 1)
SHT_ARM_PREEMPTMAP = (SHT_LOPROC + 2)
SHT_ARM_ATTRIBUTES = (SHT_LOPROC + 3)
PT_IA_64_ARCHEXT = (PT_LOPROC + 0)
PT_IA_64_UNWIND = (PT_LOPROC + 1)
PT_IA_64_HP_OPT_ANOT = (PT_LOOS + 0x12)
PT_IA_64_HP_HSL_ANOT = (PT_LOOS + 0x13)
PT_IA_64_HP_STACK = (PT_LOOS + 0x14)
SHT_IA_64_EXT = (SHT_LOPROC + 0)
SHT_IA_64_UNWIND = (SHT_LOPROC + 1)
@ -170,6 +217,17 @@ class Elf_Ehdr(NamedTuple):
e_shstrndx: int
class Elf_Phdr(NamedTuple):
p_type: int
p_flags: int
p_offset: int
p_vaddr: int
p_paddr: int
p_filesz: int
p_memsz: int
p_align: int
class Elf_Shdr(NamedTuple):
sh_name: int
sh_type: int
@ -193,10 +251,17 @@ class Elf_Sym(NamedTuple):
st_size: int
class Elf_Note(NamedTuple):
name: bytes
type: int
data: bytes
class ElfFile:
def __init__(self, file: BinaryIO) -> None:
self.file = file
self.ehdr = self._ehdr()
self._phdrs: Optional[List[Elf_Phdr]] = None
self._shdrs: Optional[List[Elf_Shdr]] = None
self._sections: Optional[Dict[str, Elf_Shdr]] = None
self._symbols: Optional[Dict[str, List[Elf_Sym]]] = None
@ -227,6 +292,54 @@ class ElfFile:
buf = self.file.read(struct.calcsize(fmt))
return Elf_Ehdr(e_ident, *struct.unpack(fmt, buf))
def notes(self) -> List[Elf_Note]:
if self.ehdr.e_ident[EI_DATA] == ELFDATA2LSB:
fmt = '<'
else:
fmt = '>'
fmt += 'III' # Same for Elf32 and Elf64
list = []
for phdr in self.phdrs:
if phdr.p_type != PT_NOTE:
continue
self.file.seek(phdr.p_offset)
buf = self.file.read(phdr.p_filesz)
off = 0
while off < len(buf):
namesz, descsz, type_ = struct.unpack_from(fmt, buf, off)
off += 12
if namesz:
name = buf[off:off + namesz - 1]
off += (namesz + 3) & ~3
else:
name = b''
if descsz:
desc = buf[off:off + descsz - 1]
off += (descsz + 3) & ~3
else:
desc = b''
list.append(Elf_Note(name, type_, desc))
return list
@property
def phdrs(self) -> List[Elf_Phdr]:
if self._phdrs is None:
if self.ehdr.e_ident[EI_DATA] == ELFDATA2LSB:
fmt = '<'
else:
fmt = '>'
if self.ehdr.e_ident[EI_CLASS] == ELFCLASS64:
fmt += 'LLQQQQQQ'
else:
assert False
self.file.seek(self.ehdr.e_phoff)
buf = self.file.read(self.ehdr.e_phnum * self.ehdr.e_phentsize)
self._phdrs = [Elf_Phdr._make(x) for x in struct.iter_unpack(fmt, buf)]
return self._phdrs
def _parse_shdrs(self) -> None:
if self.ehdr.e_ident[EI_DATA] == ELFDATA2LSB:
fmt = '<'

View File

@ -10,6 +10,7 @@ constant_prefixes = [
'ELFMAG',
'ET_',
'EV_',
'PT_',
'SHN_',
'SHT_',
]

View File

@ -1,16 +1,13 @@
import base64
import contextlib
import struct
import tempfile
import unittest
from drgn.corereader import CoreReader
from drgn.elf import ElfFormatError
@contextlib.contextmanager
def tmpfile(data):
file = tempfile.NamedTemporaryFile()
file = tempfile.TemporaryFile()
try:
file.write(data)
file.flush()
@ -19,123 +16,54 @@ def tmpfile(data):
file.close()
def make_elf_file(segments=None):
if segments is None:
segments = [(0xffff0000, b'foobar\0\0')]
buf = bytearray(64 + 56 * len(segments))
struct.pack_into(
'<16sHHLQQQLHHHHHH', buf, 0,
b'\x7fELF\x02\x01\x01\0\0\0\0\0\0\0\0\0', # e_ident
4, # e_type (ET_CORE)
62, # e_machine (EM_X86_64)
1, # e_version (EV_CURRENT)
0, # e_entry
64, # e_phoff (right after the header)
0, # e_shoff
0, # e_flags
64, # e_ehsize
56, # e_phentsize
len(segments), # e_phnum
0, # e_shentsize
0, # e_shnum
0, # e_shstrndx
)
for i, segment in enumerate(segments):
if len(buf) % 4096:
buf.extend(bytes(4096 - len(buf) % 4096))
struct.pack_into(
'<LLQQQQQQ', buf, 64 + i * 56,
1, # p_type (PT_LOAD)
0x7, # p_flags (PF_R | PF_W | PF_X)
len(buf), # p_offset
segment[0], # p_vaddr
0xffffffffffffffff, # p_paddr
len(segment[1]), # p_filesz
segment[2] if len(segment) >= 3 else len(segment[1]), # p_memsz
4096, # p_align
)
buf.extend(segment[1])
return buf
class TestCoreReader(unittest.TestCase):
def test_short_file(self):
elf_file = make_elf_file()
with tmpfile(elf_file[:3]) as file:
self.assertRaisesRegex(ElfFormatError, 'not an ELF file',
CoreReader, file.name)
def test_invalid_elf_magic(self):
elf_file = make_elf_file()
elf_file[0] = 88
with tmpfile(elf_file) as file:
self.assertRaisesRegex(ElfFormatError, 'not an ELF file',
CoreReader, file.name)
def test_invalid_elf_version(self):
elf_file = make_elf_file()
elf_file[6] = 2
with tmpfile(elf_file) as file:
self.assertRaisesRegex(ElfFormatError, 'ELF version',
CoreReader, file.name)
def test_truncated_elf_header(self):
elf_file = make_elf_file()
with tmpfile(elf_file[:16]) as file:
self.assertRaisesRegex(ElfFormatError, 'ELF header is truncated',
CoreReader, file.name)
def test_program_header_table_overflow(self):
elf_file = make_elf_file()
elf_file[32:40] = b'\xff\xff\xff\xff\xff\xff\xff\xff'
with tmpfile(elf_file) as file:
self.assertRaisesRegex(ElfFormatError, 'ELF program header table is beyond EOF',
CoreReader, file.name)
def test_truncated_program_header_table(self):
elf_file = make_elf_file()
with tmpfile(elf_file[:64]) as file:
self.assertRaisesRegex(ElfFormatError, 'ELF program header table is beyond EOF',
CoreReader, file.name)
def test_bad_segments(self):
self.assertRaises(TypeError, CoreReader, 0, 0)
self.assertRaises(TypeError, CoreReader, 0, [0])
self.assertRaises(ValueError, CoreReader, 0, [()])
self.assertRaises(OverflowError, CoreReader, 0, [(2**64, 0, 0, 0, 0)])
def test_simple_read(self):
data = b'hello, world!'
elf_file = make_elf_file([(0xffff0000, data)])
with tmpfile(elf_file) as file:
core_reader = CoreReader(file.name)
segments = [(0, 0xffff0000, 0x0, len(data), len(data))]
with tmpfile(data) as file:
core_reader = CoreReader(file.fileno(), segments)
self.assertEqual(core_reader.read(0xffff0000, len(data)), data)
def test_bad_address(self):
elf_file = make_elf_file()
with tmpfile(elf_file) as file:
core_reader = CoreReader(file.name)
data = b'hello, world!'
segments = [(0, 0xffff0000, 0x0, len(data), len(data))]
with tmpfile(data) as file:
core_reader = CoreReader(file.fileno(), segments)
self.assertRaisesRegex(ValueError, 'could not find memory segment',
core_reader.read, 0x0, 4)
core_reader.read, 0xdeadbeef, 4)
def test_segment_overflow(self):
data = b'hello, world!'
elf_file = make_elf_file([(0xffff0000, data)])
with tmpfile(elf_file) as file:
core_reader = CoreReader(file.name)
segments = [(0, 0xffff0000, 0x0, len(data), len(data))]
with tmpfile(data) as file:
core_reader = CoreReader(file.fileno(), segments)
self.assertRaisesRegex(ValueError, 'could not find memory segment',
core_reader.read, 0xffff0000, len(data) + 1)
def test_contiguous_segments(self):
data = b'hello, world!'
elf_file = make_elf_file([
(0xffff0000, data[:4]),
(0xfffff000, b'foobar'),
(0xffff0004, data[4:]),
])
with tmpfile(elf_file) as file:
core_reader = CoreReader(file.name)
self.assertEqual(core_reader.read(0xffff0000, len(data)), data)
data = b'hello, world!\0foobar'
segments = [
(0, 0xffff0000, 0x0, 4, 4),
(14, 0xfffff000, 0x0, 6, 6),
(4, 0xffff0004, 0x0, 10, 10),
]
with tmpfile(data) as file:
core_reader = CoreReader(file.fileno(), segments)
self.assertEqual(core_reader.read(0xffff0000, 14), data[:14])
def test_zero_filled_segment(self):
data = b'hello, world!'
elf_file = make_elf_file([(0xffff0000, data, len(data) + 4)])
with tmpfile(elf_file) as file:
core_reader = CoreReader(file.name)
segments = [
(0, 0xffff0000, 0x0, 13, 17),
]
with tmpfile(data) as file:
core_reader = CoreReader(file.fileno(), segments)
self.assertEqual(core_reader.read(0xffff0000, len(data) + 4),
data + bytes(4))
self.assertEqual(core_reader.read(0xffff0000 + len(data), 4),

View File

@ -1,11 +1,11 @@
import math
import operator
import tempfile
import unittest
from drgn.corereader import CoreReader
from drgn.program import Program, ProgramObject
from drgn.type import IntType, StructType, TypedefType
from tests.test_corereader import make_elf_file, tmpfile
from tests.test_type import point_type
from tests.test_typeindex import TypeIndexTestCase, TYPES
@ -25,14 +25,16 @@ class TestProgramObject(TypeIndexTestCase):
if a._value != b._value:
raise self.failureException(msg or f'object values differ: {a._value!r} != {b._value!r}')
self.addTypeEqualityFunc(ProgramObject, program_object_equality_func)
elf_file = make_elf_file([
(0xffff0000, b'\x01\x00\x00\x00\x02\x00\x00\x00hello\x00\x00\x00'),
])
with tmpfile(elf_file) as file:
core_reader = CoreReader(file.name)
buffer = b'\x01\x00\x00\x00\x02\x00\x00\x00hello\x00\x00\x00'
segments = [(0, 0xffff0000, 0x0, len(buffer), len(buffer))]
self.tmpfile = tempfile.TemporaryFile()
self.tmpfile.write(buffer)
self.tmpfile.flush()
core_reader = CoreReader(self.tmpfile.fileno(), segments)
self.program = Program(reader=core_reader, type_index=self.type_index,
variable_index=None)
def tearDown(self):
self.tmpfile.close()
super().tearDown()
def test_constructor(self):

View File

@ -21,7 +21,7 @@ from drgn.type import (
UnionType,
VoidType,
)
from tests.test_corereader import make_elf_file, tmpfile
from tests.test_corereader import tmpfile
def compound_type_dict_for_eq(type_):
@ -637,23 +637,21 @@ class TestOperandType(TypeTestCase):
class TestTypeRead(unittest.TestCase):
def test_void(self):
type_ = VoidType()
with tmpfile(make_elf_file()) as file:
reader = CoreReader(file.name)
self.assertRaises(ValueError, type_.read, reader, 0x0)
self.assertRaises(ValueError, type_.read_pretty, reader, 0x0)
with tmpfile(b'') as file:
reader = CoreReader(file.fileno(), [])
self.assertRaises(ValueError, type_.read, reader, 0x0)
self.assertRaises(ValueError, type_.read_pretty, reader, 0x0)
def assertReads(self, type_, buffer, expected_value,
expected_pretty_cast, expected_pretty_nocast):
elf_file = make_elf_file([
(0xffff0000, buffer),
])
with tmpfile(elf_file) as file:
reader = CoreReader(file.name)
self.assertEqual(type_.read(reader, 0xffff0000), expected_value)
self.assertEqual(type_.read_pretty(reader, 0xffff0000),
expected_pretty_cast)
self.assertEqual(type_.read_pretty(reader, 0xffff0000, cast=False),
expected_pretty_nocast)
segments = [(0, 0xffff0000, 0x0, len(buffer), len(buffer))]
with tmpfile(buffer) as file:
reader = CoreReader(file.fileno(), segments)
self.assertEqual(type_.read(reader, 0xffff0000), expected_value)
self.assertEqual(type_.read_pretty(reader, 0xffff0000),
expected_pretty_cast)
self.assertEqual(type_.read_pretty(reader, 0xffff0000, cast=False),
expected_pretty_nocast)
def test_int(self):
type_ = IntType('int', 4, True)