# -*- coding: utf-8 -*-
# This code is part of Amoco
# Copyright (C) 2007-2019 Axel Tillequin (bdcht3@gmail.com)
# published under GPLv2 license
"""
system/core.py
==============
This module defines all task/process core classes related to binary format and
execution inherited by all system specific execution classes of
the :mod:`amoco.system` package.
"""
from amoco.arch.core import Bits
from amoco.ui.views import execView, dataView
from amoco.logger import Log
logger = Log(__name__)
logger.debug("loading module")
# ------------------------------------------------------------------------------
class CoreExec(object):
    """
    This class implements the base class for Task(s).
    CoreExec or Tasks are used to represent a memory mapped binary
    executable program, providing the generic instruction or data fetchers and
    the mandatory API for :mod:`amoco.emu` or :mod:`amoco.sa` analysis classes.
    Most of the :mod:`amoco.system` modules use this base class to implement
    a OS-specific Task class (see Linux/x86, Win32/x86, etc).

    Attributes:
        bin: the program executable format object. Currently supported formats
             are provided in :mod:`system.elf` (Elf32/64), :mod:`system.pe` (PE)
             and :mod:`system.utils` (HEX/SREC).
        cpu: reference to the architecture cpu module, which provides a generic
             access to the PC() program counter and
             obviously the CPU registers and disassembler.
        OS:  optional reference to the OS associated to the child Task.
        state: the :class:`mapper` instance that represents the current state
             of the executable program, including mapping of registers as well
             as the :class:`MemoryMap` instance that represents the virtual
             memory of the program.
        view: the :class:`execView` instance built for this task; it is
             what ``str(task)`` renders.
    """

    __slots__ = ["bin", "cpu", "OS", "state", "view"]

    def __init__(self, p, cpu=None):
        """
        Bind the task to the binary format object p and (optionally) to a
        cpu module, then create its initial state and its view.
        """
        self.bin = p
        self.cpu = cpu
        self.OS = None
        self.state = self.initstate()
        self.view = execView(of=self)

    def __repr__(self):
        c = self.__class__.__name__
        # the OS is identified by the module that defines it, "-" if unset
        o = self.OS.__module__ if self.OS else "-"
        n = self.bin.filename
        return "<%s %s '%s'>" % (c, o, n)

    def __str__(self):
        return str(self.view)

    def initstate(self):
        """
        return a fresh (empty) :class:`mapper` used as the initial state.
        """
        # imported here rather than at module level
        from amoco.cas.mapper import mapper

        m = mapper()
        return m

    def read_data(self, vaddr, size):
        """
        fetch size data bytes at virtual address vaddr, returned
        as a list of items being either raw bytes or symbolic expressions.
        """
        return self.state.mmap.read(vaddr, size)

    def read_instruction(self, vaddr, **kargs):
        """
        fetch instruction at virtual address vaddr, returned as an
        cpu.instruction instance or cpu.ext in case an external expression
        is found at vaddr or vaddr is an external symbol.

        Extra keyword arguments are forwarded to cpu.disassemble.

        Raises ValueError if the task has no cpu,
        raises MemoryError in case vaddr is not mapped,
        and returns None if disassembler fails to decode bytes at vaddr.

        Note:
            Returning a cpu.ext expression means that this instruction starts
            an external stub function.
            It is the responsibility of the fetcher (emulator or analyzer)
            to eventually call the stub to modify the state mapper.
        """
        if self.cpu is None:
            logger.error("no cpu imported")
            raise ValueError("no cpu imported")
        maxlen = self.cpu.disassemble.maxlen
        if isinstance(vaddr, int):
            # wrap the integer as a constant of the program counter's size
            addr = self.cpu.cst(vaddr, self.cpu.PC().size)
        elif vaddr._is_ext:
            # vaddr is itself an external symbol: it is returned as-is,
            # with its address attribute set to itself
            vaddr.address = vaddr
            return vaddr
        else:
            addr = vaddr
        try:
            istr = self.state.mmap.read(vaddr, maxlen)
        except MemoryError as e:
            logger.verbose("vaddr %s is not mapped" % addr)
            raise MemoryError(e)
        else:
            if len(istr) <= 0:
                logger.verbose("failed to read instruction at %s" % addr)
                raise MemoryError(addr)
            elif not isinstance(istr[0], bytes):
                # the first item read is an expression rather than raw bytes
                if istr[0]._is_ext:
                    istr[0].address = addr
                    return istr[0]
                else:
                    return None
        i = self.cpu.disassemble(istr[0], **kargs)
        if i is None:
            logger.warning("disassemble failed at vaddr %s" % addr)
            return None
        else:
            if i.address is None:
                i.address = addr
            # a non-zero "xsz" asks for that many extra bytes, read right
            # after the instruction and handed over to i.xdata
            xsz = i.misc["xsz"] or 0
            if xsz > 0:
                xdata = self.state.mmap.read(vaddr + i.length, xsz)
                i.xdata(i, xdata)
            return i

    def symbol_for(self, address):
        """
        return a decorated symbol name for address, or "" if none is known.

        Lookup order is the binary's variables ("$name"), then the binary's
        functions ("<name>"), then the OS symbols ("#name") if an OS is set.
        When a variables/functions entry is a tuple, its first item is used.
        """
        info = None
        if address in self.bin.variables:
            info = self.bin.variables[address]
            if isinstance(info, tuple):
                info = info[0]
            info = "$%s" % info
        elif address in self.bin.functions:
            info = self.bin.functions[address]
            if isinstance(info, tuple):
                info = info[0]
            info = "<%s>" % info
        elif self.OS and (address in self.OS.symbols):
            info = self.OS.symbols[address]
            info = "#%s" % info
        return info or ""

    def segment_for(self, address, stype=None):
        """
        return the name of the first item reported by bin.getinfo(address),
        or "" if that item has no 'name' attribute.
        The stype argument is accepted but not used here.
        """
        s = self.bin.getinfo(address)[0]
        return s.name if hasattr(s, "name") else ""

    def getx(self, loc, size=8, sign=False):
        """
        high level method to get the expressions value associated
        to left-value loc (register or address). The returned value
        is an integer if the expression is constant or a symbolic
        expression instance.
        The input loc is either a register string, an integer address,
        or associated expressions' instances.
        The size argument is only used when loc is an integer address.
        Optionally, the returned expression sign flag can be adjusted
        by the sign argument.
        """
        if isinstance(loc, str):
            # register looked up by name on the cpu module
            x = getattr(self.cpu, loc)
        elif isinstance(loc, int):
            endian = self.cpu.get_data_endian()
            psz = self.cpu.PC().size
            addr = self.cpu.cst(loc, psz)
            x = self.cpu.mem(addr, size, endian=endian)
        else:
            x = loc
        r = self.state(x)
        r.sf = sign
        return r.value if r._is_cst else r

    def setx(self, loc, val, size=0):
        """
        high level method to set the expressions value associated
        to left-value loc (register or address). The value
        is possibly an integer or a symbolic expression instance.
        The input loc is either a register string, an integer address,
        or associated expressions' instances.
        Optionally, the size of the loc expression can be adjusted
        by the size argument; it is only honoured when loc is an integer
        address, otherwise the size of the loc expression is used.
        """
        if isinstance(loc, str):
            x = getattr(self.cpu, loc)
            size = x.size
        elif isinstance(loc, int):
            endian = self.cpu.get_data_endian()
            psz = self.cpu.PC().size
            # the pointer is built from the integer address 'loc'
            x = self.cpu.mem(self.cpu.cst(loc, psz), size, endian=endian)
        else:
            x = loc
            size = x.size
        if isinstance(val, bytes):
            if x._is_mem:
                # raw bytes are written at the pointer of the mem expression
                x.size = len(val) if size == 0 else size
                self.state._Mem_write(x.a, val)
            else:
                endian = self.cpu.get_data_endian()
                # NOTE(review): the endianness flag is used as the slice
                # step here -- confirm the expected values and units.
                v = self.cpu.cst(
                    Bits(val[0 : x.size : endian], bitorder=1).int(), x.size * 8
                )
                self.state[x] = v
        elif isinstance(val, int):
            self.state[x] = self.cpu.cst(val, size)
        else:
            self.state[x] = val

    def get_int64(self, loc):
        "get 64-bit int expression of current state(loc)"
        return self.getx(loc, size=64, sign=True)

    def get_uint64(self, loc):
        "get 64-bit unsigned int expression of current state(loc)"
        return self.getx(loc, size=64)

    def get_int32(self, loc):
        "get 32-bit int expression of current state(loc)"
        return self.getx(loc, size=32, sign=True)

    def get_uint32(self, loc):
        "get 32-bit unsigned int expression of current state(loc)"
        return self.getx(loc, size=32)

    def get_int16(self, loc):
        "get 16-bit int expression of current state(loc)"
        return self.getx(loc, size=16, sign=True)

    def get_uint16(self, loc):
        "get 16-bit unsigned int expression of current state(loc)"
        return self.getx(loc, size=16)

    def get_int8(self, loc):
        "get 8-bit int expression of current state(loc)"
        return self.getx(loc, sign=True)

    def get_uint8(self, loc):
        "get 8-bit unsigned int expression of current state(loc)"
        return self.getx(loc)

    def get_cstr(self, loc):
        """
        get null-terminated unsigned char array of current state(loc).
        Bytes are fetched one at a time from loc upward until a zero is
        read; the returned bytes object includes that terminating zero.
        """
        A = [self.get_uint8(loc)]
        while A[-1] != 0:
            loc += 1
            A.append(self.get_uint8(loc))
        return bytes(A)
