# -*- coding: utf-8 -*-
# This code is part of Amoco
# Copyright (C) 2007-2019 Axel Tillequin (bdcht3@gmail.com)
# published under GPLv2 license
"""
system/memory.py
================
This module defines all Memory related classes.
The main class of amoco's Memory model is :class:`MemoryMap`.
It provides a way to represent both concrete and abstract symbolic values
located in the virtual memory space of a process.
In order to allow addresses to be symbolic as well, the MemoryMap is
organised as a collection of :class:`MemoryZone`.
A zone holds values located at addresses that are integer offsets
related to a symbolic expression. A default zone with related address set
to ``None`` holds values at concrete (virtual) addresses in every MemoryMap.
"""
from amoco.logger import Log
logger = Log(__name__)
logger.debug("loading module")
from bisect import bisect_left
from amoco.cas.expressions import exp
from amoco.ui.views import mmapView
# ------------------------------------------------------------------------------
class MemoryMap(object):
    """Represent concrete and symbolic values located in process memory.

    A MemoryMap is organised as a collection of :class:`MemoryZone`, keyed
    by the symbolic expression their offsets are relative to. The zone keyed
    by ``None`` always exists and holds values at concrete addresses.

    Attributes:
        _zones : dictionary of zones, keys are the related address expressions.
        misc : a dictionary, created empty.
        view : the :class:`mmapView` instance bound to this map.

    Methods:
        newzone(label): creates a new memory zone with the given label related
            expression.
        reference(address): returns a couple (rel,offset) based on the given
            address (an integer or an expression), allowing to find a
            candidate zone within memory.
        read(address,l): reads l bytes at address. returns a list of
            datadiv values.
        write(address,expr,endian=1): writes given expression at
            given (possibly symbolic) address. Default endianness is 'little'.
            Use endian=-1 to indicate big endian convention.
        restruct(): optimize all zones to merge contiguous raw bytes into
            single mo objects.
        grep(pattern): find all occurrences of the given regular expression in
            the raw bytes objects of all memory zones.
        merge(other): update this MemoryMap with a new MemoryMap, merging
            overlapping zones with values from the new map.
    """

    __slots__ = ["_zones", "misc", "view"]

    def __init__(self):
        self._zones = {None: MemoryZone()}
        self.misc = {}
        self.view = mmapView(self)

    def newzone(self, label):
        """Create, register and return a new zone relative to ``label``.

        An existing zone registered under the same label is replaced.
        """
        z = MemoryZone(label)
        self._zones[label] = z
        return z

    def reference(self, address):
        """Split ``address`` into a ``(rel, offset)`` couple.

        Args:
            address: a python integer, or an expression exposing the
                ``_is_ext``, ``_is_cst`` and ``_is_ptr`` flags.

        Returns:
            tuple: ``(None, int)`` for integers, constants and pointers with
            a constant base; ``(address, 0)`` for external symbols;
            ``(base, disp)`` for other pointers.

        Raises:
            MemoryError: if the expression is none of the above.
        """
        if isinstance(address, int):
            return (None, address)
        if address._is_ext:
            return (address, 0)
        if address._is_cst:
            return (None, address.v)
        if address._is_ptr:
            r, a = (address.base, address.disp)
            if r._is_cst:
                # constant base: fold base+disp into a concrete address.
                return (None, (r + a).v)
            return (r, a)
        raise MemoryError(address)

    def __len__(self):
        """Return the extent (end - start) of the default (concrete) zone."""
        sta, sto = self._zones[None].range()
        return sto - sta

    def __str__(self):
        return "\n".join([str(z) for z in self._zones.values()])

    def read(self, address, l):
        """Read ``l`` bytes at ``address``.

        Returns:
            list: the values returned by the matching zone's ``read``.

        Raises:
            MemoryError: if no zone exists for the address' reference.
        """
        r, o = self.reference(address)
        if r in self._zones:
            return self._zones[r].read(o, l)
        raise MemoryError(address)

    def write(self, address, expr, endian=1):
        """Write ``expr`` at ``address``, creating the zone if needed.

        Args:
            address: integer or expression accepted by :meth:`reference`.
            expr: the bytes or expression to store.
            endian: 1 for little-endian (default), -1 for big-endian.

        Raises:
            MemoryError: if the address is relative to an expression whose
                ``_is_def`` flag is false (write to undefined/top).
        """
        r, o = self.reference(address)
        if r is not None and not r._is_def:
            # write to undefined (top):
            raise MemoryError(address)
        if r not in self._zones:
            z = self.newzone(r)
        else:
            z = self._zones[r]
        z.write(o, expr, endian)

    def __getitem__(self, i):
        """Read the slice ``i`` of the default zone.

        The slice bounds are resolved against the end of the default zone.
        Returns a single ``bytes`` object when every part read is raw bytes,
        otherwise the list of parts as returned by :meth:`read`.
        """
        _, sto = self._zones[None].range()
        address, sto, _ = i.indices(sto)
        res = self.read(address, sto - address)
        try:
            return b"".join(res)
        except TypeError:
            # at least one part is not bytes-like (symbolic value):
            return res

    def restruct(self):
        """Call ``restruct()`` on every zone."""
        for z in self._zones.values():
            z.restruct()

    def grep(self, pattern):
        """Return the addresses matched by ``pattern`` in all zones.

        Offsets found in a zone with a non-None ``rel`` are returned as
        ``rel + offset``.
        """
        res = []
        for z in self._zones.values():
            zres = z.grep(pattern)
            if z.rel is not None:
                zres = [z.rel + r for r in zres]
            res.extend(zres)
        return res

    def copy(self):
        """Return a new map of the same class holding copies of all zones.

        Only the zones are copied; ``misc`` and ``view`` are those created by
        the new instance's ``__init__``.
        """
        mm = self.__class__()
        for k, z in self._zones.items():
            mm._zones[k] = z.copy()
        return mm

    def merge(self, other):
        """Update this map with the content of ``other``.

        Objects of zones present in both maps are added to this map's zone
        (overlapping values come from ``other``); zones only present in
        ``other`` are added. Copies are stored so that later writes to either
        map do not modify the other one.
        """
        for r, z in other._zones.items():
            if r in self._zones:
                target = self._zones[r]
                for o in z._map:
                    target.addtomap(o.copy())
            else:
                self._zones[r] = z.copy()
