angr.analyses.reaching_definitions.reaching_definitions 源代码

from __future__ import annotations
import logging
from typing import Any
from collections.abc import Iterable
from collections import defaultdict

import ailment
import pyvex

from angr.analyses import ForwardAnalysis
from angr.block import Block
from angr.knowledge_plugins.cfg.cfg_node import CFGNode
from angr.codenode import CodeNode
from angr.engines.light import SimEngineLight
from angr.knowledge_plugins.functions import Function
from angr.knowledge_plugins.key_definitions import ReachingDefinitionsModel, LiveDefinitions
from angr.knowledge_plugins.key_definitions.constants import OP_BEFORE, OP_AFTER, ObservationPointType, ObservationPoint
from angr.code_location import CodeLocation, ExternalCodeLocation
from angr.misc.ux import deprecated
from angr.analyses.forward_analysis.visitors.graph import NodeType
from angr.analyses.analysis import Analysis
from .engine_ail import SimEngineRDAIL
from .engine_vex import SimEngineRDVEX
from .rd_state import ReachingDefinitionsState
from .rd_initializer import RDAStateInitializer
from .subject import Subject, SubjectType
from .function_handler import FunctionHandler, FunctionCallRelationships
from .dep_graph import DepGraph
import contextlib


l = logging.getLogger(name=__name__)


