from __future__ import annotations
import copy
import functools
import logging
import archinfo
import claripy
import angr
from angr.errors import SimIRSBError, SimIRSBNoDecodeError, SimValueError
from .engine import SuccessorsMixin
from .vex.heavy.heavy import VEXEarlyExit
from angr import sim_options as o
from angr.misc.ux import once
from angr.state_plugins.inspect import BP_AFTER, BP_BEFORE
from angr.state_plugins.unicorn_engine import STOP, _UC_NATIVE, unicorn as uc_module
from angr.utils.constants import DEFAULT_STATEMENT
# pylint: disable=arguments-differ
l = logging.getLogger(name=__name__)
[文档]
class SimEngineUnicorn(SuccessorsMixin):
    """
    Concrete execution in the Unicorn Engine, a fork of qemu.

    Responds to the following parameters in the step stack:

    - step:               How many basic blocks we want to execute
    - extra_stop_points:  A collection of addresses at which execution should halt
    - last_block_details: Optional. When not None, it is handed to ``state.unicorn.set_last_block_details``
    - syscall_data:       Optional. Handed to ``state.unicorn.setup`` and consulted when a CGC random syscall is
                          re-executed
    - fd_bytes:           Optional. Handed to ``state.unicorn.setup``
    - num_inst:           Not supported: any value other than None makes this engine decline the step
    """
[文档]
def __init__(self, project: angr.Project):
    """
    Create the engine and its per-engine caches.

    :param project: The project this engine belongs to; handed to the parent initializer.
    """
    super().__init__(project)

    # Block address -> lifted VEX block, for blocks whose statements have to be re-executed
    self._block_details_cache = {}

    # Addresses of basic blocks that the native interface is not expected to execute
    self._stop_block_addrs_cache = set()

    # A block that stops for one of these reasons is remembered so that the native interface is not
    # attempted on it again. The unsupported reasons are always part of the set.
    always_tracked = {
        STOP.STOP_STOPPOINT,
        STOP.STOP_ERROR,
        STOP.STOP_NODECODE,
        STOP.STOP_SYSCALL,
        STOP.STOP_EXECNONE,
        STOP.STOP_ZEROPAGE,
        STOP.STOP_NOSTART,
        STOP.STOP_SEGFAULT,
        STOP.STOP_ZERO_DIV,
        STOP.STOP_HLT,
        STOP.STOP_SYSCALL_ARM,
        STOP.STOP_X86_CPUID,
    }
    self._stop_reasons_to_track = STOP.unsupported_reasons | always_tracked
def __getstate__(self):
    """
    Build the pickle payload.

    :return: A 4-tuple: the parent's state first, followed by the block details cache, the stop block
             address cache and the tracked stop reasons.
    """
    return (
        super().__getstate__(),
        self._block_details_cache,
        self._stop_block_addrs_cache,
        self._stop_reasons_to_track,
    )
def __setstate__(self, args):
    """
    Restore the engine from the payload produced by ``__getstate__``.

    :param args: Indexable payload: index 0 is the parent's state, indices 1-3 are the block details cache,
                 the stop block address cache and the tracked stop reasons, in that order.
    """
    super().__setstate__(args[0])

    # Attribute names in payload order, starting at index 1
    restored_attrs = ("_block_details_cache", "_stop_block_addrs_cache", "_stop_reasons_to_track")
    for position, attr_name in enumerate(restored_attrs, start=1):
        setattr(self, attr_name, args[position])