# ------------------------------------------------------------------------------
class MemoryZone(object):
    """A MemoryZone contains mo objects at addresses that are integer offsets
    related to a symbolic expression. A default zone with related address set
    to None holds values at concrete addresses in every :class:`MemoryMap`.

    Args:
        rel (exp): the relative symbolic expression, defaults to None.

    Attributes:
        rel : the relative symbolic expression, or None.
        _map : the ordered list of mo objects of this zone.

    Methods:
        range(): returns the lowest and highest addresses currently used by
            mo objects of this zone.
        locate(vaddr): returns the index in _map of the last mo object that
            starts at or before vaddr, or None if there is no such object.
        read(vaddr,l): reads l bytes starting at vaddr. returns a list of
            datadiv values, unmapped areas are returned as *bottom* exp.
        write(vaddr,data): writes data expression or
            bytes at given (offset) address.
        addtomap(z): add (possibly overlapping) :class:`mo` object z to the
            _map, eventually adjusting other objects.
        restruct(): optimize the zone to merge contiguous raw bytes into single
            mo objects.
        shift(offset): shift all mo objects by a given offset.
        grep(pattern): find all occurrences of the given regular expression in
            the raw bytes objects of the zone.
    """

    __slots__ = ["rel", "_map", "__cache", "__hist"]

    def __init__(self, rel=None):
        self.rel = rel
        self._map = []
        self.__cache = []  # start offsets of _map entries (speedup locate)
        self.__hist = []

    def range(self):
        """Return ``(start, end)`` of the zone, or ``(0, 0)`` if empty."""
        if not self._map:
            return (0, 0)
        return (self._map[0].vaddr, self._map[-1].end)

    def __str__(self):
        l = ["<MemoryZone rel=%s :" % str(self.rel)]
        for z in self._map:
            l.append("\t %s" % str(z))
        return "\n".join(l) + ">"

    def __update_cache(self):
        # must be called after every modification of _map.
        self.__cache = [z.vaddr for z in self._map]

    def copy(self):
        """Return a new zone with copies of all mo objects (restructured)."""
        z = MemoryZone(self.rel)
        z._map = [o.copy() for o in self._map]
        z.restruct()
        return z

    def locate(self, vaddr):
        """Return the index of the mo object that may hold ``vaddr``.

        Returns:
            The index of the object starting exactly at ``vaddr`` if any,
            otherwise the index of the last object starting before ``vaddr``,
            or None when every object starts after ``vaddr`` (or the zone is
            empty). The returned object does not necessarily contain
            ``vaddr``: the offset may lie in a gap after its end.
        """
        p = self.__cache
        try:
            # exact start offset (same equality test as ``vaddr in p``):
            return p.index(vaddr)
        except ValueError:
            pass
        i = bisect_left(p, vaddr)
        if i == 0:
            return None
        return i - 1

    def read(self, vaddr, l):
        """Read ``l`` bytes starting at offset ``vaddr``.

        Returns:
            list: the parts read, in address order. Unmapped areas are
            returned as ``exp(n*8)`` values (n being the gap length in bytes).
        """
        void = exp
        res = []
        i = self.locate(vaddr)
        if i is None:
            if len(self._map) == 0:
                return [void(l * 8)]
            v0 = self._map[0].vaddr
            # Don't test if (vaddr+l)<=v0 because we need the test to be
            # true if vaddr or v0 contain label/symbols
            if not (v0 < (vaddr + l)):
                return [void(l * 8)]
            # the read starts before the first object: fill the leading gap
            # and continue from the first object.
            res.append(void((v0 - vaddr) * 8))
            l = (vaddr + l) - v0
            vaddr = v0
            i = 0
        ll = l  # number of bytes still to be read
        while ll > 0:
            try:
                data, ll = self._map[i].read(vaddr, ll)
            except IndexError:
                # past the last object: the remainder is unmapped.
                res.append(void(ll * 8))
                ll = 0
                break
            if data is None:
                vi = self.__cache[i]
                if vaddr < vi:
                    # gap before object i: fill it, then retry object i
                    # (the decrement cancels the increment below).
                    l = min(vaddr + ll, vi) - vaddr
                    data = void(l * 8)
                    ll -= l
                    i -= 1
            if data is not None:
                vaddr += len(data)
                res.append(data)
            i += 1
        assert ll == 0
        return res

    def read_history(self, vaddr, l):
        """Read ``l`` bytes at ``vaddr`` in every recorded history state.

        Each history entry is copied before being restructured so that the
        recorded state is left untouched.
        """
        H = []
        z = MemoryZone(self.rel)
        for h in self.__hist:
            z._map = [o.copy() for o in h]
            z.restruct()
            H.append(z.read(vaddr, l))
        return H

    def write(self, vaddr, data, endian=1):
        """Write ``data`` (bytes or expression) at offset ``vaddr``."""
        self.addtomap(mo(vaddr, data, endian))

    def addtomap(self, z):
        """Add the (possibly overlapping) mo object ``z`` to the zone.

        Existing objects overlapped by ``z`` are overwritten, trimmed or
        removed, and the locate cache is refreshed.
        """
        i = self.locate(z.vaddr)
        j = self.locate(z.end)
        # NOTE(review): the ii/jj copies below were only consumed by the
        # history recording, which is disabled; they are kept as in the
        # original to preserve the exact sequence of operations.
        if j is None:
            # z ends before the first object (or the zone is empty).
            assert i is None or i == 0
            self._map.insert(0, z)
            self.__update_cache()
            return
        if j == i:
            # z starts and ends within (or right after) the same object.
            ii = self._map[i].copy()
            ii.trim(z.vaddr)
            Z = self._map[i].write(z.vaddr, z.data.val, z.data.endian)
            i += 1
            for newz in Z:
                self._map.insert(i, newz)
                i += 1
            self.__update_cache()
            return
        # i!=j cases:
        if i is not None:
            assert j >= i
        # delete & update every overwritten zones
        # by adjusting [i,j]:
        if z.end in self._map[j]:
            # object j is partially overwritten: keep its tail.
            jj = self._map[j].copy()
            jj.setlen(z.end - z.vaddr)
            self._map[j].trim(z.end)
        else:
            # object j is fully overwritten: include it in the deletion.
            j += 1
        Z = [z]
        if i is None:
            i = -1
        elif z.vaddr <= self._map[i].end:
            ii = self._map[i].copy()
            ii.trim(z.vaddr)
            # overwrite data:
            Z = self._map[i].write(z.vaddr, z.data.val, z.data.endian)
        i += 1
        del self._map[i:j]
        # insert new zones:
        for newz in Z:
            self._map.insert(i, newz)
            i += 1
        self.__update_cache()

    def restruct(self):
        """Merge contiguous raw bytes objects into single mo objects."""
        if len(self._map) == 0:
            return
        m = [self._map.pop(0)]
        for z in self._map:
            rawtype = z.data._is_raw and m[-1].data._is_raw
            if rawtype and (z.vaddr == m[-1].end):
                try:
                    m[-1].data.val += z.data.val
                except TypeError:
                    # raw values of incompatible types: keep them separate.
                    m.append(z)
            else:
                m.append(z)
        self._map = m
        self.__update_cache()

    def shift(self, offset):
        """Add ``offset`` to the start offset of every mo object."""
        for z in self._map:
            z.vaddr += offset
        self.__update_cache()

    def grep(self, pattern):
        """Return the offsets of all matches of ``pattern`` in raw objects.

        Args:
            pattern: a regular expression (pattern or compiled object)
                applicable to the raw values stored in the zone.

        Returns:
            list: for each raw mo object in order, the start offset of every
            non-overlapping match (the start of the whole match, even when
            the pattern contains groups).
        """
        import re

        g = re.compile(pattern)
        res = []
        for z in self._map:
            if z.data._is_raw:
                res.extend(z.vaddr + m.start() for m in g.finditer(z.data.val))
        return res

    def is_raw(self):
        """Return True if every mo object of the zone holds raw data."""
        return all((m.data._is_raw for m in self._map))

    def dump(self, start=0, stop=0):
        """Return the bytes of the zone between ``start`` and ``stop``.

        ``start`` is raised to the zone start if lower; ``stop`` is lowered to
        the zone end if higher or equal to 0. Parts that have an ``etype``
        attribute (non-raw values) are replaced by null bytes of the same
        length. An empty interval yields ``b""``.
        """
        r = self.range()
        if start < r[0]:
            start = r[0]
        if stop == 0 or stop > r[1]:
            stop = r[1]
        if stop <= start:
            return b""
        chunks = []
        for p in self.read(start, stop - start):
            if hasattr(p, "etype"):
                p = b"\0" * (p.length)
            chunks.append(p)
        return b"".join(chunks)
