angr.analyses.propagator.engine_vex 源代码

# pylint: disable=missing-class-docstring
from __future__ import annotations
from typing import cast
import logging

import claripy
import pyvex

from angr.engines.vex.claripy.datalayer import value
from angr.knowledge_plugins.propagations.states import RegisterAnnotation, RegisterComparisonAnnotation
from angr.engines.light import SimEngineNostmtVEX
from angr.calling_conventions import DEFAULT_CC, SYSCALL_CC, default_cc, SimRegArg
from angr.analyses.propagator.propagator import PropagatorVEXState
from angr.block import Block
from .engine_base import SimEnginePropagatorBaseMixin
from .top_checker_mixin import ClaripyDataVEXEngineMixin
from .vex_vars import VEXReg, VEXTmp, VEXMemVar


_l = logging.getLogger(name=__name__)

dirty_handler = SimEngineNostmtVEX[PropagatorVEXState, claripy.ast.BV, PropagatorVEXState].dirty_handler
binop_handler = SimEngineNostmtVEX[PropagatorVEXState, claripy.ast.BV, PropagatorVEXState].binop_handler


[文档] class SimEnginePropagatorVEX( ClaripyDataVEXEngineMixin[PropagatorVEXState, claripy.ast.BV, PropagatorVEXState, None], SimEnginePropagatorBaseMixin[PropagatorVEXState, claripy.ast.BV, Block], SimEngineNostmtVEX[PropagatorVEXState, claripy.ast.BV, PropagatorVEXState], ): @dirty_handler def _handle_dirty_noop(self, expr): if expr.tmp not in (-1, 0xFFFFFFFF): self.tmps[expr.tmp] = self._top(pyvex.get_type_size(self.tyenv.lookup(expr.tmp))) _handle_dirty_riscv_dirtyhelper_CSR_rw = _handle_dirty_noop _handle_dirty_riscv_dirtyhelper_CSR_s = _handle_dirty_noop _handle_dirty_riscv_dirtyhelper_CSR_c = _handle_dirty_noop _handle_dirty_riscv_dirtyhelper_mret = _handle_dirty_noop # # Private methods # def _process_block_end(self, stmt_result, whitelist): return self.state def _process_block(self, whitelist=None): result = super()._process_block(whitelist) if self.block.vex.jumpkind == "Ijk_Call" and self.arch.call_pushes_ret: # pop ret from the stack sp_offset = self.arch.sp_offset sp_value = self.state.load_register(sp_offset, self.arch.bytes) if sp_value is not None: self.state.store_register(sp_offset, self.arch.bytes, sp_value + self.arch.bytes) if self.block.vex.jumpkind == "Ijk_Call" or self.block.vex.jumpkind.startswith("Ijk_Sys"): self._handle_return_from_call() return result def _allow_loading(self, addr: claripy.ast.BV, size): if self._is_top(addr): return False if self._load_callback is None: return True return self._load_callback(addr, size) def _load_data(self, addr: claripy.ast.BV, size, endness) -> claripy.ast.BV: sp_offset = self.extract_offset_to_sp(addr) if sp_offset is not None: # Local variable return self.state.load_local_variable(sp_offset, size, endness) if addr.op == "BVV" and self._allow_loading(addr, size): # Try loading from the state addr_int = cast(int, addr.args[0]) if self.base_state is not None: _l.debug("Loading %d bytes from %x.", size, addr_int) data = self.base_state.memory.load(addr, size, endness=endness) if not data.symbolic: return data else: try: val = self.project.loader.memory.unpack_word(addr_int, size=size, endness=endness) return claripy.BVV(val, size * self.arch.byte_width) except KeyError: pass return self._top(size * self.arch.byte_width) # # Function handlers # def _handle_function(self, addr): if self.arch.name == "X86" and isinstance(addr, claripy.ast.Base) and addr.op == "BVV": try: b = self.project.loader.memory.load(addr.args[0], 4) except KeyError: return except TypeError: return if b == b"\x8b\x1c\x24\xc3": # getpc: # mov ebx, [esp] # ret ebx_offset = self.arch.registers["ebx"][0] self.state.store_register(ebx_offset, 4, claripy.BVV(self.block.addr + self.block.size, 32)) def _handle_return_from_call(self): # FIXME: Handle the specific function calling convention when known syscall = self.block.vex.jumpkind.startswith("Ijk_Sys") cc_map = SYSCALL_CC if syscall else DEFAULT_CC if self.arch.name in cc_map: cc = default_cc( self.arch.name, platform=self.project.simos.name if self.project.simos is not None else None, syscall=syscall, ) # don't instantiate the class for speed assert cc is not None if isinstance(cc.RETURN_VAL, SimRegArg): offset, size = self.arch.registers[cc.RETURN_VAL.reg_name] self.state.store_register(offset, size, self.state.top(size * self.arch.byte_width)) if cc.CALLER_SAVED_REGS: for reg_name in cc.CALLER_SAVED_REGS: offset, size = self.arch.registers[reg_name] self.state.store_register(offset, size, self.state.top(size * self.arch.byte_width)) # # VEX statement handlers # def _handle_stmt_WrTmp(self, stmt): self.tmps[stmt.tmp] = self._expr(stmt.data) if stmt.tmp in self.tmps: self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.tmp), self.tmps[stmt.tmp]) def _handle_stmt_Put(self, stmt): size = stmt.data.result_size(self.tyenv) // self.arch.byte_width data = self._expr(stmt.data) if not self._is_top(data) or self.state._store_tops: self.state.store_register(stmt.offset, size, data) self.state.add_replacement(self._codeloc(block_only=False), VEXReg(stmt.offset, size), data) def _handle_stmt_PutI(self, stmt): self._expr(stmt.data) def _store_data(self, addr: claripy.ast.BV, data: claripy.ast.Bits, size: int, endness: str): sp_offset = self.extract_offset_to_sp(addr) if sp_offset is not None: # Local variables self.state.store_local_variable(sp_offset, size, data, endness) elif addr.op == "BVV": # a memory address addr_int = cast(int, addr.args[0]) variable = VEXMemVar(addr_int, size) self.state.add_replacement(self._codeloc(block_only=False), variable, data) def _handle_stmt_Store(self, stmt): addr = self._expr_bv(stmt.addr) if self.state.is_top(addr): return size = stmt.data.result_size(self.tyenv) // self.arch.byte_width data = self._expr(stmt.data) if not self.state.is_top(data) or self.state._store_tops: self._store_data(addr, data, size, self.arch.memory_endness) def _handle_stmt_LoadG(self, stmt): guard = self._expr(stmt.guard) if guard is True: addr = self._expr_bv(stmt.addr) if addr is not None: self.tmps[stmt.dst] = self._load_data( addr, stmt.alt.result_size(self.tyenv) // 8, self.arch.memory_endness ) elif guard is False: data = self._expr(stmt.alt) self.tmps[stmt.dst] = data else: self.tmps[stmt.dst] = self._top(stmt.alt.result_size(self.tyenv)) # add replacement if not self._is_top(self.tmps[stmt.dst]): self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.dst), self.tmps[stmt.dst]) def _handle_stmt_StoreG(self, stmt): guard = self._expr_bv(stmt.guard) data = self._expr(stmt.data) if (guard != 0).is_true(): addr = self._expr_bv(stmt.addr) self._store_data(addr, data, stmt.data.result_size(self.tyenv) // 8, self.arch.memory_endness) def _handle_stmt_LLSC(self, stmt): if stmt.storedata is None: # load-link addr = self._expr_bv(stmt.addr) size = self.tyenv.sizeof(stmt.result) // self.arch.byte_width data = self._load_data(addr, size, stmt.endness) self.tmps[stmt.result] = data if stmt.result in self.tmps: self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.result), self.tmps[stmt.result]) else: # store-conditional storedata = self._expr(stmt.storedata) addr = self._expr_bv(stmt.addr) size = storedata.size() // self.arch.byte_width self._store_data(addr, storedata, size, stmt.endness) self.tmps[stmt.result] = claripy.BVV(1, 1) self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.result), self.tmps[stmt.result]) @binop_handler def _handle_binop_CmpEQ(self, expr): lhs, rhs = self._expr(expr.args[0]), self._expr(expr.args[1]) if rhs.concrete and len(lhs.annotations) == 1: anno = lhs.annotations[0] if isinstance(anno, RegisterAnnotation): cmp_anno = RegisterComparisonAnnotation(anno.offset, anno.size, "eq", rhs.concrete_value) bits = pyvex.get_type_size(pyvex.get_op_retty(expr.op)) return self.state.top(bits).annotate(cmp_anno) if lhs.concrete and len(rhs.annotations) == 1: anno = rhs.annotations[0] if isinstance(anno, RegisterAnnotation): cmp_anno = RegisterComparisonAnnotation(anno.offset, anno.size, "eq", lhs.concrete_value) bits = pyvex.get_type_size(pyvex.get_op_retty(expr.op)) return self.state.top(bits).annotate(cmp_anno) return super()._handle_binop_CmpEQ(expr) # # Expression handlers # def _handle_expr_Get(self, expr): size = expr.result_size(self.tyenv) // self.arch.byte_width result = self.state.load_register(expr.offset, size) if not self._is_top(result) and expr.offset not in ( self.arch.sp_offset, self.arch.ip_offset, ): # Record the replacement self.state.add_replacement( self._codeloc(block_only=False), VEXReg(expr.offset, expr.result_size(self.tyenv) // 8), result ) return result def _handle_expr_GetI(self, expr): return self.state.top(expr.result_size(self.tyenv)) def _handle_expr_ITE(self, expr): return self.state.top(expr.result_size(self.tyenv)) def _handle_expr_GSPTR(self, expr): return self.state.top(expr.result_size(self.tyenv)) def _handle_expr_VECRET(self, expr): return self.state.top(expr.result_size(self.tyenv)) def _handle_expr_RdTmp(self, expr): try: return self.tmps[expr.tmp] except KeyError: return self._top(pyvex.get_type_size(self.tyenv.lookup(expr.tmp))) def _handle_expr_Const(self, expr): result = value(expr.con.type, expr.con.value) if isinstance(result, claripy.ast.FP): return self._top(expr.con.size) return result def _handle_expr_Load(self, expr): addr = self._expr_bv(expr.addr) size = expr.result_size(self.tyenv) // self.arch.byte_width return self._load_data(addr, size, expr.endness) def _handle_expr_Binop(self, expr): if not self.state.do_binops: return self._top(expr.result_size(self.tyenv)) return super()._handle_expr_Binop(expr) def _handle_expr_Triop(self, expr): if not self.state.do_binops: return self._top(expr.result_size(self.tyenv)) return super()._handle_expr_Triop(expr) def _handle_stmt_Exit(self, stmt: pyvex.stmt.Exit): guard = self._expr(stmt.guard) if len(guard.annotations) == 1: dst = value(stmt.dst.type, stmt.dst.value) if dst.concrete: anno = guard.annotations[0] if isinstance(anno, RegisterComparisonAnnotation) and anno.cmp_op == "eq": v = (anno.offset, anno.size, anno.value) if v not in self.state.block_initial_reg_values[self.block.addr, dst.concrete_value]: self.state.block_initial_reg_values[self.block.addr, dst.concrete_value].append(v)