def __check(self, num_inst=None, **kwargs):  # pylint: disable=unused-argument
    """
    Decide whether the current state should be stepped by the unicorn engine.

    Once the UNICORN option is known to be set and the unicorn module and native layer are available, every
    call decrements the cooldown counters on ``state.unicorn`` (even if the check subsequently fails).

    :param num_inst: Requested instruction count for this step. Any value other than None fails the check,
                     because stepping by instruction count is not supported by this engine.
    :param kwargs:   Remaining step parameters; unused.
    :return:         True if unicorn should be used for this step, False otherwise.
    """
    state = self.state
    if o.UNICORN not in state.options:
        l.debug("Unicorn-engine is not enabled.")
        return False

    if uc_module is None or _UC_NATIVE is None:
        if once("unicorn_install_warning"):
            l.error(
                "You are attempting to use unicorn engine support even though it or the angr native layer "
                "isn't installed"
            )
        return False

    self.__countdown(state)

    # should the countdown still be updated if we're not stepping a whole block?
    # current decision: leave it updated, since we are moving forward
    if num_inst is not None:
        if once("unicorn_num_inst_warning"):
            l.warning("unicorn engine doesn't support stepping with num_inst")
        return False

    unicorn = state.unicorn  # shorthand

    # if we have a concrete target we want the program to synchronize the segment
    # registers before, otherwise undefined behavior could happen.
    if (
        state.project.concrete_target
        and self.project.arch.name in ("x86", "x86_64")
        and not state.concrete.segment_registers_initialized
    ):
        l.debug("segment register must be synchronized with the concrete target before using unicorn engine")
        return False

    if state.regs.ip.symbolic:
        l.debug("symbolic IP!")
        return False

    # Every cooldown counter has to have run out before unicorn is used again.
    if unicorn.countdown_symbolic_stop > 0:
        l.info("not enough blocks since symbolic stop (%d more)", unicorn.countdown_symbolic_stop)
        return False

    if unicorn.countdown_unsupported_stop > 0:
        l.info(
            "not enough blocks since unsupported VEX statement/expression stop (%d more)",
            unicorn.countdown_unsupported_stop,
        )
        return False

    if unicorn.countdown_nonunicorn_blocks > 0:
        l.info("not enough runs since last unicorn (%d)", unicorn.countdown_nonunicorn_blocks)
        return False

    if unicorn.countdown_stop_point > 0:
        l.info("not enough blocks since stop point (%d more)", unicorn.countdown_stop_point)
        # Decline like the other cooldowns do. Falling through here would ignore the stop point cooldown
        # and, at the same time, bypass the register check below.
        return False

    if o.UNICORN_SYM_REGS_SUPPORT not in state.options and not unicorn._check_registers():
        l.info("failed register check")
        return False

    if state.addr in self._stop_block_addrs_cache:
        l.info("Block will likely not execute in native interface")
        return False

    return True
@staticmethod
def __countdown(state):
    """
    Decrement each of the four cooldown counters kept on ``state.unicorn`` by one.

    :param state: The state whose unicorn plugin holds the counters.
    """
    plugin = state.unicorn
    for counter_name in (
        "countdown_nonunicorn_blocks",
        "countdown_symbolic_stop",
        "countdown_unsupported_stop",
        "countdown_stop_point",
    ):
        setattr(plugin, counter_name, getattr(plugin, counter_name) - 1)