# ------------------------------------------------------------------------------
class mo(object):
    """A mo object essentially associates a datadiv with a memory offset, and
    provides methods to detect if an address is located within this object,
    to read or write bytes at a given address. The offset is relative to the
    start of the :class:`MemoryZone` in which the mo object is stored.

    Attributes:
        vaddr : a python integer that represents the offset within the memory
            zone that contains this memory object (mo).
        data : the datadiv object located at this offset.

    Methods:
        trim(vaddr): if this mo contains data at given offset, cut out the
            data located before it and point current object to this offset.
            Note that a trim is generally the result of data being
            overwritten by another mo.
        setlen(l): keep only the first l bytes of data.
        read(vaddr,l): returns a pair (data, missing) where data is the part
            of at most l bytes found at given offset (or None if the offset
            is not within this object), and missing is the number of bytes
            still to be read.
        write(vaddr,data,endian): updates current mo to reflect the writing
            of data at given offset and returns the list of possibly new mo
            objects to be inserted in the zone.
    """

    __slots__ = ["vaddr", "data"]

    def __init__(self, vaddr, data, endian=1):
        self.vaddr = vaddr
        self.data = datadiv(data, endian)

    @property
    def end(self):
        """The offset just past the last byte of this object."""
        return self.vaddr + len(self.data)

    def __contains__(self, vaddr):
        return self.vaddr <= vaddr < self.end

    def __repr__(self):
        data = str(self.data)
        if len(data) > 32:
            data = data[:32] + "..."
            if self.data._is_raw:
                # close the quote of the truncated bytes literal.
                data += "'"
        return "<mo [%08x,%08x] data:%s>" % (self.vaddr, self.end, data)

    def trim(self, vaddr):
        """Drop the data located before ``vaddr`` and start at ``vaddr``.

        Does nothing if ``vaddr`` is not within this object.
        """
        if vaddr in self:
            l = vaddr - self.vaddr
            if l > 0:
                self.data.cut(l)
            self.vaddr = vaddr

    def setlen(self, l):
        """Keep only the first ``l`` bytes of data."""
        self.data.setlen(l)

    def read(self, vaddr, l):
        """Read at most ``l`` bytes at offset ``vaddr``.

        Returns:
            tuple: the result of ``data.getpart`` for the relative offset if
            ``vaddr`` is within this object, otherwise ``(None, l)``.
        """
        if vaddr in self:
            return self.data.getpart(vaddr - self.vaddr, l)
        return (None, l)

    def write(self, vaddr, data, endian):
        """Write ``data`` at offset ``vaddr``.

        If ``vaddr`` is within this object or right at its end, the object's
        data is replaced by the first part resulting from ``data.setpart``
        and the remaining parts are returned as new contiguous mo objects.
        Otherwise this object is left untouched and a single new mo holding
        ``data`` at ``vaddr`` is returned.

        Returns:
            list: the new mo objects to be inserted after this one.
        """
        if vaddr in self or vaddr == self.end:
            parts = self.data.setpart(vaddr - self.vaddr, data, endian)
            self.data = parts[0]
            O = []
            # following parts start right after the (updated) first part:
            vaddr = self.end
            for p in parts[1:]:
                O.append(mo(vaddr, p.val, p.endian))
                vaddr += len(p)
            return O
        return [mo(vaddr, data, endian)]

    def copy(self):
        """Return a new mo with the same offset, value and endianness."""
        return mo(self.vaddr, self.data.val, self.data.endian)
