from__future__importannotationsimportloggingimportrandomfromenumimportauto,IntFlagfromcollections.abcimportGeneratorimportangrfromangr.analysesimportAnalysis,AnalysesHubfromangr.knowledge_pluginsimportFunctionfromangr.sim_stateimportSimStatefromangr.utils.tagged_interval_mapimportTaggedIntervalMaplog=logging.getLogger(__name__)log.setLevel(logging.INFO)classTraceActions(IntFlag):""" Describe memory access actions. """WRITE=auto()EXECUTE=auto()classTraceClassifier:""" Classify traces. """def__init__(self,state:SimState|None=None):self.map=TaggedIntervalMap()ifstate:self.instrument(state)defact_mem_write(self,state)->None:""" SimInspect callback for memory writes. """addr=state.solver.eval(state.inspect.mem_write_address)length=state.inspect.mem_write_lengthifnotisinstance(length,int):length=state.solver.eval(length)self.map.add(addr,length,TraceActions.WRITE)defact_instruction(self,state)->None:""" SimInspect callback for instruction execution. """addr=state.inspect.instructionifaddrisNone:log.warning("Symbolic addr")return# FIXME: Ensure block size is correctself.map.add(addr,state.block().size,TraceActions.EXECUTE)definstrument(self,state)->None:""" Instrument `state` for tracing. """state.inspect.b("mem_write",when=angr.BP_BEFORE,action=self.act_mem_write)state.inspect.b("instruction",when=angr.BP_BEFORE,action=self.act_instruction)defget_smc_address_and_lengths(self)->Generator[tuple[int,int]]:""" Evaluate the trace to find which areas of memory were both written to and executed. """smc_flags=TraceActions.WRITE|TraceActions.EXECUTEforaddr,size,flagsinself.map.irange():if(flags&smc_flags)==smc_flags:yield(addr,size)defdetermine_smc(self)->bool:""" Evaluate the trace to find areas of memory that were both written to and executed. """returnany(self.get_smc_address_and_lengths())defpp(self):fora,b,cinself.map.irange():print(f"{a:8x}{b}{c}")
[文档]classSelfModifyingCodeAnalysis(Analysis):""" Determine if some piece of code is self-modifying. This determination is made by simply executing. If an address is executed that is also written to, the code is determined to be self-modifying. The determination is stored in the `result` property. The `regions` property contains a list of (addr, length) regions that were both written to and executed. """result:boolregions:list[tuple[int,int]]
[文档]def__init__(self,subject:None|int|str|Function,max_bytes:int=0,state:SimState|None=None):""" :param subject: Subject of analysis :param max_bytes: Maximum number of bytes from subject address. 0 for no limit (default). :param state: State to begin executing from from. """assertself.project.selfmodifying_codeifsubjectisNone:subject=self.project.entryifisinstance(subject,str):try:addr=self.project.kb.labels.lookup(subject)exceptKeyError:addr=self.project.kb.functions[subject].addrelifisinstance(subject,Function):addr=subject.addrelifisinstance(subject,int):addr=subjectelse:raiseValueError("Not a supported subject")ifstateisNone:init_state=self.project.factory.call_state(addr)else:init_state=state.copy()init_state.regs.pc=addrinit_state.options-=angr.sim_options.simplificationself._trace_classifier=TraceClassifier(init_state)simgr=self.project.factory.simgr(init_state)kwargs={}ifmax_bytes:kwargs["filter_func"]=lambdas:("active"ifs.solver.eval(addr<=s.regs.pc)ands.solver.eval(s.regs.pc<addr+max_bytes)else"oob")# FIXME: Early out on SMC detect# FIXME: Configurable step threshold# FIXME: Loop analysisforninrange(100):self._update_progress(n)simgr.step(n=3)random.shuffle(simgr.active)simgr.split(from_stash="active",to_stash=simgr.DROP,limit=10)# Classify any out of bound entrypointsforstate_insimgr.stashes["oob"]:self._trace_classifier.act_instruction(state_)self.regions=list(self._trace_classifier.get_smc_address_and_lengths())self.result=len(self.regions)>0