def _execute_block_instrs_in_vex(self, block_details):
    """
    Re-execute selected statements of one basic block with the VEX statement handlers.

    The block is lifted (or taken from the block details cache), the recorded register values are written
    into the state, and each listed statement is handled in order. While a statement with recorded memory
    dependencies runs, temporary ``mem_read``/``mem_write`` breakpoints are installed; the breakpoint lists
    that were active on entry are restored after every statement and on every successor state.

    :param block_details: Mapping describing the block. The keys read here are ``block_addr``,
                          ``block_size``, ``registers`` (iterable of ``(name, value)`` pairs), ``stmts``
                          (entries with ``stmt_idx`` and ``mem_dep``) and ``has_symbolic_exit``.
    """
    block_addr = block_details["block_addr"]
    if block_addr not in self._block_details_cache:
        vex_block = self._get_vex_block_details(block_addr, block_details["block_size"])
        self._block_details_cache[block_addr] = vex_block
    else:
        vex_block = self._block_details_cache[block_addr]

    # Save breakpoints for restoring later
    saved_mem_read_breakpoints = copy.copy(self.state.inspect._breakpoints["mem_read"])
    saved_mem_write_breakpoints = copy.copy(self.state.inspect._breakpoints["mem_write"])

    for reg_name, reg_value in block_details["registers"]:
        self.state.registers.store(reg_name, reg_value, inspect=False, disable_actions=True)

    self.state.scratch.set_tyenv(vex_block.tyenv)
    for stmt_entry in block_details["stmts"]:
        self._instr_mem_reads = list(stmt_entry["mem_dep"])  # pylint:disable=attribute-defined-outside-init
        if self._instr_mem_reads:
            # Insert breakpoint to set the correct memory read address
            self.state.inspect.b("mem_read", when=BP_BEFORE, action=self._set_correct_mem_read_addr)
            self.state.inspect.b("mem_write", when=BP_AFTER, action=self._save_mem_write_addrs)

        # Reset for every statement: only the last statement handled decides about the default exit
        execute_default_exit = True
        # Execute handler from HeavyVEXMixin for the statement
        vex_stmt = vex_block.statements[stmt_entry["stmt_idx"]]
        self.stmt_idx = stmt_entry["stmt_idx"]  # pylint:disable=attribute-defined-outside-init
        try:
            super()._handle_vex_stmt(vex_stmt)  # pylint:disable=no-member
        except VEXEarlyExit:
            # Only one path is satisfiable in this branch.
            execute_default_exit = False

        # Restore breakpoints
        self.state.inspect._breakpoints["mem_read"] = copy.copy(saved_mem_read_breakpoints)
        self.state.inspect._breakpoints["mem_write"] = copy.copy(saved_mem_write_breakpoints)
        del self._instr_mem_reads

    if execute_default_exit and block_details["has_symbolic_exit"]:
        # Process block's default exit
        self.stmt_idx = DEFAULT_STATEMENT  # pylint:disable=attribute-defined-outside-init
        super()._handle_vex_defaultexit(vex_block.next, vex_block.jumpkind)  # pylint:disable=no-member

    # Restore breakpoints on the successors as well. Each list is restored from its own saved copy:
    # restoring "mem_write" from the saved "mem_read" list would swap the user's write breakpoints for
    # read breakpoints.
    for succ_state in self.successors.successors:
        succ_state.inspect._breakpoints["mem_read"] = copy.copy(saved_mem_read_breakpoints)
        succ_state.inspect._breakpoints["mem_write"] = copy.copy(saved_mem_write_breakpoints)

    del self.stmt_idx
