# -*- coding: utf-8 -*-
# This code is part of Amoco
# Copyright (C) 2016 Axel Tillequin (bdcht3@gmail.com)
# published under GPLv2 license
"""
db.py
=====
This module implements all amoco's database facilities using the
`sqlalchemy`_ package, allowing to store many analysis results and
pickled objects.
.. _sqlalchemy: http://www.sqlalchemy.org/
"""
from amoco.config import conf
from amoco.logger import Log, logging
logger = Log(__name__)
logger.debug("loading module")
try:
import sqlalchemy as sql
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base
has_sql = True
Session = orm.scoped_session(orm.sessionmaker())
Base = declarative_base()
if conf.DB.log:
for l in ("sqlalchemy.engine", "sqlalchemy.orm"):
alog = logging.getLogger(l)
for h in logger.handlers:
alog.addHandler(h)
except ImportError:
logger.warning("package sqlalchemy not found.")
has_sql = False
[docs]def createdb(url=None):
"""creates the database engine and bind it to the scoped Session class.
The database URL (see :mod:`config.py`) is opened and the
schema is created if necessary. The default URL uses *sqlite* dialect and
opens a temporary file for storage.
"""
if url is None:
url = conf.DB.url
if has_sql:
logflag = conf.DB.log
engine = sql.create_engine(
url,
echo=logflag,
echo_pool=logflag,
logging_name=__name__
)
Session.configure(bind=engine)
Case.metadata.create_all(bind=engine)
logger.info("SQL Session available")
else:
logger.info("no Session defined")
engine = None
return engine
if has_sql:
class Case(Base):
"""
A Case instance describes the analysis of some binary program.
It allows to query stored results by date, source, format or
architecture for example, and provides relations to associated
functions that have been discovered or saved traces.
A Case is initialized from an analysis instance.
"""
__tablename__ = "cases_info"
id = sql.Column(sql.Integer, primary_key=True)
date = sql.Column(sql.DateTime)
name = sql.Column(sql.String)
source = sql.Column(sql.String)
binfmt = sql.Column(sql.String)
arch = sql.Column(sql.String)
msize = sql.Column(sql.Integer)
score = sql.Column(sql.Integer, default=0)
method = sql.Column(sql.String)
funcs = orm.relationship("FuncData", back_populates="case")
other = orm.relationship("CfgData", back_populates="case")
traces = orm.relationship("Trace", back_populates="case")
def __init__(self, z, name=None):
from datetime import datetime
from os.path import basename, splitext
self.date = datetime.now()
self.source = z.prog.bin.filename
self.name = name or splitext(basename(self.source))[0]
self.binfmt = z.prog.bin.__class__.__name__
self.arch = z.prog.cpu.__name__
self.msize = len(z.prog.mmap)
self.method = z.__class__.__name__
if z.G.order() > 0:
self.score = z.score()
F = z.functions
for f in F:
self.funcs.append(FuncData(f))
F = [f.cfg for f in F]
for g in z.G.C:
if g not in F:
self.other.append(CfgData(obj=g))
def __repr__(self):
s = (self.id, self.name, self.binfmt, self.arch, self.method)
return "<Case #{}: {:<.016} ({},{}) using {}>".format(*s)
class FuncData(Base):
"""This class holds pickled :class:`~cas.mapper.mapper` and
:class:`code.func` instances related to a Case, and provides
relationship with gathered infos about the discovered function.
"""
__tablename__ = "funcs_data"
id = sql.Column(sql.Integer, primary_key=True)
fmap = orm.deferred(sql.Column(sql.PickleType))
obj = orm.deferred(sql.Column(sql.PickleType))
case_id = sql.Column(sql.Integer, sql.ForeignKey("cases_info.id"))
case = orm.relationship("Case", back_populates="funcs")
info = orm.relationship("FuncInfo", uselist=False, back_populates="data")
def __init__(self, f):
self.fmap = f.map
self.obj = f
self.info = FuncInfo(f, self)
def __repr__(self):
s = (self.id, self.info.name, self.case_id)
return "<FuncData #{}: {:<.016} of Case #{}>".format(*s)
class FuncInfo(Base):
"""This class gathers useful informations about a function, allowing
to query by signature or various characteristics like number of blocks,
number of args, stack size, byte size, number of instructions, calls,
or cross-references.
"""
__tablename__ = "funcs_info"
id = sql.Column(sql.Integer, sql.ForeignKey("funcs_data.id"), primary_key=True)
name = sql.Column(sql.String, nullable=False)
sig = sql.Column(sql.String)
blocks = sql.Column(sql.Integer)
argsin = sql.Column(sql.Integer)
argsout = sql.Column(sql.Integer)
stksz = sql.Column(sql.Integer)
vaddr = sql.Column(sql.Integer)
bsize = sql.Column(sql.Integer)
nbinst = sql.Column(sql.Integer)
calls = sql.Column(sql.String)
xrefs = sql.Column(sql.String)
data = orm.relationship("FuncData", uselist=False, back_populates="info")
notes = orm.deferred(sql.Column(sql.Text, default=""))
def __init__(self, f, data):
from amoco.cfg import signature
self.name = f.name
self.sig = signature(f.cfg)
self.blocks = f.cfg.order()
self.argsin = f.misc["func_in"]
self.argsout = f.misc["func_out"]
self.stksz = min([0] + [x.a.disp for x in f.misc["func_var"]])
self.vaddr = str(f.address)
self.bsize = sum([b.length for b in f.blocks], 0)
self.nbinst = sum([len(b.instr) for b in f.blocks], 0)
self.calls = " ".join(
filter(None, [x.name if hasattr(x, "cfg") else None for x in f.blocks])
)
self.xrefs = " ".join(
[str(x.data.support[1]) for x in f.cfg.sV[0].data.misc["callers"]]
)
def __repr__(self):
s = (self.id, self.name, self.data.id)
return "<FuncInfo #{}: {:<.016} with FuncData #{}>".format(*s)
class CfgData(Base):
"""The CfgData class is intented to pickle data that has not yet been
identified as a function but is part of the recovered :class:graph.
"""
__tablename__ = "cfgs_data"
id = sql.Column(sql.Integer, primary_key=True)
obj = orm.deferred(sql.Column(sql.PickleType))
case_id = sql.Column(sql.Integer, sql.ForeignKey("cases_info.id"))
case = orm.relationship("Case", back_populates="other")
def __repr__(self):
s = (self.id, self.case_id)
return "<CfgData #{}: object related to Case #{}>".format(*s)
class Trace(Base):
"""The Trace class allows to pickle abstract memory states (:class:`mapper` objects)
obtained from a given input map after executing the binary program from *start* address
to *stop* address.
"""
__tablename__ = "traces_data"
id = sql.Column(sql.Integer, primary_key=True)
start = sql.Column(sql.Integer)
stop = sql.Column(sql.Integer)
mapin = orm.deferred(sql.Column(sql.PickleType))
mapout = orm.deferred(sql.Column(sql.PickleType))
case_id = sql.Column(sql.Integer, sql.ForeignKey("cases_info.id"))
case = orm.relationship("Case", back_populates="traces")
def __repr__(self):
s = (self.id, self.start, self.stop, self.case_id)
return "<Trace #{}: ({}->{}) in Case #{}>".format(*s)
class StructData(Base):
"""This class holds description of C structures in a format suitable
for StructFactory(name,fmt).
"""
__tablename__ = "structs_data"
id = sql.Column(sql.Integer, primary_key=True)
name = sql.Column(sql.String, nullable=False)
sz = sql.Column(sql.Integer)
fields = sql.Column(sql.String)
types = sql.Column(sql.String)
def __init__(self, S):
self.name = S[0]
self.sz = len(S[1])
self.fields = "\n".join(y for (x, y) in S[1])
self.types = "\n".join(x for (x, y) in S[1])
def __repr__(self):
s = (self.id, self.name)
return "<StructData #{}: {:<.016}>".format(*s)
createdb()
session = Session()