Source code for amoco.cas.mapper

# -*- coding: utf-8 -*-

# This code is part of Amoco
# Copyright (C) 2006-2011 Axel Tillequin (bdcht3@gmail.com)
# published under GPLv2 license

"""
cas/mapper.py
=============

The mapper module essentially implements the :class:`mapper` class
and the associated :func:`merge` function which allows to get a
symbolic representation of the *union* of two mappers.
"""

from amoco.logger import Log

logger = Log(__name__)
logger.debug("loading module")

from .expressions import *

from amoco.cas.tracker import generation
from amoco.system.memory import MemoryMap
from amoco.arch.core import Bits
from amoco.ui.views import mapperView


[docs]class mapper(object): """A mapper is a symbolic functional representation of the execution of a set of instructions. Args: instrlist (list[instruction]): a list of instructions that are symbolically executed within the mapper. csi (Optional[object]): the optional csi attribute that provide a *concrete* initial state Attributes: __map : is an ordered list of mappings of expressions associated with a location (register or memory pointer). The order is relevant only to reflect the order of write-to-memory instructions in case of pointer aliasing. __Mem : is a memory model where symbolic memory pointers are addressing separated memory zones. See MemoryMap and MemoryZone classes. conds : is the list of conditions that must be True for the mapper csi : is the optional interface to a *concrete* state """ __slots__ = ["__map", "__Mem", "conds", "csi", "view"] def __init__(self, instrlist=None, csi=None): self.__map = generation() self.__map.lastw = 0 self.__map.delayed = None self.__Mem = MemoryMap() self.conds = [] self.csi = csi icache = [] # if the __map needs to be inited before executing instructions # one solution is to prepend the instrlist with a function dedicated # to this init phase... for instr in instrlist or []: # call the instruction with this mapper: instr(self) self.view = mapperView(self) def __len__(self): return len(self.__map) def __str__(self): return "\n".join(["%s <- %s" % x for x in self])
[docs] def inputs(self): "list antecedent locations (used in the mapping)" r = [] for l, v in iter(self.__map.items()): if (l==v): continue for lv in locations_of(v): if lv._is_reg and l._is_reg: if (lv.etype & l.etype & regtype.FLAGS): continue r.append(lv) return r
[docs] def outputs(self): "list image locations (modified in the mapping)" L = [] for l in sum([locations_of(e) for e in self.__map], []): if l._is_reg and (l.etype & (regtype.PC | regtype.FLAGS)): continue if l._is_ptr: l = mem(l, self.__map[l].size) if self[l] == l: continue L.append(l) return L
[docs] def has(self, loc): "check if the given location expression is touched by the mapper" for l in self.__map.keys(): if loc == l: return True return False
[docs] def history(self, loc): k, v = self.__map.hist if k==loc: return v else: return self[k]
[docs] def delayed(self, k ,v): self.__map.delayed = (k,v)
[docs] def update_delayed(self): kv = self.__map.delayed if kv is not None: self.__map.delayed = None self.__setitem__(*kv)
[docs] def rw(self): "get the read sizes and written sizes tuple" r = filter(lambda x: x._is_mem, self.inputs()) w = filter(lambda x: x._is_mem, self.outputs()) sr = [x.size for x in r] sw = [x.size for x in w] return (sr, sw)
[docs] def clear(self): "clear the current mapper, reducing it to the identity transform" self.__map.clear() self.__Mem = MemoryMap() self.conds = []
[docs] def getmemory(self): "get the local :class:`MemoryMap` associated to the mapper" return self.__Mem
[docs] def setmemory(self, mmap): "set the local :class:`MemoryMap` associated to the mapper" self.__Mem = mmap
mmap = property(getmemory, setmemory)
[docs] def generation(self): return self.__map
def __cmp__(self, m): d = cmp(self.__map.lastdict(), m.__map.lastdict()) return d def __eq__(self, m): d = self.__map.lastdict() == m.__map.lastdict() return d # iterate over ordered correspondances: def __iter__(self): for (loc, v) in iter(self.__map.items()): yield (loc, v)
[docs] def R(self, x): "get the expression of register x" if self.csi: return self.__map.get(x, self.csi(x)) else: return self.__map.get(x, x)
[docs] def M(self, k): """get the expression of a memory location expression k""" if k.a.base._is_lab: return k if k.a.base._is_ext: return k.a.base n = self.aliasing(k) if n > 0: f = lambda e: e[0]._is_ptr items = filter(f, list(self.__map.items())[0:n]) res = mem(k.a, k.size, mods=list(items), endian=k.endian) else: res = self._Mem_read(k.a, k.length, k.endian) res.sf = k.sf return res
[docs] def aliasing(self, k): """check if location k is possibly aliased in the mapper: i.e. the mapper writes to some other symbolic location expression after writing to k which might overlap with k.""" if conf.Cas.noaliasing: return 0 K = list(self.__map.keys()) n = self.__map.lastw try: i = K.index(k.a) except ValueError: # k has never been written to explicitly # but it is maybe in a zone that was written to i = -1 for l in K[i + 1 : n]: if not l._is_ptr: continue if l.base == k.a.base: continue return n return 0
def _Mem_read(self, a, l, endian=1): "read l bytes from memory address a and return an expression" try: res = self.__Mem.read(a, l) except MemoryError: # no zone for location a; res = [exp(l * 8)] if endian == -1: res.reverse() P = [] cur = 0 for p in res: plen = len(p) if isinstance(p, exp) and (p._is_def == 0): # p is "bottom": if self.csi: p = self.csi(mem(a, p.size, disp=cur, endian=endian)) else: p = mem(a, p.size, disp=cur) if isinstance(p, bytes): p = cst(Bits(p[::endian], bitorder=1).int(), plen * 8) P.append(p) cur += plen return composer(P) def _Mem_write(self, a, v, endian=1): "write expression v at memory address a with given endianness" if a.base._is_vec: locs = (ptr(l, a.seg, a.disp) for l in a.base.l) else: locs = (a,) for l in locs: self.__Mem.write(l, v, endian) if l in self.__map: del self.__map[l] def __getitem__(self, k): "just a convenient wrapper around M/R" r = self.M(k) if k._is_mem else self.R(k) if k.size != r.size: raise ValueError("size mismatch") return r[0 : k.size] # define image v of antecedent k: def __setitem__(self, k, v): if k._is_ptr: loc = k else: if k.size != v.size: raise ValueError("size mismatch") try: loc = k.addr(self) except TypeError: logger.error("setitem ignored (invalid left-value expression: %s)" % k) return # now loc is either a reg or a ptr, we prepare the right-value r from v: if k._is_slc and not loc._is_reg: raise ValueError("memory location slc is not supported") elif loc._is_ptr: r = v oldr = self.__map.get(loc, None) if oldr is not None and oldr.size > r.size: r = composer([r, oldr[r.size : oldr.size]]) if k._is_mem: endian = k.endian else: endian = 1 self._Mem_write(loc, r, endian) if conf.Cas.memtrace or not conf.Cas.noaliasing: # if we assume that aliasing may exists, we # need to keep tracks of the memory writes ordering # in the mapper: self.__map.lastw = len(self.__map) + 1 #this is O(1) AFAIK... self.__map[loc] = r else: r = self.R(loc) if r._is_reg: r = comp(loc.size) r[0 : loc.size] = loc pos = k.pos if k._is_slc else 0 r[pos : pos + k.size] = v.simplify() self.__map[loc] = r
[docs] def update(self, instr): "opportunistic update of the self mapper with instruction" instr(self)
[docs] def safe_update(self, instr): "update of the self mapper with instruction *only* if no exception occurs" if not isinstance(instr,ext): try: m = mapper() instr(m) _ = self >> m except Exception as e: logger.error("instruction @ %s raises exception %s" % (instr.address, e)) raise e self.update(instr)
def __call__(self, x): """evaluation of expression x in this map: note the difference between a mapper[mem(p)] and mapper(mem(p)): in the call form, p is first evaluated so that the target address is the expression of p "after execution" whereas the indexing form uses p as an input (i.e "before execution") expression. """ if len(self) == 0: return x return x.eval(self)
[docs] def restruct(self): self.__Mem.restruct()
[docs] def eval(self, m): """return a new mapper instance where all input locations have been replaced by there corresponding values in m. """ mm = mapper(csi=self.csi) mm.setmemory(self.mmap.copy()) for c in self.conds: cc = c.eval(m) if not cc._is_def: continue if cc == 1: continue if cc == 0: logger.verbose("invalid mapper eval: cond %s is false" % c) raise ValueError mm.conds.append(cc) for loc, v in self: if loc._is_ptr: loc = m(loc) mm[loc] = m(v) return mm
[docs] def rcompose(self, m): """composition operator returns a new mapper corresponding to function x -> self(m(x)) """ mm = m.use() for c in self.conds: cc = c.eval(m) if not cc._is_def: continue if cc == 1: continue if cc == 0: logger.verbose("invalid mapper eval: cond %s is false" % c) raise ValueError mm.conds.append(cc) for loc, v in self: if loc._is_ptr: loc = m(loc) mm[loc] = m(v) return mm
def __lshift__(self, m): "self << m : composition (self(m))" return self.rcompose(m) def __rshift__(self, m): "self >> m : composition (m(self))" return m.rcompose(self)
[docs] def interact(self): raise NotImplementedError
[docs] def use(self, *args, **kargs): """return a new mapper corresponding to the evaluation of the current mapper where all key symbols found in kargs are replaced by their values in all expressions. The kargs "size=value" allows for adjusting symbols/values sizes for all arguments. if kargs is empty, a copy of the result is just a copy of current mapper. """ m = mapper(csi=self.csi) for loc, v in args: m[loc] = v if len(kargs) > 0: argsz = kargs.get("size", 32) for k, v in iter(kargs.items()): m[reg(k, argsz)] = cst(v, argsz) return self.eval(m)
[docs] def usemmap(self, mmap): """return a new mapper corresponding to the evaluation of the current mapper where all memory locations of the provided mmap are used by the current mapper.""" m = mapper() m.setmemory(mmap) for xx in set(self.inputs()): if xx._is_mem: v = m.M(xx) m[xx] = v return self << m
# attach/apply conditions to the output mapper
[docs] def assume(self, conds): m = mapper(csi=self.csi) if conds is None: conds = [] for c in conds: if not c._is_eqn: continue if c.op.symbol == OP_EQ and c.r._is_cst: if c.l._is_reg: m[c.l] = c.r m.conds = conds mm = self.eval(m) mm.conds += conds return mm
from amoco.cas.smt import * def merge(m1, m2, **kargs): "union of two mappers" m1 = m1.assume(m1.conds) m2 = m2.assume(m2.conds) mm = mapper() # "import" m2 values into m1 locations: for loc, v1 in m1: if loc._is_ptr: seg = loc.seg disp = loc.disp if loc.base._is_vec: v2 = vec([m2[mem(l, v1.size, seg, disp)] for l in loc.base.l]) v2 = v2.simplify(**kargs) else: v2 = m2[mem(loc, v1.size)] else: if loc._is_reg and (loc.etype & regtype.FLAGS): v2 = top(loc.size) else: v2 = m2[loc] v1 = v1.simplify(**kargs) v2 = v2.simplify(**kargs) vv = vec([v1, v2]).simplify(**kargs) mm[loc] = vv # "import" m1 values into m2 locations: for loc, v2 in m2: if mm.has(loc): continue if loc._is_ptr: seg = loc.seg disp = loc.disp if loc.base._is_vec: v1 = vec([m1[mem(l, v2.size, seg, disp)] for l in loc.base.l]) v1 = v1.simplify(**kargs) else: v1 = m1[mem(loc, v2.size)] else: if loc._is_reg and (loc.etype & regtype.FLAGS): v1 = top(loc.size) else: v1 = m1[loc] v2 = v2.simplify(**kargs) v1 = v1.simplify(**kargs) vv = vec([v1, v2]).simplify(**kargs) mm[loc] = vv return mm