[文档] class ReachingDefinitionsAnalysis( ForwardAnalysis[ReachingDefinitionsState, NodeType, object, object], Analysis ): # pylint:disable=abstract-method """ ReachingDefinitionsAnalysis is a text-book implementation of a static data-flow analysis that works on either a function or a block. It supports both VEX and AIL. By registering observers to observation points, users may use this analysis to generate use-def chains, def-use chains, and reaching definitions, and perform other traditional data-flow analyses such as liveness analysis. * I've always wanted to find a better name for this analysis. Now I gave up and decided to live with this name for the foreseeable future (until a better name is proposed by someone else). * Aliasing is definitely a problem, and I forgot how aliasing is resolved in this implementation. I'll leave this as a post-graduation TODO. * Some more documentation and examples would be nice. """
[文档] def __init__( self, subject: Subject | ailment.Block | Block | Function | str = None, func_graph=None, max_iterations=30, track_tmps=False, track_consts=True, observation_points: Iterable[ObservationPoint] | None = None, init_state: ReachingDefinitionsState = None, init_context=None, state_initializer: RDAStateInitializer | None = None, cc=None, function_handler: FunctionHandler | None = None, observe_all=False, visited_blocks=None, dep_graph: DepGraph | bool | None = True, observe_callback=None, canonical_size=8, stack_pointer_tracker=None, use_callee_saved_regs_at_return=True, interfunction_level: int = 0, track_liveness: bool = True, func_addr: int | None = None, element_limit: int = 5, merge_into_tops: bool = True, ): """ :param subject: The subject of the analysis: a function, or a single basic block :param func_graph: Alternative graph for function.graph. :param max_iterations: The maximum number of iterations before the analysis is terminated. :param track_tmps: Whether or not temporary variables should be taken into consideration during the analysis. :param iterable observation_points: A collection of tuples of ("node"|"insn", ins_addr, OP_TYPE) defining where reaching definitions should be copied and stored. OP_TYPE can be OP_BEFORE or OP_AFTER. :param init_state: An optional initialization state. The analysis creates and works on a copy. Default to None: the analysis then initialize its own abstract state, based on the given <Subject>. :param init_context: If init_state is not given, this is used to initialize the context field of the initial state's CodeLocation. The only default-supported type which may go here is a tuple of integers, i.e. a callstack. Anything else requires a custom FunctionHandler. :param cc: Calling convention of the function. :param function_handler: The function handler to update the analysis state and results on function calls. :param observe_all: Observe every statement, both before and after. :param visited_blocks: A set of previously visited blocks. :param dep_graph: An initial dependency graph to add the result of the analysis to. Set it to None to skip dependency graph generation. :param canonical_size: The sizes (in bytes) that objects with an UNKNOWN_SIZE are treated as for operations where sizes are necessary. :param dep_graph: Set this to True to generate a dependency graph for the subject. It will be available as `result.dep_graph`. :param interfunction_level: The number of functions we should recurse into. This parameter is only used if function_handler is not provided. :param track_liveness: Whether to track liveness information. This can consume sizeable amounts of RAM on large functions. (e.g. ~15GB for a function with 4k nodes) :param merge_into_tops: Merge known values into TOP if TOP is present. If True: {TOP} V {0xabc} = {TOP} If False: {TOP} V {0xabc} = {TOP, 0xabc} """ if isinstance(subject, str): subject = self.kb.functions[subject] if not isinstance(subject, Subject): self._subject = Subject(subject, func_graph, cc) else: self._subject = subject self._graph_visitor = self._subject.visitor ForwardAnalysis.__init__( self, order_jobs=True, allow_merging=True, allow_widening=False, graph_visitor=self._graph_visitor ) self._track_tmps = track_tmps self._track_consts = track_consts self._max_iterations = max_iterations self._observation_points = observation_points self._init_state = init_state self._canonical_size = canonical_size self._use_callee_saved_regs_at_return = use_callee_saved_regs_at_return self._func_addr = func_addr self._element_limit = element_limit self._merge_into_tops = merge_into_tops if dep_graph is None or dep_graph is False: self._dep_graph = None elif dep_graph is True: self._dep_graph = DepGraph() else: self._dep_graph = dep_graph if function_handler is None: self._function_handler = FunctionHandler(interfunction_level).hook(self) else: if interfunction_level != 0: l.warning("RDA(interfunction_level=XXX) doesn't do anything if you provide a function handler") self._function_handler = function_handler.hook(self) if self._init_state is not None: self._init_state = self._init_state.copy() self._init_state.analysis = self # There should never be an initializer needed then self._state_initializer = None else: self._state_initializer = state_initializer self._init_context = init_context self._observe_all = observe_all self._observe_callback = observe_callback # sanity check if self._observation_points and any(type(op) is not tuple for op in self._observation_points): raise ValueError('"observation_points" must be tuples.') self._node_iterations: defaultdict[int, int] = defaultdict(int) self.model: ReachingDefinitionsModel = ReachingDefinitionsModel( func_addr=self.subject.content.addr if isinstance(self.subject.content, Function) else None, track_liveness=track_liveness, ) bp_as_gpr = False the_func = None if isinstance(self.subject.content, Function): the_func = self.subject.content else: if self._func_addr is not None: with contextlib.suppress(KeyError): the_func = self.kb.functions.get_by_addr(self._func_addr) if the_func is not None: bp_as_gpr = the_func.info.get("bp_as_gpr", False) self._engine_vex = SimEngineRDVEX( self.project, functions=self.kb.functions, function_handler=self._function_handler, ) self._engine_ail = SimEngineRDAIL( self.project, function_handler=self._function_handler, stack_pointer_tracker=stack_pointer_tracker, use_callee_saved_regs_at_return=self._use_callee_saved_regs_at_return, bp_as_gpr=bp_as_gpr, ) self._visited_blocks: set[Any] = visited_blocks or set() self.function_calls: dict[CodeLocation, FunctionCallRelationships] = {} self._analyze()
@property def observed_results(self) -> dict[tuple[str, int, int], LiveDefinitions]: return self.model.observed_results @property def all_definitions(self): return self.model.all_definitions @all_definitions.setter def all_definitions(self, v): self.model.all_definitions = v @property def all_uses(self): return self.model.all_uses @property def one_result(self): if not self.observed_results: raise ValueError("No result is available.") if len(self.observed_results) != 1: raise ValueError("More than one results are available.") return next(iter(self.observed_results.values())) @property def dep_graph(self) -> DepGraph: if self._dep_graph is None: raise ValueError( "Cannot access dep_graph if the analysis was not configured to generate one. Try passing " "dep_graph=True to the RDA constructor." ) return self._dep_graph @property def visited_blocks(self): return self._visited_blocks @deprecated(replacement="get_reaching_definitions_by_insn") def get_reaching_definitions(self, ins_addr, op_type): return self.get_reaching_definitions_by_insn(ins_addr, op_type)
[文档] def get_reaching_definitions_by_insn(self, ins_addr, op_type): key = "insn", ins_addr, op_type if key not in self.observed_results: raise KeyError( f"Reaching definitions are not available at observation point {key!s}. " "Did you specify that observation point?" ) return self.observed_results[key]
[文档] def get_reaching_definitions_by_node(self, node_addr, op_type): key = "node", node_addr, op_type if key not in self.observed_results: raise KeyError( f"Reaching definitions are not available at observation point {key!s}. " "Did you specify that observation point?" ) return self.observed_results[key]
[文档] def node_observe( self, node_addr: int, state: ReachingDefinitionsState, op_type: ObservationPointType, node_idx: int | None = None, ) -> None: """ :param node_addr: Address of the node. :param state: The analysis state. :param op_type: Type of the observation point. Must be one of the following: OP_BEFORE, OP_AFTER. :param node_idx: ID of the node. Used in AIL to differentiate blocks with the same address. """ key = None observe = False if self._observe_all: observe = True key: ObservationPoint = ( ("node", node_addr, op_type) if node_idx is None else ("node", (node_addr, node_idx), op_type) ) elif self._observation_points is not None: key: ObservationPoint = ( ("node", node_addr, op_type) if node_idx is None else ("node", (node_addr, node_idx), op_type) ) if key in self._observation_points: observe = True elif self._observe_callback is not None: observe = self._observe_callback("node", addr=node_addr, state=state, op_type=op_type, node_idx=node_idx) if observe: key: ObservationPoint = ( ("node", node_addr, op_type) if node_idx is None else ("node", (node_addr, node_idx), op_type) ) if observe: self.observed_results[key] = state.live_definitions
[文档] def insn_observe( self, insn_addr: int, stmt: ailment.Stmt.Statement | pyvex.stmt.IRStmt, block: Block | ailment.Block, state: ReachingDefinitionsState, op_type: ObservationPointType, ) -> None: """ :param insn_addr: Address of the instruction. :param stmt: The statement. :param block: The current block. :param state: The abstract analysis state. :param op_type: Type of the observation point. Must be one of the following: OP_BEORE, OP_AFTER. """ key = None observe = False if self._observe_all: observe = True key: ObservationPoint = ("insn", insn_addr, op_type) elif self._observation_points is not None: key: ObservationPoint = ("insn", insn_addr, op_type) if key in self._observation_points: observe = True elif self._observe_callback is not None: observe = self._observe_callback( "insn", addr=insn_addr, stmt=stmt, block=block, state=state, op_type=op_type ) if observe: key: ObservationPoint = ("insn", insn_addr, op_type) if not observe: return if isinstance(stmt, pyvex.stmt.IRStmt): # it's an angr block vex_block = block.vex # OP_BEFORE: stmt has to be IMark if op_type == OP_BEFORE and type(stmt) is pyvex.stmt.IMark: self.observed_results[key] = state.live_definitions.copy() # OP_AFTER: stmt has to be last stmt of block or next stmt has to be IMark elif op_type == OP_AFTER: idx = vex_block.statements.index(stmt) if idx == len(vex_block.statements) - 1 or type(vex_block.statements[idx + 1]) is pyvex.IRStmt.IMark: self.observed_results[key] = state.live_definitions.copy() elif isinstance(stmt, ailment.Stmt.Statement): # it's an AIL block self.observed_results[key] = state.live_definitions.copy()
[文档] def stmt_observe( self, stmt_idx: int, stmt: ailment.Stmt.Statement | pyvex.stmt.IRStmt, block: Block | ailment.Block, state: ReachingDefinitionsState, op_type: ObservationPointType, ) -> None: """ :param stmt_idx: :param stmt: :param block: :param state: :param op_type: :return: """ key = None observe = False block_idx = block.idx if isinstance(block, ailment.Block) else None if self._observe_all: observe = True key: ObservationPoint = ("stmt", (block.addr, block_idx, stmt_idx), op_type) elif self._observation_points is not None: key: ObservationPoint = ("stmt", (block.addr, block_idx, stmt_idx), op_type) if key in self._observation_points: observe = True elif self._observe_callback is not None: observe = self._observe_callback( "stmt", stmt_idx=stmt_idx, stmt=stmt, block=block, state=state, op_type=op_type ) if observe: key: ObservationPoint = ("stmt", (block.addr, block_idx, stmt_idx), op_type) if not observe: return if isinstance(stmt, pyvex.stmt.IRStmt): # it's an angr block self.observed_results[key] = state.live_definitions.copy() elif isinstance(stmt, ailment.Stmt.Statement): # it's an AIL block self.observed_results[key] = state.live_definitions.copy()
[文档] def exit_observe( self, node_addr: int, exit_stmt_idx: int, block: Block | ailment.Block, state: ReachingDefinitionsState, node_idx: int | None = None, ): observe = False key = None if self._observe_all: observe = True key = ( ("exit", (node_addr, exit_stmt_idx), ObservationPointType.OP_AFTER) if node_idx is None else ("exit", (node_addr, node_idx, exit_stmt_idx), ObservationPointType.OP_AFTER) ) elif self._observation_points is not None: key = ( ("exit", (node_addr, exit_stmt_idx), ObservationPointType.OP_AFTER) if node_idx is None else ("exit", (node_addr, node_idx, exit_stmt_idx), ObservationPointType.OP_AFTER) ) if key in self._observation_points: observe = True elif self._observe_callback is not None: observe = self._observe_callback( "exit", node_addr=node_addr, exit_stmt_idx=exit_stmt_idx, block=block, state=state, ) if observe: key = ( ("exit", (node_addr, exit_stmt_idx), ObservationPointType.OP_AFTER) if node_idx is None else ("exit", (node_addr, node_idx, exit_stmt_idx), ObservationPointType.OP_AFTER) ) if not observe: return self.observed_results[key] = state.live_definitions.copy()
@property def subject(self): return self._subject # # Main analysis routines # def _pre_analysis(self): pass def _initial_abstract_state(self, node) -> ReachingDefinitionsState: if self._init_state is not None: return self._init_state return ReachingDefinitionsState( CodeLocation(node.addr, stmt_idx=0, ins_addr=node.addr, context=self._init_context), self.project.arch, self.subject, track_tmps=self._track_tmps, track_consts=self._track_consts, analysis=self, canonical_size=self._canonical_size, initializer=self._state_initializer, element_limit=self._element_limit, merge_into_tops=self._merge_into_tops, ) # pylint: disable=no-self-use,arguments-differ def _merge_states(self, _node, *states: ReachingDefinitionsState): assert len(states) >= 2 merged_state, merge_occurred = states[0].merge(*states[1:]) return merged_state, not merge_occurred def _compare_states(self, node, old_state: ReachingDefinitionsState, new_state: ReachingDefinitionsState) -> bool: """ Return True if new_state >= old_state in the lattice. :param node: :param old_state: :param new_state: :return: """ return new_state.compare(old_state) def _run_on_node(self, node, state: ReachingDefinitionsState): """ :param node: The current node. :param state: The analysis state. :return: A tuple: (reached fix-point, successor state) """ self._visited_blocks.add(node) engine: SimEngineLight if isinstance(node, ailment.Block): block = node block_key = (node.addr, node.idx) engine = self._engine_ail elif isinstance(node, (Block, CodeNode)): block = self.project.factory.block(node.addr, node.size, opt_level=1, cross_insn_opt=False) engine = self._engine_vex block_key = node.addr elif isinstance(node, CFGNode): if node.is_simprocedure or node.is_syscall: return False, state.copy(discard_tmpdefs=True) block = node.block engine = self._engine_vex block_key = node.addr else: l.warning("Unsupported node type %s.", node.__class__) return False, state.copy(discard_tmpdefs=True) state = state.copy(discard_tmpdefs=True) self.node_observe(node.addr, state.copy(), OP_BEFORE) if self.subject.type == SubjectType.Function: node_parents = [ CodeLocation(pred.addr, 0, block_idx=pred.idx if isinstance(pred, ailment.Block) else None) for pred in self._graph_visitor.predecessors(node) ] if node.addr == self.subject.content.addr: node_parents += [ExternalCodeLocation()] self.model.at_new_block( CodeLocation(block.addr, 0, block_idx=block.idx if isinstance(block, ailment.Block) else None), node_parents, ) state = engine.process( state, block=block, fail_fast=self._fail_fast, visited_blocks=self._visited_blocks, dep_graph=self._dep_graph, model=self.model, ) self._node_iterations[block_key] += 1 self.node_observe(node.addr, state, OP_AFTER) # update all definitions and all uses self.all_definitions |= state.all_definitions for use in [state.stack_uses, state.heap_uses, state.register_uses, state.memory_uses]: self.all_uses.merge(use) if self._track_tmps: # merge tmp uses to all_uses for tmp_idx, locs in state.tmp_uses.items(): tmp_def = next(iter(state.tmps[tmp_idx])) for loc in locs: self.all_uses.add_use(tmp_def, loc) # drop definitions and uses because we will not need them anymore state.downsize() if self._node_iterations[block_key] < self._max_iterations: return None, state return False, state def _intra_analysis(self): pass def _post_analysis(self): pass
[文档] def callsites_to(self, target: int | str | Function) -> Iterable[FunctionCallRelationships]: if isinstance(target, (str, int)): try: func_addr = self.project.kb.functions[target].addr except KeyError: return elif isinstance(target, Function): func_addr = target.addr else: raise TypeError(type(target)) for info in self.function_calls.values(): if info.target == func_addr: yield info