# ------------------------------------------------------------------------------
class DefineStub(object):
    """
    Decorator that registers the decorated function as the stub of
    library function 'refname' on the target object 'obj'.

    With default=True the function is installed as obj.default_stub
    (wrapped as a staticmethod) instead of being stored in obj.stubs.
    """

    def __init__(self, obj, refname, default=False):
        self.obj, self.ref, self.default = obj, refname, default

    def __call__(self, f):
        # the decorated function is always handed back unchanged
        if not self.default:
            self.obj.stubs[self.ref] = f
            return f
        self.obj.default_stub = staticmethod(f)
        return f

    @staticmethod
    def warning(env, **kargs):
        """Placeholder stub: only logs that no default stub is defined."""
        logger.warning("no default stub defined, this will not ret!")
# ------------------------------------------------------------------------------
class shellcode(BinFormat):
    """
    Fallback "format" for raw executable bytes.

    It carries no information about the targeted architecture,
    entrypoints, or any other data or code dependencies: it only wraps
    the data object it is given.
    """

    def __init__(self, dataio):
        # keep a reference to the wrapped data object
        self.data = dataio

    @property
    def dataio(self):
        """The wrapped data object, as passed to the constructor."""
        return self.data

    @property
    def filename(self):
        """The name reported by the wrapped data object."""
        return self.data.name

    @property
    def entrypoints(self):
        """A single entrypoint at offset 0."""
        return [0]
