# pylint:disable=unused-import
from __future__ import annotations
import logging
from collections import defaultdict
from typing import Optional, Union, Any, TYPE_CHECKING
from collections.abc import Iterable
import networkx
from cle import SymbolType
import ailment
from angr.analyses.cfg import CFGFast
from angr.knowledge_plugins.functions.function import Function
from angr.knowledge_base import KnowledgeBase
from angr.sim_variable import SimMemoryVariable, SimRegisterVariable, SimStackVariable
from angr.utils import timethis
from angr.analyses import Analysis, AnalysesHub
from .structuring import RecursiveStructurer, PhoenixStructurer, DEFAULT_STRUCTURER
from .region_identifier import RegionIdentifier
from .optimization_passes.optimization_pass import OptimizationPassStage
from .ailgraph_walker import AILGraphWalker
from .condition_processor import ConditionProcessor
from .decompilation_options import DecompilationOption
from .decompilation_cache import DecompilationCache
from .utils import remove_labels
from .sequence_walker import SequenceWalker
from .structuring.structurer_nodes import SequenceNode
from .presets import DECOMPILATION_PRESETS, DecompilationPreset
if TYPE_CHECKING:
from angr.knowledge_plugins.cfg.cfg_model import CFGModel
from .peephole_optimizations import PeepholeOptimizationExprBase, PeepholeOptimizationStmtBase
from .structured_codegen.c import CStructuredCodeGenerator
l = logging.getLogger(name=__name__)
_PEEPHOLE_OPTIMIZATIONS_TYPE = Optional[
Iterable[Union[type["PeepholeOptimizationStmtBase"], type["PeepholeOptimizationExprBase"]]]
]
[文档]
class Decompiler(Analysis):
"""
The decompiler analysis.
Run this on a Function object for which a normalized CFG has been constructed.
The fully processed output can be found in result.codegen.text
"""
[文档]
def __init__(
self,
func: Function | str | int,
cfg: CFGFast | CFGModel | None = None,
options=None,
preset: str | DecompilationPreset | None = None,
optimization_passes=None,
sp_tracker_track_memory=True,
variable_kb=None,
peephole_optimizations: _PEEPHOLE_OPTIMIZATIONS_TYPE = None,
vars_must_struct: set[str] | None = None,
flavor="pseudocode",
expr_comments=None,
stmt_comments=None,
ite_exprs=None,
binop_operators=None,
decompile=True,
regen_clinic=True,
inline_functions=frozenset(),
desired_variables=frozenset(),
update_memory_data: bool = True,
generate_code: bool = True,
use_cache: bool = True,
expr_collapse_depth: int = 16,
):
if not isinstance(func, Function):
func = self.kb.functions[func]
self.func: Function = func
self._cfg = cfg.model if isinstance(cfg, CFGFast) else cfg
self._options = options or []
if preset is None and optimization_passes:
self._optimization_passes = optimization_passes
else:
# we use the preset
if isinstance(preset, str):
if preset not in DECOMPILATION_PRESETS:
raise KeyError(f"Decompilation preset {preset} is not found")
preset = DECOMPILATION_PRESETS[preset]
elif preset is None:
preset = DECOMPILATION_PRESETS["default"]
if not isinstance(preset, DecompilationPreset):
raise TypeError('"preset" must be a DecompilationPreset instance')
self._optimization_passes = preset.get_optimization_passes(self.project.arch, self.project.simos.name)
l.debug("Get %d optimization passes for the current binary.", len(self._optimization_passes))
self._sp_tracker_track_memory = sp_tracker_track_memory
self._peephole_optimizations = peephole_optimizations
self._vars_must_struct = vars_must_struct
self._flavor = flavor
self._variable_kb = variable_kb
self._expr_comments = expr_comments
self._stmt_comments = stmt_comments
self._ite_exprs = ite_exprs
self._binop_operators = binop_operators
self._regen_clinic = regen_clinic
self._update_memory_data = update_memory_data
self._generate_code = generate_code
self._inline_functions = inline_functions
self._desired_variables = desired_variables
self._cache_parameters = (
{
"cfg": self._cfg,
"variable_kb": self._variable_kb,
"options": {(o, v) for o, v in self._options if o.category != "Display" and v != o.default_value},
"optimization_passes": self._optimization_passes,
"sp_tracker_track_memory": self._sp_tracker_track_memory,
"peephole_optimizations": self._peephole_optimizations,
"vars_must_struct": self._vars_must_struct,
"flavor": self._flavor,
"expr_comments": self._expr_comments,
"stmt_comments": self._stmt_comments,
"ite_exprs": self._ite_exprs,
"binop_operators": self._binop_operators,
"inline_functions": self._inline_functions,
"desired_variables": self._desired_variables,
}
if use_cache
else None
)
self.clinic = None # mostly for debugging purposes
self.codegen: CStructuredCodeGenerator | None = None
self.cache: DecompilationCache | None = None
self.options_by_class = None
self.seq_node: SequenceNode | None = None
self.unoptimized_ail_graph: networkx.DiGraph | None = None
self.ail_graph: networkx.DiGraph | None = None
self.vvar_id_start = None
self._optimization_scratch: dict[str, Any] = {}
self.expr_collapse_depth = expr_collapse_depth
if decompile:
with self._resilience():
self._decompile()
if self.errors:
if (self.func.addr, self._flavor) not in self.kb.decompilations:
self.kb.decompilations[(self.func.addr, self._flavor)] = DecompilationCache(self.func.addr)
for error in self.errors:
self.kb.decompilations[(self.func.addr, self._flavor)].errors.append(error.format())
with self._resilience():
l.info("Decompilation failed for %s. Switching to basic preset and trying again.")
if preset != DECOMPILATION_PRESETS["basic"]:
self._optimization_passes = DECOMPILATION_PRESETS["basic"].get_optimization_passes(
self.project.arch, self.project.simos.name
)
self._decompile()
for error in self.errors:
self.kb.decompilations[(self.func.addr, self._flavor)].errors.append(error.format())
def _can_use_decompilation_cache(self, cache: DecompilationCache) -> bool:
a, b = self._cache_parameters, cache.parameters
id_checks = {"cfg", "variable_kb"}
return all(a[k] is b[k] if k in id_checks else a[k] == b[k] for k in self._cache_parameters)
@timethis
def _decompile(self):
if self.func.is_simprocedure:
return
cache = None
if self._cache_parameters is not None:
try:
cache = self.kb.decompilations[(self.func.addr, self._flavor)]
if not self._can_use_decompilation_cache(cache):
cache = None
except KeyError:
pass
if cache:
old_codegen = cache.codegen
old_clinic = cache.clinic
ite_exprs = cache.ite_exprs if self._ite_exprs is None else self._ite_exprs
binop_operators = cache.binop_operators if self._binop_operators is None else self._binop_operators
l.debug("Decompilation cache hit")
else:
old_codegen = None
old_clinic = None
ite_exprs = self._ite_exprs
binop_operators = self._binop_operators
l.debug("Decompilation cache miss")
self.options_by_class = defaultdict(list)
if self._options:
for o, v in self._options:
self.options_by_class[o.cls].append((o, v))
# set global variables
self._set_global_variables()
self._update_progress(5.0, text="Converting to AIL")
variable_kb = self._variable_kb
# fall back to old codegen
if variable_kb is None and old_codegen is not None:
variable_kb = old_codegen._variable_kb
if variable_kb is None:
reset_variable_names = True
else:
reset_variable_names = self.func.addr not in variable_kb.variables.function_managers
# determine a few arguments according to the structuring algorithm
fold_callexprs_into_conditions = False
self._force_loop_single_exit = True
self._complete_successors = False
self._recursive_structurer_params = self.options_to_params(self.options_by_class["recursive_structurer"])
if "structurer_cls" not in self._recursive_structurer_params:
self._recursive_structurer_params["structurer_cls"] = DEFAULT_STRUCTURER
# is the algorithm based on Phoenix (a schema-based algorithm)?
if issubclass(self._recursive_structurer_params["structurer_cls"], PhoenixStructurer):
self._force_loop_single_exit = False
self._complete_successors = True
fold_callexprs_into_conditions = True
cache = DecompilationCache(self.func.addr)
cache.parameters = self._cache_parameters
cache.ite_exprs = ite_exprs
cache.binop_operators = binop_operators
# convert function blocks to AIL blocks
def progress_callback(p, **kwargs):
return self._update_progress(p * (70 - 5) / 100.0 + 5, **kwargs)
if self._regen_clinic or old_clinic is None or self.func.prototype is None:
clinic = self.project.analyses.Clinic(
self.func,
kb=self.kb,
fail_fast=self._fail_fast,
variable_kb=variable_kb,
reset_variable_names=reset_variable_names,
optimization_passes=self._optimization_passes,
sp_tracker_track_memory=self._sp_tracker_track_memory,
fold_callexprs_into_conditions=fold_callexprs_into_conditions,
cfg=self._cfg,
peephole_optimizations=self._peephole_optimizations,
must_struct=self._vars_must_struct,
cache=cache,
progress_callback=progress_callback,
inline_functions=self._inline_functions,
desired_variables=self._desired_variables,
optimization_scratch=self._optimization_scratch,
force_loop_single_exit=self._force_loop_single_exit,
complete_successors=self._complete_successors,
**self.options_to_params(self.options_by_class["clinic"]),
)
else:
clinic = old_clinic
# reuse the old, unaltered graph
clinic.graph = clinic.cc_graph
clinic.cc_graph = clinic.copy_graph()
self.clinic = clinic
self.cache = cache
self._variable_kb = clinic.variable_kb
self._update_progress(70.0, text="Identifying regions")
self.vvar_id_start = clinic.vvar_id_start
if clinic.graph is None:
# the function is empty
return
# expose a copy of the graph before any optimizations that may change the graph occur;
# use this graph if you need a reference of exact mapping of instructions to AIL statements
self.unoptimized_ail_graph = (
clinic.unoptimized_graph if clinic.unoptimized_graph is not None else clinic.copy_graph()
)
cond_proc = ConditionProcessor(self.project.arch)
clinic.graph = self._run_graph_simplification_passes(
clinic.graph,
clinic.reaching_definitions,
ite_exprs=ite_exprs,
)
# recover regions, delay updating when we have optimizations that may update regions themselves
delay_graph_updates = any(
pass_.STAGE == OptimizationPassStage.DURING_REGION_IDENTIFICATION for pass_ in self._optimization_passes
)
ri = self._recover_regions(clinic.graph, cond_proc, update_graph=not delay_graph_updates)
# run optimizations that may require re-RegionIdentification
clinic.graph, ri = self._run_region_simplification_passes(
clinic.graph,
ri,
clinic.reaching_definitions,
ite_exprs=ite_exprs,
)
# Rewrite the graph to remove phi expressions
# this is probably optional if we do not pretty-print clinic.graph
clinic.graph = self._transform_graph_from_ssa(clinic.graph)
# save the graph before structuring happens (for AIL view)
clinic.cc_graph = remove_labels(clinic.copy_graph())
codegen = None
seq_node = None
# in the event that the decompiler is used without code generation as the target, we should avoid all
# heavy analysis that is used only for the purpose of code generation
if self._generate_code:
self._update_progress(75.0, text="Structuring code")
# structure it
rs = self.project.analyses[RecursiveStructurer].prep(kb=self.kb, fail_fast=self._fail_fast)(
ri.region,
cond_proc=cond_proc,
func=self.func,
**self._recursive_structurer_params,
)
self._update_progress(80.0, text="Simplifying regions")
# simplify it
s = self.project.analyses.RegionSimplifier(
self.func,
rs.result,
kb=self.kb,
fail_fast=self._fail_fast,
variable_kb=clinic.variable_kb,
**self.options_to_params(self.options_by_class["region_simplifier"]),
)
seq_node = s.result
seq_node = self._run_post_structuring_simplification_passes(
seq_node, binop_operators=cache.binop_operators, goto_manager=s.goto_manager, graph=clinic.graph
)
# rewrite the sequence node to remove phi expressions
seq_node = self._transform_seqnode_from_ssa(seq_node)
# update memory data
if self._cfg is not None and self._update_memory_data:
self.find_data_references_and_update_memory_data(seq_node)
self._update_progress(85.0, text="Generating code")
codegen = self.project.analyses.StructuredCodeGenerator(
self.func,
seq_node,
cfg=self._cfg,
ail_graph=clinic.graph,
flavor=self._flavor,
func_args=clinic.arg_list,
kb=self.kb,
fail_fast=self._fail_fast,
variable_kb=clinic.variable_kb,
expr_comments=old_codegen.expr_comments if old_codegen is not None else None,
stmt_comments=old_codegen.stmt_comments if old_codegen is not None else None,
const_formats=old_codegen.const_formats if old_codegen is not None else None,
externs=clinic.externs,
binop_depth_cutoff=self.expr_collapse_depth,
**self.options_to_params(self.options_by_class["codegen"]),
)
self._update_progress(90.0, text="Finishing up")
self.seq_node = seq_node
self.codegen = codegen
# save a copy of the AIL graph that is optimized but not modified by region identification
self.ail_graph = clinic.cc_graph
self.cache.codegen = codegen
self.cache.clinic = self.clinic
self.kb.decompilations[(self.func.addr, self._flavor)] = self.cache
self._finish_progress()
def _recover_regions(self, graph: networkx.DiGraph, condition_processor, update_graph: bool = True):
return self.project.analyses[RegionIdentifier].prep(kb=self.kb, fail_fast=self._fail_fast)(
self.func,
graph=graph,
cond_proc=condition_processor,
update_graph=update_graph,
force_loop_single_exit=self._force_loop_single_exit,
complete_successors=self._complete_successors,
entry_node_addr=self.clinic.entry_node_addr,
**self.options_to_params(self.options_by_class["region_identifier"]),
)
@timethis
def _run_graph_simplification_passes(self, ail_graph, reaching_definitions, **kwargs):
"""
Runs optimizations that should be executed before region identification.
:param ail_graph: DiGraph with AIL Statements
:param reaching_definitions: ReachingDefinitionAnalysis
:return: The possibly new AIL DiGraph and RegionIdentifier
"""
addr_and_idx_to_blocks: dict[tuple[int, int | None], ailment.Block] = {}
addr_to_blocks: dict[int, set[ailment.Block]] = defaultdict(set)
# update blocks_map to allow node_addr to node lookup
def _updatedict_handler(node):
addr_and_idx_to_blocks[(node.addr, node.idx)] = node
addr_to_blocks[node.addr].add(node)
AILGraphWalker(ail_graph, _updatedict_handler).walk()
# run each pass
for pass_ in self._optimization_passes:
# only for post region id opts
if pass_.STAGE != OptimizationPassStage.BEFORE_REGION_IDENTIFICATION:
continue
if pass_.STRUCTURING and self._recursive_structurer_params["structurer_cls"].NAME not in pass_.STRUCTURING:
l.warning(
"Skipping %s because it does not support structuring algorithm: %s",
pass_,
self._recursive_structurer_params["structurer_cls"].NAME,
)
continue
pass_ = timethis(pass_)
a = pass_(
self.func,
blocks_by_addr=addr_to_blocks,
blocks_by_addr_and_idx=addr_and_idx_to_blocks,
graph=ail_graph,
variable_kb=self._variable_kb,
reaching_definitions=reaching_definitions,
entry_node_addr=self.clinic.entry_node_addr,
scratch=self._optimization_scratch,
force_loop_single_exit=self._force_loop_single_exit,
complete_successors=self._complete_successors,
**kwargs,
)
# should be None if no changes
if a.out_graph:
# use the new graph
ail_graph = a.out_graph
return ail_graph
@timethis
def _run_region_simplification_passes(self, ail_graph, ri, reaching_definitions, **kwargs):
"""
Runs optimizations that should be executed after a single region identification. This function will return
two items: the new RegionIdentifier object and the new AIL Graph, which should probably be written
back to the clinic object that the graph is from.
Note: After each optimization run, if the optimization modifies the graph in any way then RegionIdentification
will be run again.
:param ail_graph: DiGraph with AIL Statements
:param ri: RegionIdentifier
:param reaching_definitions: ReachingDefinitionAnalysis
:return: The possibly new AIL DiGraph and RegionIdentifier
"""
addr_and_idx_to_blocks: dict[tuple[int, int | None], ailment.Block] = {}
addr_to_blocks: dict[int, set[ailment.Block]] = defaultdict(set)
# update blocks_map to allow node_addr to node lookup
def _updatedict_handler(node):
addr_and_idx_to_blocks[(node.addr, node.idx)] = node
addr_to_blocks[node.addr].add(node)
AILGraphWalker(ail_graph, _updatedict_handler).walk()
# run each pass
for pass_ in self._optimization_passes:
# only for post region id opts
if pass_.STAGE != OptimizationPassStage.DURING_REGION_IDENTIFICATION:
continue
if pass_.STRUCTURING and self._recursive_structurer_params["structurer_cls"].NAME not in pass_.STRUCTURING:
l.warning(
"Skipping %s because it does not support structuring algorithm: %s",
pass_,
self._recursive_structurer_params["structurer_cls"].NAME,
)
continue
pass_ = timethis(pass_)
a = pass_(
self.func,
blocks_by_addr=addr_to_blocks,
blocks_by_addr_and_idx=addr_and_idx_to_blocks,
graph=ail_graph,
variable_kb=self._variable_kb,
region_identifier=ri,
reaching_definitions=reaching_definitions,
vvar_id_start=self.vvar_id_start,
entry_node_addr=self.clinic.entry_node_addr,
scratch=self._optimization_scratch,
force_loop_single_exit=self._force_loop_single_exit,
complete_successors=self._complete_successors,
**kwargs,
)
# should be None if no changes
if a.out_graph:
# use the new graph
ail_graph = a.out_graph
# the graph might change! update them.
addr_and_idx_to_blocks = {}
addr_to_blocks = defaultdict(set)
AILGraphWalker(ail_graph, _updatedict_handler).walk()
cond_proc = ConditionProcessor(self.project.arch)
# always update RI on graph change
ri = self._recover_regions(ail_graph, cond_proc, update_graph=False)
self.vvar_id_start = a.vvar_id_start
return ail_graph, self._recover_regions(ail_graph, ConditionProcessor(self.project.arch), update_graph=True)
@timethis
def _run_post_structuring_simplification_passes(self, seq_node, **kwargs):
for pass_ in self._optimization_passes:
if pass_.STAGE != OptimizationPassStage.AFTER_STRUCTURING:
continue
pass_ = timethis(pass_)
a = pass_(self.func, seq=seq_node, scratch=self._optimization_scratch, **kwargs)
if a.out_seq:
seq_node = a.out_seq
return seq_node
def _set_global_variables(self):
global_variables = self.kb.variables["global"]
for symbol in self.project.loader.main_object.symbols:
if symbol.type == SymbolType.TYPE_OBJECT:
ident = global_variables.next_variable_ident("global")
global_variables.set_variable(
"global",
symbol.rebased_addr,
SimMemoryVariable(symbol.rebased_addr, 1, name=symbol.name, ident=ident),
)
[文档]
def reflow_variable_types(self, type_constraints: set, func_typevar, var_to_typevar: dict, codegen):
"""
Re-run type inference on an existing variable recovery result, then rerun codegen to generate new results.
:return:
"""
var_kb = self._variable_kb if self._variable_kb is not None else KnowledgeBase(self.project)
if self.func.addr not in var_kb.variables:
# for some reason variables for the current function don't really exist...
groundtruth = {}
else:
var_manager = var_kb.variables[self.func.addr]
# ground-truth types
groundtruth = {}
for variable in var_manager.variables_with_manual_types:
vartype = var_manager.variable_to_types.get(variable, None)
if vartype is not None:
for typevar in var_to_typevar[variable]:
groundtruth[typevar] = vartype
# variables that must be interpreted as structs
if self._vars_must_struct:
must_struct = set()
for var, typevars in var_to_typevar.items():
for typevar in typevars:
if var.ident in self._vars_must_struct:
must_struct.add(typevar)
else:
must_struct = None
# Type inference
try:
tp = self.project.analyses.Typehoon(
type_constraints,
func_typevar,
kb=var_kb,
fail_fast=self._fail_fast,
var_mapping=var_to_typevar,
must_struct=must_struct,
ground_truth=groundtruth,
)
tp.update_variable_types(
self.func.addr,
{v: t for v, t in var_to_typevar.items() if isinstance(v, (SimRegisterVariable, SimStackVariable))},
)
tp.update_variable_types(
"global",
{v: t for v, t in var_to_typevar.items() if isinstance(v, (SimRegisterVariable, SimStackVariable))},
)
# update the function prototype if needed
if self.func.prototype is not None and self.func.prototype.args:
var_manager = var_kb.variables[self.func.addr]
for i, arg in enumerate(codegen.cfunc.arg_list):
if i >= len(self.func.prototype.args):
break
var = arg.variable
new_type = var_manager.get_variable_type(var)
if new_type is not None:
self.func.prototype.args[i] = new_type
except Exception: # pylint:disable=broad-except
l.warning(
"Typehoon analysis failed. Variables will not have types. Please report to GitHub.", exc_info=True
)
codegen.reload_variable_types()
return codegen
[文档]
def find_data_references_and_update_memory_data(self, seq_node: SequenceNode):
const_values: set[int] = set()
def _handle_Const(expr_idx: int, expr: ailment.Expr.Const, *args, **kwargs): # pylint:disable=unused-argument
const_values.add(expr.value)
def _handle_block(block: ailment.Block, **kwargs): # pylint:disable=unused-argument
block_walker = ailment.AILBlockWalkerBase(
expr_handlers={
ailment.Expr.Const: _handle_Const,
}
)
block_walker.walk(block)
seq_walker = SequenceWalker(
handlers={
ailment.Block: _handle_block,
},
update_seqnode_in_place=False,
)
seq_walker.walk(seq_node)
added_memory_data_addrs = []
for data_addr in const_values:
if data_addr in self._cfg.memory_data:
continue
if not self.project.loader.find_loadable_containing(data_addr):
continue
if self._cfg.add_memory_data(data_addr, None):
added_memory_data_addrs.append(data_addr)
self._cfg.tidy_data_references(
memory_data_addrs=added_memory_data_addrs,
)
def _transform_graph_from_ssa(self, ail_graph: networkx.DiGraph) -> networkx.DiGraph:
dephication = self.project.analyses.GraphDephication(
self.func, ail_graph, rewrite=True, kb=self.kb, fail_fast=self._fail_fast
)
return dephication.output
def _transform_seqnode_from_ssa(self, seq_node: SequenceNode) -> SequenceNode:
dephication = self.project.analyses.SeqNodeDephication(
self.func, seq_node, rewrite=True, kb=self.kb, fail_fast=self._fail_fast
)
return dephication.output
[文档]
@staticmethod
def options_to_params(options: list[tuple[DecompilationOption, Any]]) -> dict[str, Any]:
"""
Convert decompilation options to a dict of params.
:param options: The decompilation options.
:return: A dict of keyword arguments.
"""
d = {}
for option, value in options:
if option.convert is not None:
d[option.param] = option.convert(value)
else:
d[option.param] = value
return d
AnalysesHub.register_default("Decompiler", Decompiler)