def _execute_symbolic_instrs(self, syscall_data):
    """
    Re-execute the blocks that ``state.unicorn`` reports as containing symbolic VEX statements.

    For a CGC state, blocks located at the recorded receive/random syscall addresses are replayed by calling
    the corresponding syscall procedure directly. Every other block is re-executed statement by statement
    in VEX. When such a block has a symbolic exit, the newly produced successor that follows the traced
    path becomes ``self.state`` and is taken out of the successor lists again.

    A ``SimValueError`` raised while handling a block is logged and the remaining blocks are still processed.

    :param syscall_data: Mapping consulted when the UNICORN_HANDLE_CGC_RANDOM_SYSCALL option is set: its
                         ``"random"`` entry is a list whose items are consumed from the front, each item
                         holding an integer value at index 0 and its size in bytes at index 1.
    :raises Exception:   If a block with a symbolic exit yields several new successors and none of them is
                         at the next address of the traced path.
    """
    recent_bbl_addrs = None
    stop_details = None

    # Addresses written while re-executing; read by the mem_read hook to decide which taint to trust
    self._instr_mem_write_addrs = set()  # pylint:disable=attribute-defined-outside-init
    for block_details in self.state.unicorn._get_details_of_blocks_with_symbolic_vex_stmts():
        self.state.scratch.guard = claripy.true()
        try:
            if self.state.os_name == "CGC" and block_details["block_addr"] in {
                self.state.unicorn.cgc_random_addr,
                self.state.unicorn.cgc_receive_addr,
            }:
                # Re-execute CGC syscall
                reg_vals = dict(block_details["registers"])
                curr_regs = self.state.regs
                # If any regs are not present in the block details for re-execute, they are probably symbolic and so
                # were not saved in native interface. Use current register values in those cases: they should have
                # correct values right now.
                if block_details["block_addr"] == self.state.unicorn.cgc_receive_addr:
                    # rx_bytes argument is set to 0 since we care about updating symbolic values only
                    syscall_args = [
                        reg_vals.get("ebx", curr_regs.ebx),
                        reg_vals.get("ecx", curr_regs.ecx),
                        reg_vals.get("edx", curr_regs.edx),
                        0,
                    ]
                    syscall_simproc = self.state.project.simos.syscall_from_number(3, abi=None)
                    syscall_simproc.arch = self.state.arch
                    syscall_simproc.project = self.state.project
                    syscall_simproc.state = self.state
                    syscall_simproc.cc = self.state.project.simos.syscall_cc(self.state)
                    ret_val = getattr(syscall_simproc, syscall_simproc.run_func)(*syscall_args)
                    self.state.registers.store("eax", ret_val, inspect=False, disable_actions=True)
                elif block_details["block_addr"] == self.state.unicorn.cgc_random_addr:
                    syscall_simproc = self.state.project.simos.syscall_from_number(7, abi=None)
                    # rnd_bytes argument is set to 0 since we care about updating symbolic values only
                    syscall_args = [reg_vals.get("ebx", curr_regs.ebx), reg_vals.get("ecx", curr_regs.ecx), 0]
                    if o.UNICORN_HANDLE_CGC_RANDOM_SYSCALL in self.state.options:
                        # Update concrete value before invoking syscall
                        concrete_data = b""
                        curr_size = 0
                        max_size = self.state.solver.eval(syscall_args[1])
                        # The byte order does not change between entries, so work it out once
                        endianness = "little" if self.state.arch.memory_endness == "Iend_LE" else "big"
                        while curr_size != max_size:
                            next_entry = syscall_data["random"].pop(0)
                            curr_size = curr_size + next_entry[1]
                            concrete_data = concrete_data + next_entry[0].to_bytes(next_entry[1], endianness)
                    else:
                        concrete_data = None

                    syscall_simproc.arch = self.state.arch
                    syscall_simproc.project = self.state.project
                    syscall_simproc.state = self.state
                    syscall_simproc.cc = self.state.project.simos.syscall_cc(self.state)
                    ret_val = getattr(syscall_simproc, syscall_simproc.run_func)(*syscall_args, concrete_data)
                    self.state.registers.store("eax", ret_val, inspect=False, disable_actions=True)
            else:
                if block_details["has_symbolic_exit"]:
                    # Remember how many successors existed so the ones added by this block can be told apart
                    curr_succs_count = len(self.successors.successors)
                    if not recent_bbl_addrs:
                        recent_bbl_addrs = self.state.unicorn.get_recent_bbl_addrs()

                    if not stop_details:
                        stop_details = self.state.unicorn.get_stop_details()

                self._execute_block_instrs_in_vex(block_details)

                if block_details["has_symbolic_exit"]:
                    curr_succs = self.successors.successors
                    if len(curr_succs) == curr_succs_count + 1:
                        # There is only one newly added satisfiable successor state and so that is the state that
                        # follows path being traced
                        self.state = curr_succs[curr_succs_count]
                        self.successors.flat_successors.remove(self.state)
                        self.successors.all_successors.remove(self.state)
                        self.successors.successors.remove(self.state)
                    else:
                        # There are multiple satisfiable states. Use the state's record of basic blocks executed
                        # and block where native interface stopped to determine which state followed the path traced
                        # till now
                        if block_details["block_hist_ind"] + 1 < len(recent_bbl_addrs):
                            next_block_on_path = recent_bbl_addrs[block_details["block_hist_ind"] + 1]
                        else:
                            next_block_on_path = stop_details.block_addr

                        # The slice is a copy, so removing from the successor lists while looping is safe
                        for succ in curr_succs[curr_succs_count:]:
                            if succ.addr == next_block_on_path:
                                self.state = succ
                                # Take the state out of all three lists, exactly as the single-successor
                                # case above does, so no stale entry is left behind in all_successors
                                self.successors.flat_successors.remove(succ)
                                self.successors.all_successors.remove(succ)
                                self.successors.successors.remove(succ)
                                break
                        else:
                            raise Exception("Multiple valid successor states found but none followed the trace!")
        except SimValueError as e:
            l.error(e)

    del self._instr_mem_write_addrs