class DataIO(object):
    """
    This class simply wraps a binary file or a bytes string and implements
    both the file and bytes interface. It allows an input to be provided as
    files of bytes and manipulated as either a file or a bytes object.

    Attributes:
        f: the wrapped file-like object (a BytesIO when built from bytes).
        view: the :class:`dataView` instance built for this object.
    """

    def __init__(self, f):
        if isinstance(f, bytes):
            from io import BytesIO

            self.f = BytesIO(f)
        else:
            self.f = f
        self.view = dataView(dataio=self)

    def __getitem__(self, i):
        """
        return the data for slice i without moving the stream position.

        Only slice objects are supported and the slice step is ignored.
        A missing start means "from the current position" and a missing
        stop means "up to the end of the stream".
        """
        stay = self.f.tell()
        sta = i.start
        if sta is None:
            sta = stay
        try:
            self.f.seek(sta, 0)
            if i.stop is None:
                data = self.f.read()
            else:
                data = self.f.read(i.stop - sta)
        finally:
            # restore the caller's position even if the read fails
            self.f.seek(stay, 0)
        return data

    def size(self):
        """return the total size of the stream, position left untouched."""
        stay = self.f.tell()
        self.f.seek(0, 2)
        sz = self.f.tell()
        self.f.seek(stay, 0)
        return sz

    # --- plain delegation to the wrapped file object ---

    def read(self, size=-1):
        return self.f.read(size)

    def readline(self, size=-1):
        return self.f.readline(size)

    def readlines(self, size=-1):
        return self.f.readlines(size)

    def xreadlines(self, size=-1):
        """
        return a line iterator over the wrapped stream.
        The wrapped object's own xreadlines is used when it has one,
        otherwise the stream's iterator is returned.
        """
        legacy = getattr(self.f, "xreadlines", None)
        if legacy is not None:
            return legacy(size)
        return iter(self.f)

    def write(self, s):
        return self.f.write(s)

    def writelines(self, l):
        return self.f.writelines(l)

    def seek(self, offset, whence=0):
        return self.f.seek(offset, whence)

    def tell(self):
        return self.f.tell()

    def flush(self):
        return self.f.flush()

    def fileno(self):
        return self.f.fileno()

    def isatty(self):
        return self.f.isatty()

    def next(self):
        """return the next line of the wrapped stream."""
        # the builtin works with any iterator, whereas a '.next()' method
        # is not part of the Python 3 file interface
        return next(self.f)

    def truncate(self, size=0):
        return self.f.truncate(size)

    def close(self):
        return self.f.close()

    @property
    def closed(self):
        return self.f.closed

    @property
    def encoding(self):
        return self.f.encoding

    @property
    def errors(self):
        return self.f.errors

    @property
    def mode(self):
        return self.f.mode

    @property
    def name(self):
        """
        the wrapped object's name, or for unnamed in-memory buffers a
        label of the form "(sc-XXXXXXXX...)" built from the hexadecimal
        value of the first bytes of the buffer.
        """
        try:
            return self.f.name
        except AttributeError:
            # 8 hex digits are kept, so the first 4 bytes are enough
            head = bytes(self.f.getvalue()[:4])
            return "(sc-%s...)" % "".join(["%02x" % x for x in head])

    # 'filename' is an alias of the 'name' property
    filename = name

    @property
    def newlines(self):
        return self.f.newlines

    @property
    def softspace(self):
        return self.f.softspace