# ------------------------------------------------------------------------------
class datadiv(object):
    """
    A datadiv represents any data within memory, including symbolic expressions.

    Args:
        data : either a string of bytes or an amoco expression.
        endian : either [-1,1], used when data is any symbolic expression.
            1 is for little-endian, -1 for big-endian.

    Attributes:
        val : the reference to the data object. Constant expressions are
            converted with their ``to_bytes(endian)`` method on creation.
        endian : the endianness given at creation.
        _is_raw : a flag indicating that the data object is not an expression
            (it has no ``etype`` attribute), i.e. a string of bytes.

    Methods:
        cut(l): cut out the first l bytes of the current data, keeping only
            the remaining part of the data.
        setlen(l): cut out trailing bytes of the current data, keeping only
            the first l bytes.
        getpart(o,l): returns a pair (result, counter) where result is a part
            of data of length at most l located at offset o (relative to the
            beginning of the data bytes), and counter is the number of bytes
            missing (l-len(result)) if the current data length is less than l.
        setpart(o,data,endian): returns a list of contiguous datadiv objects
            that correspond to overwriting self with data at offset o
            (possibly extending the current datadiv length).
    """

    __slots__ = ["val", "endian"]

    def __init__(self, data, endian):
        self.val = data
        self.endian = endian
        if not self._is_raw:
            if self.val._is_cst:
                # constant expressions are stored as raw bytes.
                self.val = self.val.to_bytes(endian)

    @property
    def _is_raw(self):
        return not hasattr(self.val, "etype")

    def __len__(self):
        return len(self.val)

    def __repr__(self):
        s = repr(self.val)
        if len(s) > 32:
            s = s[:32] + "..."
            if isinstance(self.val, bytes):
                # close the quote of the truncated bytes literal.
                s += "'"
        return "<datadiv:%s>" % s

    def __str__(self):
        return repr(self.val) if self._is_raw else str(self.val)

    def cut(self, l):
        """Drop the first ``l`` bytes of the data."""
        if self._is_raw:
            self.val = self.val[l:]
        else:
            self.val = self.val.bytes(sta=l, endian=self.endian)

    def setlen(self, l):
        """Keep only the first ``l`` bytes of the data."""
        if self._is_raw:
            self.val = self.val[:l]
        else:
            self.val = self.val.bytes(sto=l, endian=self.endian)

    def getpart(self, o, l):
        """Return the part of at most ``l`` bytes located at offset ``o``.

        Returns:
            tuple: ``(result, missing)`` where missing is ``l`` minus the
            length of result. For a non-raw value, ``(None, l)`` is returned
            when ``o`` is beyond the data.

        Raises:
            ValueError: if ``o`` or ``l`` is negative, or if the size of a
                non-raw value is not a multiple of 8 bits.
        """
        # explicit checks (not asserts) so that validation survives python -O
        valid = o >= 0 and l >= 0
        if valid and not self._is_raw:
            valid = self.val.size % 8 == 0
        if not valid:
            logger.error("invalid fetch (o=%s,l=%s) in %s" % (o, l, repr(self)))
            raise ValueError("invalid fetch (o=%s,l=%s)" % (o, l))
        lv = len(self)
        if o == 0 and l == lv:
            # whole value requested: return it as is.
            return (self.val, 0)
        if self._is_raw:
            res = self.val[o : o + l]
            return (res, l - len(res))
        if o >= lv:
            return (None, l)
        res = self.val.bytes(o, o + l, self.endian)
        return (res, l - res.length)

    def setpart(self, o, data, endian):
        """Return the datadiv list resulting from writing ``data`` at ``o``.

        The list holds, in order: the untouched head of self (if ``o > 0``),
        the new data, and the untouched tail of self (if any), with
        contiguous raw parts merged by ``mergeparts``. ``o`` must lie within
        ``[0, len(self)]``.
        """
        assert 0 <= o <= len(self)
        P = [datadiv(data, endian)]
        olv = o + len(data)
        endl = len(self) - olv
        if endl > 0:
            P.append(datadiv(self.getpart(olv, endl)[0], self.endian))
        if o > 0:
            P.insert(0, datadiv(self.getpart(0, o)[0], self.endian))
        # now merge contiguous parts if they have same type:
        return mergeparts(P)
# ------------------------------------------------------------------------------
def mergeparts(P):
    """This function will detect every contiguous raw datadiv objects in the
    input list P, and will return a new list where these objects have been
    merged into a single raw datadiv object.

    The merge is done in place on the first object of each run (its ``val``
    is extended). Raw values that cannot be concatenated (``+`` raises
    TypeError) are kept as separate parts. The input list itself is not
    modified.

    Args:
        P (list): input list of datadiv objects.

    Returns:
        list: the list after raw datadiv objects have been merged; an empty
        list if P is empty.
    """
    parts = []
    for p in P:
        if parts and parts[-1]._is_raw and p._is_raw:
            try:
                parts[-1].val += p.val
            except TypeError:
                # incompatible raw types: fall through and keep p separate.
                pass
            else:
                continue
        parts.append(p)
    return parts