angr.engines.vex.heavy.heavy 源代码

from __future__ import annotations
import logging
import claripy
import pyvex

from angr.engines.engine import SuccessorsMixin
from angr.engines.vex.light import VEXMixin
from angr.engines.vex.lifter import VEXLifter
from angr.engines.vex.claripy.datalayer import ClaripyDataMixin, symbol
from angr.utils.constants import DEFAULT_STATEMENT
from angr import sim_options as o
from angr import errors
from . import dirty

l = logging.getLogger(__name__)


class VEXEarlyExit(Exception):
    # pylint:disable=missing-class-docstring
    pass


class SimStateStorageMixin(VEXMixin):
    # pylint:disable=arguments-differ,missing-class-docstring
    def _perform_vex_expr_Get(self, offset, ty, action=None, inspect=True):
        return self.state.registers.load(offset, self._ty_to_bytes(ty), action=action, inspect=inspect)

    def _perform_vex_expr_RdTmp(self, tmp):
        return self.state.scratch.tmp_expr(tmp)

    def _perform_vex_expr_Load(self, addr, ty, endness, action=None, inspect=True, condition=None, **kwargs):
        return self.state.memory.load(
            addr, self._ty_to_bytes(ty), endness=endness, action=action, inspect=inspect, condition=condition
        )

    def _perform_vex_stmt_Put(self, offset, data, action=None, inspect=True):
        self.state.registers.store(offset, data, action=action, inspect=inspect)

    def _perform_vex_stmt_Store(self, addr, data, endness, action=None, inspect=True, condition=None):
        if (
            o.UNICORN_HANDLE_SYMBOLIC_ADDRESSES in self.state.options
            or o.UNICORN_HANDLE_SYMBOLIC_CONDITIONS in self.state.options
        ) and data.symbolic:
            # Update the concrete memory value before updating symbolic value so that correct values are mapped into
            # native interface
            concrete_data = claripy.BVV(self.state.solver.eval(data), data.size())
            self.state.memory.store(
                addr, concrete_data, endness=endness, action=None, inspect=False, condition=condition
            )

        self.state.memory.store(
            addr, data, size=data.size() // 8, endness=endness, action=action, inspect=inspect, condition=condition
        )

    def _perform_vex_stmt_WrTmp(self, tmp, data, deps=None):
        self.state.scratch.store_tmp(tmp, data, deps=deps)