def _get_vex_block_details(self, block_addr, block_size):
    """
    Lift a block to VEX and refuse empty results. Mostly based on the lifting code in HeavyVEXMixin.

    :param block_addr: Address the block is lifted from.
    :param block_size: Size handed to the lifter.
    :return:           The lifted IRSB, which always has a non-zero size.
    :raises SimIRSBNoDecodeError: The IRSB is empty with jumpkind ``Ijk_NoDecode`` and its address is not
                                  hooked.
    :raises SimIRSBError:         The IRSB is empty for any other reason, including a hooked address.
    """
    # pylint:disable=no-member
    irsb = super().lift_vex(addr=block_addr, state=self.state, size=block_size)

    # Common case: something was lifted
    if irsb.size != 0:
        return irsb

    if irsb.jumpkind != "Ijk_NoDecode":
        raise SimIRSBError(f"Empty IRSB found at 0x{irsb.addr:02x}.")

    if self.state.project.is_hooked(irsb.addr):
        raise SimIRSBError("Block is hooked with custom code but original block was executed in unicorn")

    raise SimIRSBNoDecodeError(
        f"IR decoding error at 0x{irsb.addr:02x}. You can hook this instruction"
        " with a python replacement using project.hook"
        f"(0x{irsb.addr:02x}, your_function, length=length_of_instruction)."
    )
def _set_correct_mem_read_addr(self, state):
    """
    ``mem_read`` hook (registered with BP_BEFORE): point the pending read at the recorded address.

    Entries are consumed from the front of ``self._instr_mem_reads``, one per byte, until the length of the
    pending read is covered. The address of the first entry becomes the read address. For every byte a taint
    marker is collected: 0 for a concrete byte, 1 for a symbolic byte, and -1 for a symbolic byte whose
    address was written during re-execution. Unless every byte is marked -1, a follow-up BP_AFTER
    ``mem_read`` breakpoint is registered that corrects the value that was read.

    :param state: The state on which the memory read is about to happen.
    """
    assert len(self._instr_mem_reads) != 0
    mem_read_val = b""
    mem_read_size = 0
    mem_read_address = None
    mem_read_taint_map = []
    while mem_read_size != state.inspect.mem_read_length and self._instr_mem_reads:
        next_val = self._instr_mem_reads.pop(0)
        # Compare against None explicitly: a truthiness test would treat a recorded start address of 0 as
        # "not set yet" and replace it with the address of the following byte.
        if mem_read_address is None:
            mem_read_address = next_val["address"]

        if next_val["symbolic"]:
            if next_val["address"] in self._instr_mem_write_addrs:
                # This address was modified during re-execution. Ignore taint reported by native interface
                mem_read_taint_map.append(-1)
            else:
                mem_read_taint_map.append(1)
        else:
            mem_read_taint_map.append(0)

        mem_read_size += 1
        mem_read_val += next_val["value"]

    assert state.inspect.mem_read_length == mem_read_size
    state.inspect.mem_read_address = claripy.BVV(mem_read_address, state.inspect.mem_read_address.size())
    if mem_read_taint_map.count(-1) != mem_read_size:
        # The read might need a symbolic bitmap adjustment, so insert a breakpoint that returns the
        # correct concrete value
        self.state.inspect.b(
            "mem_read",
            when=BP_AFTER,
            action=functools.partial(
                self._set_correct_mem_read_val, value=mem_read_val, taint_map=mem_read_taint_map
            ),
        )