# ------------------------------------------------------------------------------
def read_program(filename):
    """
    Identifies the program header and returns the matching format object.

    The input is wrapped in a :class:`DataIO` and the ELF, PE, Mach-O, HEX
    and SREC parsers are tried in that order; the stream is rewound after
    each failed attempt. If none of them accepts the input, it is returned
    wrapped as a :class:`shellcode`.

    Args:
        filename (str): the program to read. If it cannot be opened as a
            file, it is converted with bytes() and used as the data itself.

    Returns:
        an instance of currently supported program format
        (ELF, PE, Mach-O, HEX, SREC) or a shellcode instance.
    """
    try:
        source = open(filename, "rb")
    except (ValueError, TypeError, IOError):
        source = bytes(filename)

    stream = DataIO(source)

    def _rejected(what):
        # rewind so that the next parser starts from the beginning
        stream.seek(0)
        logger.debug("%s raised for %s" % (what, stream.name))

    try:
        from amoco.system import elf

        # open file as a ELF object:
        prog = elf.Elf(stream)
        logger.info("ELF format detected")
        return prog
    except elf.ElfError:
        _rejected("ElfError")

    try:
        from amoco.system import pe

        # open file as a PE object:
        prog = pe.PE(stream)
        logger.info("PE format detected")
        return prog
    except pe.PEError:
        _rejected("PEError")

    try:
        from amoco.system import macho

        # open file as a Mach-O object:
        prog = macho.MachO(stream)
        logger.info("Mach-O format detected")
        return prog
    except macho.MachOError:
        _rejected("MachOError")

    try:
        from amoco.system import utils

        # open file as a HEX object:
        prog = utils.HEX(stream)
        logger.info("HEX format detected")
        return prog
    except utils.FormatError:
        _rejected(" HEX FormatError")

    try:
        # open file as a SREC object:
        prog = utils.SREC(stream)
        logger.info("SREC format detected")
        return prog
    except utils.FormatError:
        _rejected(" SREC FormatError")

    logger.warning("unknown format")
    return shellcode(stream)