# pylint:disable=arguments-differ
[文档] class HeavyVEXMixin(SuccessorsMixin, ClaripyDataMixin, SimStateStorageMixin, VEXMixin, VEXLifter): """ Execution engine based on VEX, Valgrind's IR. Responds to the following parameters to the step stack: - irsb: The PyVEX IRSB object to use for execution. If not provided one will be lifted. - skip_stmts: The number of statements to skip in processing - last_stmt: Do not execute any statements after this statement - whitelist: Only execute statements in this set - thumb: Whether the block should be force to be lifted in ARM's THUMB mode. - extra_stop_points: An extra set of points at which to break basic blocks - opt_level: The VEX optimization level to use. - insn_bytes: A string of bytes to use for the block instead of the project. - size: The maximum size of the block, in bytes. - num_inst: The maximum number of instructions. - traceflags: traceflags to be passed to VEX. (default: 0) """ # entry point
[文档] def process_successors( self, successors, irsb=None, insn_text=None, insn_bytes=None, thumb=False, size=None, num_inst=None, extra_stop_points=None, opt_level=None, strict_block_end=None, **kwargs, ): if not pyvex.lifting.lifters[self.state.arch.name] or type(successors.addr) is not int: return super().process_successors( successors, extra_stop_points=extra_stop_points, num_inst=num_inst, size=size, insn_text=insn_text, insn_bytes=insn_bytes, **kwargs, ) if insn_text is not None: if insn_bytes is not None: raise errors.SimEngineError("You cannot provide both 'insn_bytes' and 'insn_text'!") insn_bytes = self.project.arch.asm(insn_text, addr=successors.addr, thumb=thumb) if insn_bytes is None: raise errors.AngrAssemblyError( "Assembling failed. Please make sure keystone is installed, and the assembly string is correct." ) successors.sort = "IRSB" successors.description = "IRSB" self.state.history.recent_block_count = 1 self.state.scratch.guard = claripy.true() self.state.scratch.sim_procedure = None addr = successors.addr self.state.scratch.bbl_addr = addr while True: # check permissions, are we allowed to execute here? Do we care? if o.STRICT_PAGE_ACCESS in self.state.options: try: perms = self.state.memory.permissions(addr) except errors.SimMemoryError as sim_mem_err: raise errors.SimSegfaultError(addr, "exec-miss") from sim_mem_err else: if not self.state.solver.symbolic(perms): perms = self.state.solver.eval(perms) if not perms & 4 and o.ENABLE_NX in self.state.options: raise errors.SimSegfaultError(addr, "non-executable") if irsb is None: irsb = self.lift_vex( addr=addr, state=self.state, insn_bytes=insn_bytes, thumb=thumb, size=size, num_inst=num_inst, extra_stop_points=extra_stop_points, opt_level=opt_level, strict_block_end=strict_block_end, ) if ( irsb.jumpkind == "Ijk_NoDecode" and irsb.next.tag == "Iex_Const" and irsb.next.con.value == irsb.addr and not self.state.project.is_hooked(irsb.addr) ): raise errors.SimIRSBNoDecodeError( f"IR decoding error at 0x{addr:02x}. You can hook this " "instruction with a python replacement using project.hook" f"(0x{addr:02x}, your_function, length=length_of_instruction)." ) if irsb.size == 0: raise errors.SimIRSBError("Empty IRSB passed to HeavyVEXMixin.") self.state.scratch.set_tyenv(irsb.tyenv) self.state.scratch.irsb = irsb # fill in artifacts successors.artifacts["irsb"] = irsb successors.artifacts["irsb_size"] = irsb.size successors.artifacts["irsb_direct_next"] = irsb.direct_next successors.artifacts["irsb_default_jumpkind"] = irsb.jumpkind successors.artifacts["insn_addrs"] = [] try: self.handle_vex_block(irsb) except errors.SimReliftException as e: self.state = e.state if insn_bytes is not None: raise errors.SimEngineError("You cannot pass self-modifying code as insn_bytes!!!") from e new_ip = self.state.scratch.ins_addr if size is not None: size -= new_ip - addr if num_inst is not None: num_inst -= self.state.scratch.num_insns addr = new_ip # clear the stage before creating the new IRSB self.state.scratch.dirty_addrs.clear() irsb = None except errors.SimError as ex: ex.record_state(self.state) raise except VEXEarlyExit: break else: break # do return emulation and calless stuff for exit_state in list(successors.all_successors): exit_jumpkind = exit_state.history.jumpkind if exit_state.history.jumpkind else "" if o.CALLLESS in self.state.options and exit_jumpkind == "Ijk_Call": exit_state.registers.store( exit_state.arch.ret_offset, exit_state.solver.Unconstrained("fake_ret_value", exit_state.arch.bits) ) exit_state.scratch.target = claripy.BVV(successors.addr + irsb.size, exit_state.arch.bits) exit_state.history.jumpkind = "Ijk_Ret" exit_state.regs.ip = exit_state.scratch.target if exit_state.arch.call_pushes_ret: exit_state.regs.sp = exit_state.regs.sp + exit_state.arch.bytes elif o.DO_RET_EMULATION in exit_state.options and ( exit_jumpkind == "Ijk_Call" or exit_jumpkind.startswith("Ijk_Sys") ): l.debug("%s adding postcall exit.", self) ret_state = exit_state.copy() guard = claripy.true() if o.TRUE_RET_EMULATION_GUARD in self.state.options else claripy.false() ret_target = claripy.BVV(successors.addr + irsb.size, ret_state.arch.bits) ret_state.registers.store( ret_state.arch.ret_offset, ret_state.solver.Unconstrained("fake_ret_value", ret_state.arch.bits) ) if ret_state.arch.call_pushes_ret and not exit_jumpkind.startswith("Ijk_Sys"): ret_state.regs.sp = ret_state.regs.sp + ret_state.arch.bytes successors.add_successor( ret_state, ret_target, guard, "Ijk_FakeRet", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, ) successors.processed = True return None
# # behavior instrumenting the VEXMixin # # statements def _handle_vex_stmt(self, stmt): self.state.scratch.stmt_idx = self.stmt_idx super()._handle_vex_stmt(stmt) def _handle_vex_stmt_IMark(self, stmt): ins_addr = stmt.addr + stmt.delta self.state.scratch.ins_addr = ins_addr # Raise an exception if we're suddenly in self-modifying code if (self.project is None or self.project.selfmodifying_code) and self.state.scratch.dirty_addrs: instruction_len = stmt.len if instruction_len == 0: # We don't know how long this instruction is. # Conservatively assume it is the maximum instruction # length for the purpose of dirty checks. instruction_len = self.project.arch.max_inst_bytes for subaddr in range(instruction_len): if subaddr + stmt.addr in self.state.scratch.dirty_addrs: raise errors.SimReliftException(self.state) # HACK: mips64 may put an instruction which may fault in the delay slot of a branch likely instruction # if the branch is not taken, we must not execute that instruction if the condition fails (i.e. the current # guard is False) if self.state.scratch.guard.is_false(): self.successors.add_successor(self.state, ins_addr, self.state.scratch.guard, "Ijk_Boring") raise VEXEarlyExit self.state.scratch.num_insns += 1 self.successors.artifacts["insn_addrs"].append(ins_addr) self.state.history.recent_instruction_count += 1 l.debug("IMark: %#x", stmt.addr) super()._handle_vex_stmt_IMark(stmt) def _perform_vex_stmt_Exit(self, guard, target, jumpkind): cont_state = None exit_state = None guard = guard != 0 if o.COPY_STATES not in self.state.options: # very special logic to try to minimize copies # first, check if this branch is impossible if guard.is_false() or ( o.LAZY_SOLVES not in self.state.options and not self.state.solver.satisfiable(extra_constraints=(guard,)) ): cont_state = self.state # then, check if it's impossible to continue from this branch elif guard.is_true() or ( o.LAZY_SOLVES not in self.state.options and not self.state.solver.satisfiable(extra_constraints=(claripy.Not(guard),)) ): exit_state = self.state # one more step, when LAZY_SOLVES is enabled, ignore "bad" jumpkinds elif o.LAZY_SOLVES in self.state.options and jumpkind.startswith("Ijk_Sig"): cont_state = self.state else: if o.LAZY_SOLVES not in self.state.options or not jumpkind.startswith("Ijk_Sig"): # when LAZY_SOLVES is enabled, we ignore "bad" jumpkinds exit_state = self.state.copy() cont_state = self.state else: exit_state = self.state.copy() cont_state = self.state if exit_state is not None: self.successors.add_successor( exit_state, target, guard, jumpkind, exit_stmt_idx=self.stmt_idx, exit_ins_addr=self.state.scratch.ins_addr, ) if cont_state is None: raise VEXEarlyExit # Do our bookkeeping on the continuing self.state cont_condition = ~guard cont_state.add_constraints(cont_condition) cont_state.scratch.guard = claripy.And(cont_state.scratch.guard, cont_condition) def _perform_vex_stmt_Dirty_call(self, func_name, ty, args, func=None): if func is None: try: func = getattr(dirty, func_name) except AttributeError as e: raise errors.UnsupportedDirtyError(f"Unsupported dirty helper {func_name}") from e retval, retval_constraints = func(self.state, *args) self.state.add_constraints(*retval_constraints) return retval # expressions def _instrument_vex_expr(self, result): if o.SIMPLIFY_EXPRS in self.state.options: result = self.state.solver.simplify(result) if self.state.solver.symbolic(result) and o.CONCRETIZE in self.state.options: concrete_value = claripy.BVV(self.state.solver.eval(result), len(result)) self.state.add_constraints(result == concrete_value) result = concrete_value return super()._instrument_vex_expr(result) def _perform_vex_expr_Load(self, addr, ty, endness, **kwargs): result = super()._perform_vex_expr_Load(addr, ty, endness, **kwargs) if o.UNINITIALIZED_ACCESS_AWARENESS in self.state.options and addr.initialized: raise errors.SimUninitializedAccessError("addr", addr) return result def _perform_vex_expr_CCall(self, func_name, ty, args, func=None): if o.DO_CCALLS not in self.state.options: return symbol(ty, "ccall_ret") return super()._perform_vex_expr_CCall(func_name, ty, args, func=None) def _analyze_vex_defaultexit(self, expr): self.state.scratch.stmt_idx = DEFAULT_STATEMENT return super()._analyze_vex_defaultexit(expr) def _perform_vex_defaultexit(self, expr, jumpkind): if expr is None: expr = self.state.regs.ip self.successors.add_successor( self.state, expr, self.state.scratch.guard, jumpkind, add_guard=False, # if there is any guard, it has been added by the Exit statement # that we come across prior to the default exit. adding guard # again is unnecessary and will cause trouble in abstract solver # mode, exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, )