def _set_correct_mem_read_val(self, state, value, taint_map):  # pylint: disable=no-self-use
    """
    ``mem_read`` hook (registered with BP_AFTER): replace the value that was read with the correct one.

    The hook first removes the last ``mem_read`` breakpoint from the state. If every byte is concrete, the
    result is built from ``value`` alone. Otherwise the memory is loaded again, with the symbolic bitmap
    temporarily adjusted to ``taint_map`` where the two disagree, and concrete bytes are overwritten with
    the bytes from ``value``.

    :param state:     The state on which the memory read happened.
    :param value:     Bytes of the read, in address order.
    :param taint_map: One marker per byte: 0 concrete, 1 symbolic, -1 leave the bitmap entry untouched.
    """
    state.inspect._breakpoints["mem_read"].pop()

    if taint_map.count(0) == state.inspect.mem_read_length:
        # The value is completely concrete
        if state.arch.memory_endness == archinfo.Endness.LE:
            state.inspect.mem_read_expr = claripy.BVV(value[::-1])
        else:
            state.inspect.mem_read_expr = claripy.BVV(value)
        return

    # The value may be partially concrete. Set the symbolic bitmap to read correct value and restore it
    base_addr = state.solver.eval(state.inspect.mem_read_address)
    length = state.inspect.mem_read_length

    def bitmap_slot(byte_offset):
        # Locate the symbolic bitmap and the index within it for one byte of the read
        page_num, page_off = state.memory._divide_addr(base_addr + byte_offset)
        page_obj = state.memory._get_page(page_num, writing=False)
        return page_obj.symbolic_bitmap, page_off

    original_taints = []
    for byte_offset in range(length):
        bitmap, slot = bitmap_slot(byte_offset)
        original_taints.append(bitmap[slot])

    bitmap_patched = False
    if original_taints != taint_map:
        # Symbolic bitmap needs fixing before reading value from memory.
        bitmap_patched = True
        for byte_offset, wanted_taint in enumerate(taint_map):
            if wanted_taint == -1:
                continue
            bitmap, slot = bitmap_slot(byte_offset)
            bitmap[slot] = wanted_taint

    loaded = state.memory.load(
        base_addr, length, endness=state.arch.memory_endness, inspect=False, disable_actions=True
    )

    if bitmap_patched:
        # Put the bitmap back the way it was found
        for byte_offset, previous_taint in enumerate(original_taints):
            bitmap, slot = bitmap_slot(byte_offset)
            bitmap[slot] = previous_taint

    if taint_map.count(0) != 0:
        # Update concrete bytes using values reported by native interface
        byte_exprs = loaded.chop(8)
        if state.arch.memory_endness == archinfo.Endness.LE:
            # Work in address order so that offsets line up with taint_map and value
            byte_exprs.reverse()

        for byte_offset, wanted_taint in enumerate(taint_map):
            if wanted_taint == 0:
                byte_exprs[byte_offset] = claripy.BVV(value[byte_offset], 8)

        if state.arch.memory_endness == archinfo.Endness.LE:
            byte_exprs = reversed(byte_exprs)

        loaded = claripy.Concat(*byte_exprs)

    state.inspect.mem_read_expr = loaded
def _save_mem_write_addrs(self, state):
    """
    ``mem_write`` hook: record every address covered by the write in ``self._instr_mem_write_addrs``.

    :param state: The state on which the memory write happened.
    """
    first_addr = state.solver.eval(state.inspect.mem_write_address)
    written = range(first_addr, first_addr + state.inspect.mem_write_length)
    self._instr_mem_write_addrs.update(written)