# ------------------------------------------------------------------------------
# decorator that allows to "register" all loaders on-the-fly:
class DefineLoader(object):
    """
    Decorator used to register a system-specific loader at the place
    where it is implemented. Registered loaders live in the class-level
    LOADERS dict, shared by all instances.

    Example:
        @DefineLoader('elf',elf.EM_386)
        def loader_x86(p):
          ...

    Here, a reference to function loader_x86 is stored in
    LOADERS['elf'][elf.EM_386].

    When no name is given, the loader is stored directly in LOADERS[fmt].
    """

    LOADERS = {}

    def __init__(self, fmt, name=""):
        self.fmt = fmt
        self.name = name
        # make sure the format has an entry before looking into it
        known = self.LOADERS.setdefault(self.fmt, {})
        if self.name in known:
            previous = known[self.name]
            logger.warning(
                "DefineLoader %s is already defined by %s"
                % (self.name, previous.__name__)
            )

    def __call__(self, loader):
        logger.verbose(
            "DefineLoader %s[%s]: %s" % (self.fmt, self.name, loader.__name__)
        )
        if not self.name:
            # unnamed: the loader stands for the whole format
            self.LOADERS[self.fmt] = loader
        else:
            self.LOADERS[self.fmt][self.name] = loader
        return loader
def load_program(f, cpu=None):
    """
    Detects program format header (ELF/PE/Mach-O/HEX/SREC),
    and *maps* the program in abstract memory,
    loading the associated "system" (linux/win) and "arch" (x86/arm),
    based header informations.

    Arguments:
        f (str): the program filename or string of bytes.
        cpu: optional cpu module, only handed to the "raw" loader, i.e.
             when the input is neither ELF, PE nor Mach-O.

    Returns:
        a Task, ELF/PE (old CoreExec interfaces) or RawExec instance,
        or None if no loader could handle the program.
    """
    logger.verbose("--- define loaders ---")
    # these imports are done for their side effect only: the names are
    # not used below, the loaders are fetched from DefineLoader.LOADERS
    from . import raw
    from . import linux32
    from . import linux64
    from . import win32
    from . import win64
    from . import osx
    from . import baremetal

    logger.verbose("--- detect binary format ---")
    p = read_program(f)
    logger.verbose("--- create task ---")
    Loaders = DefineLoader.LOADERS
    # In each branch the try covers both the table lookup and the loader
    # call, so a KeyError raised by the loader itself is also reported
    # as an unsupported machine type.
    if p.is_ELF:
        try:
            x = Loaders["elf"][p.Ehdr.e_machine](p)
        except KeyError:
            logger.error("ELF machine type not supported")
            x = None
        except Exception:
            logger.error("ELF loader error")
            x = None
    elif p.is_PE:
        try:
            x = Loaders["pe"][p.NT.Machine](p)
        except KeyError:
            logger.error("PE machine type not supported")
            x = None
        except Exception:
            logger.error("PE loader error")
            x = None
    elif p.is_MachO:
        try:
            x = Loaders["macho"][p.header.cputype](p)
        except KeyError:
            logger.error("Mach-O machine type not supported")
            x = None
        except Exception:
            logger.error("Mach-O loader error")
            x = None
    else:
        x = Loaders["raw"](p, cpu)
    if x is not None:
        logger.info("a new task is loaded %s" % str(x.view))
    else:
        logger.info("no loader for this program, trying baremetal...")
        # only ELF programs have a baremetal fallback
        if p.is_ELF:
            try:
                x = Loaders["elf-baremetal"][p.Ehdr.e_machine](p)
            except KeyError:
                logger.error("No baremetal for this ELF machine type")
                x = None
            except Exception:
                logger.error("elf-baremetal loader error")
                x = None
            else:
                logger.info("a new baremetal is loaded")
        else:
            logger.info("no baremetal for this binary format")
    return x