[文档]
def process_successors(self, successors, **kwargs):
    """
    Step the current state with unicorn, or hand the step to the next engine when unicorn cannot be used.

    The step is handed on when the preliminary check fails, when the start address is a stop point, when
    an instruction breakpoint without an address exists, when setting up the unicorn plugin raises
    ``SimValueError``, or when unicorn made no progress.

    :param successors: The successors object to fill in.
    :param kwargs:     Step parameters. ``extra_stop_points``, ``last_block_details``, ``step``,
                       ``syscall_data`` and ``fd_bytes`` are read here; all of them are passed along
                       whenever the step is handed to the next engine.
    :return:           None if unicorn handled the step, otherwise whatever the next engine returns.
    """
    state = self.state
    if not self.__check(**kwargs):
        return super().process_successors(successors, **kwargs)

    step = kwargs.get("step")
    last_block_details = kwargs.get("last_block_details")
    syscall_data = kwargs.get("syscall_data")
    fd_bytes = kwargs.get("fd_bytes")

    # Work on a private set so the caller's collection is never modified; the project's sim procedures are
    # always stop points
    requested_stop_points = kwargs.get("extra_stop_points")
    extra_stop_points = set() if requested_stop_points is None else set(requested_stop_points)
    extra_stop_points.update(self.project._sim_procedures)

    if successors.addr in extra_stop_points:
        # trying to start unicorn execution on a stop point, fallback to next engine
        return super().process_successors(successors, **kwargs)

    successors.sort = "Unicorn"

    # add all instruction breakpoints as extra_stop_points
    if state.supports_inspect:
        for breakpoint_ in state.inspect._breakpoints["instruction"]:
            # if there is an instruction breakpoint on every instruction, it does not make sense
            # to use unicorn.
            if "instruction" not in breakpoint_.kwargs:
                l.info("disabling unicorn because of breakpoint on every instruction")
                return super().process_successors(successors, **kwargs)

            # add the breakpoint to extra_stop_points. We don't care if the breakpoint is BP_BEFORE or
            # BP_AFTER, this is only to stop unicorn when we get near a breakpoint. The breakpoint itself
            # will then be handled by another engine that can more accurately step instruction-by-instruction.
            extra_stop_points.add(breakpoint_.kwargs["instruction"])

    # initialize unicorn plugin
    try:
        state.unicorn.setup(syscall_data=syscall_data, fd_bytes=fd_bytes)
    except SimValueError:
        # it's trying to set a symbolic register somehow
        # fail out, force fallback to next engine
        return super().process_successors(successors, **kwargs)

    try:
        state.unicorn.set_stops(extra_stop_points)
        if last_block_details is not None:
            state.unicorn.set_last_block_details(last_block_details)
        state.unicorn.set_tracking(
            track_bbls=o.UNICORN_TRACK_BBL_ADDRS in state.options,
            track_stack=o.UNICORN_TRACK_STACK_POINTERS in state.options,
        )
        state.unicorn.hook()
        state.unicorn.start(step=step)
        self._execute_symbolic_instrs(syscall_data=syscall_data)
        state.unicorn.finish(self.state)
    finally:
        state.unicorn.destroy(self.state)

    # Re-executing symbolic instructions may have replaced self.state; continue with the current one
    state = self.state

    made_no_progress = state.unicorn.steps == 0
    if state.unicorn.stop_reason in self._stop_reasons_to_track:
        # Remember the block so the native interface is not attempted on it again
        blocked_addr = state.addr if made_no_progress else state.unicorn.stop_details.block_addr
        self._stop_block_addrs_cache.add(blocked_addr)

    if made_no_progress or state.unicorn.stop_reason == STOP.STOP_NOSTART:
        # fail out, force fallback to next engine
        # TODO: idk what the consequences of this might be. If this failed step can actually change non-unicorn
        # state then this is bad news.
        return super().process_successors(successors, **kwargs)

    description = f"Unicorn ({STOP.name_stop(state.unicorn.stop_reason)} after {state.unicorn.steps} steps)"

    state.history.recent_block_count += state.unicorn.steps
    state.history.recent_description = description

    # this can be expensive, so check first
    if state.supports_inspect and any(bp.check(state, BP_AFTER) for bp in state.inspect._breakpoints["irsb"]):
        # At least one irsb breakpoint applies: report every recently executed block once
        for bbl_addr in state.history.recent_bbl_addrs:
            state._inspect("irsb", BP_AFTER, address=bbl_addr)

    if state.unicorn.stop_reason in (
        STOP.symbolic_stop_reasons | STOP.unsupported_reasons
    ) or state.unicorn.stop_reason in (STOP.STOP_UNKNOWN_MEMORY_WRITE_SIZE, STOP.STOP_VEX_LIFT_FAILED):
        l.info(state.unicorn.stop_message)

    if state.unicorn.jumpkind.startswith("Ijk_Sys"):
        state.ip = state.unicorn._syscall_pc

    successors.add_successor(state, state.ip, claripy.true(), state.unicorn.jumpkind)
    successors.description = description
    successors.processed = True
    return None