From 3471a1bf6cca2c79bf1d7b65614c868d57a4a41b Mon Sep 17 00:00:00 2001
From: Christian Heitman
Date: Wed, 17 Jul 2024 16:50:46 -0300
Subject: [PATCH] Refactor pastistritondse

---
 engines/pastistritondse/__main__.py |   23 +-
 engines/pastistritondse/alert.py    |  150 ++++
 engines/pastistritondse/driver.py   | 1267 +++++++++++++--------------
 engines/pastistritondse/replayer.py |  102 +++
 engines/pastistritondse/utils.py    |   31 +
 5 files changed, 927 insertions(+), 646 deletions(-)
 create mode 100644 engines/pastistritondse/alert.py
 create mode 100644 engines/pastistritondse/replayer.py
 create mode 100644 engines/pastistritondse/utils.py

diff --git a/engines/pastistritondse/__main__.py b/engines/pastistritondse/__main__.py
index 88ae38c..a9dc402 100755
--- a/engines/pastistritondse/__main__.py
+++ b/engines/pastistritondse/__main__.py
@@ -84,7 +84,13 @@ def online(host: str, port: int, debug: bool, probe: Tuple[str]):
         pastis.add_probe(probe)
 
     pastis.init_agent(host, port)
-    pastis.run(online=True)
+
+    try:
+        logging.info('Starting fuzzer...')
+        pastis.run(online=True)
+    except KeyboardInterrupt:
+        logging.info('Stopping fuzzer... (Ctrl+C)')
+        pastis.stop()
 
 
 @cli.command(context_settings=dict(show_default=True))
@@ -218,20 +224,23 @@ def offline(program: str,
                       report)
 
     # Set the number of execution limit
-    pastis.config.exploration_limit = count
+    pastis._config.exploration_limit = count
 
     # Provide it all our seeds
     for s in seed:
         s_path = Path(s)
         if s_path.is_file():  # Add the seed file
-            pastis.seed_received(SeedType.INPUT, Path(s).read_bytes())
+            pastis.add_initial_seed(s_path)
         elif s_path.is_dir():  # Add all file contained in the directory as seeds
            for sub_s in s_path.iterdir():
-               pastis.seed_received(SeedType.INPUT, sub_s.read_bytes())
-
-    # Call run to start exploration
-    pastis.run(online=False, debug_pp=debug_pp)
+               pastis.add_initial_seed(sub_s)
+
+    try:
+        logging.info('Starting fuzzer...')
+        pastis.run(online=False, debug_pp=debug_pp)
+    except KeyboardInterrupt:
+        logging.info('Stopping fuzzer... (Ctrl+C)')
+        pastis.stop()
 
 def main():
     cli()

diff --git a/engines/pastistritondse/alert.py b/engines/pastistritondse/alert.py
new file mode 100644
index 0000000..71068ab
--- /dev/null
+++ b/engines/pastistritondse/alert.py
@@ -0,0 +1,150 @@
+import logging
+
+from triton import CPUSIZE, MemoryAccess
+from tritondse import Addr, FormatStringSanitizer, IntegerOverflowSanitizer, NullDerefSanitizer, ProcessState, \
+    SymbolicExecutor, \
+    UAFSanitizer
+from tritondse.sanitizers import mk_new_crashing_seed
+
+
+class AlertValidator(object):
+
+    @staticmethod
+    def validate(typ: str, se: SymbolicExecutor, pstate: ProcessState, addr: Addr) -> bool:
+        """
+        This function is called by intrinsic_callback in order to verify defects
+        and vulnerabilities.
+
+        :param typ: Type of the alert as a string
+        :param se: The current symbolic executor
+        :param pstate: The current process state of the execution
+        :param addr: The instruction address of the intrinsic call
+        :return: True if a vulnerability has been verified
+        """
+        # BUFFER_OVERFLOW related alerts
+        if typ == "SV_STRBO_UNBOUND_COPY":
+            size = se.pstate.get_argument_value(2)
+            ptr = se.pstate.get_argument_value(3)
+
+            # Runtime check
+            if len(se.pstate.memory.read_string(ptr)) >= size:
+                # FIXME: Do we have to define the seed as CRASH even if there is no crash?
+                # FIXME: Maybe we have to define a new TAG like BUG or VULN or whatever
+                return True
+
+            # Symbolic check
+            actx = se.pstate.actx
+            predicate = [se.pstate.tt_ctx.getPathPredicate()]
+
+            # For each memory cell, try to prove that it can be different from \0
+            for i in range(size + 1):  # +1 in order to prove that we can at least do an off-by-one
+                cell = se.pstate.tt_ctx.getMemoryAst(MemoryAccess(ptr + i, CPUSIZE.BYTE))
+                predicate.append(cell != 0)
+
+            # FIXME: Maybe we can generate models until unsat in order to find the longest string
+
+            model = se.pstate.tt_ctx.getModel(actx.land(predicate))
+            if model:
+                crash_seed = mk_new_crashing_seed(se, model)
+                se.workspace.save_seed(crash_seed)
+                logging.info(f'Model found for a seed which may lead to a crash ({crash_seed.filename})')
+                return True
+
+            return False
+
+        ######################################################################
+
+        # BUFFER_OVERFLOW related alerts
+        elif typ == "SV_STRBO_BOUND_COPY_OVERFLOW":
+            dst_size = se.pstate.get_argument_value(2)
+            ptr_inpt = se.pstate.get_argument_value(3)
+            max_size = se.pstate.get_argument_value(4)
+
+            # Runtime check
+            if max_size >= dst_size and len(se.pstate.memory.read_string(ptr_inpt)) >= dst_size:
+                # FIXME: Do we have to define the seed as CRASH even if there is no crash?
+                # FIXME: Maybe we have to define a new TAG like BUG or VULN or whatever
+                return True
+
+            # Symbolic check
+            actx = se.pstate.actx
+            max_size_s = se.pstate.get_argument_symbolic(4).getAst()
+            predicate = [se.pstate.tt_ctx.getPathPredicate(), max_size_s >= dst_size]
+
+            # For each memory cell, try to prove that it can be different from \0
+            for i in range(dst_size + 1):  # +1 in order to prove that we can at least do an off-by-one
+                cell = se.pstate.tt_ctx.getMemoryAst(MemoryAccess(ptr_inpt + i, CPUSIZE.BYTE))
+                predicate.append(cell != 0)
+
+            # FIXME: Maybe we can generate models until unsat in order to find the longest string
+
+            model = se.pstate.tt_ctx.getModel(actx.land(predicate))
+            if model:
+                crash_seed = mk_new_crashing_seed(se, model)
+                se.workspace.save_seed(crash_seed)
+                logging.info(f'Model found for a seed which may lead to a crash ({crash_seed.filename})')
+                return True
+
+            return False
+
+        ######################################################################
+
+        # BUFFER_OVERFLOW related alerts
+        elif typ == "ABV_GENERAL":
+            logging.warning('ABV_GENERAL encountered but cannot check the issue. This issue will be handled if the program crashes.')
+            return False
+
+        ######################################################################
+
+        # All INTEGER_OVERFLOW related alerts
+        elif typ == "NUM_OVERFLOW":
+            return IntegerOverflowSanitizer.check(se, pstate, pstate.current_instruction)
+
+        ######################################################################
+
+        # All USE_AFTER_FREE related alerts
+        elif typ in ["UFM_DEREF_MIGHT", "UFM_FFM_MUST", "UFM_FFM_MIGHT"]:
+            ptr = se.pstate.get_argument_value(2)
+            return UAFSanitizer.check(se, pstate, ptr, f'UAF detected at {ptr:#x}')
+
+        ######################################################################
+
+        # All FORMAT_STRING related alerts
+        elif typ in ["SV_TAINTED_FMTSTR", "SV_FMTSTR_GENERIC"]:
+            ptr = se.pstate.get_argument_value(2)
+            return FormatStringSanitizer.check(se, pstate, addr, ("", ptr))
+
+        ######################################################################
+
+        # All INVALID_MEMORY related alerts
+        # FIXME: NPD_CHECK_MIGHT and NPD_CONST_CALL are not supported by klocwork-alert-inserter
+        elif typ in ["NPD_FUNC_MUST", "NPD_FUNC_MIGHT", "NPD_CHECK_MIGHT", "NPD_CONST_CALL"]:
+            ptr = se.pstate.get_argument_value(2)
+            return NullDerefSanitizer.check(se, pstate, ptr, f'Invalid memory access at {ptr:#x}')
+
+        ######################################################################
+
+        elif typ == "MISRA_ETYPE_CATEGORY_DIFFERENT_2012":
+            expr = se.pstate.get_argument_symbolic(2).getAst()
+
+            # Runtime check
+            if expr.isSigned():
+                # FIXME: Do we have to define the seed as CRASH even if there is no crash?
+                # FIXME: Maybe we have to define a new TAG like BUG or VULN or whatever
+                return True
+
+            # Symbolic check
+            actx = se.pstate.tt_ctx.getAstContext()
+            size = expr.getBitvectorSize() - 1
+            predicate = [se.pstate.tt_ctx.getPathPredicate(), actx.extract(size - 1, size - 1, expr) == 1]
+
+            model = se.pstate.tt_ctx.getModel(actx.land(predicate))
+            if model:
+                crash_seed = mk_new_crashing_seed(se, model)
+                se.workspace.save_seed(crash_seed)
+                logging.info(f'Model found for a seed which may lead to a crash ({crash_seed.filename})')
+                return True
+            return False
+
+        else:
+            logging.error(f"Unsupported alert kind {typ}")

diff --git a/engines/pastistritondse/driver.py b/engines/pastistritondse/driver.py
index 5d340a4..cdfd333 100644
--- a/engines/pastistritondse/driver.py
+++ b/engines/pastistritondse/driver.py
@@ -1,301 +1,281 @@
 # built-in imports
-from typing import List, Tuple
-import os
-import time
+import json
 import logging
+import threading
+import time
+from queue import Queue
 from hashlib import md5
 from pathlib import Path
-import threading
-import platform
-import json
-import queue
+from typing import List, Optional, Union
+
+import pastistritondse
+
+from pastistritondse.alert import AlertValidator
+from pastistritondse.replayer import Replayer
+from pastistritondse.utils import is_compatible_with_local
 
 # third-party imports
-from triton import MemoryAccess, CPUSIZE
 
-# Pastis & triton imports
-import pastistritondse
-from tritondse import Config, Program, CleLoader, CoverageStrategy, SymbolicExplorator, \
-    SymbolicExecutor, ProcessState, ExplorationStatus, SeedStatus, ProbeInterface, \
-    Workspace, Seed, CompositeData, SeedFormat, QuokkaProgram
-from tritondse.sanitizers import FormatStringSanitizer, NullDerefSanitizer, UAFSanitizer, IntegerOverflowSanitizer, mk_new_crashing_seed
-from tritondse.types import Addr, Edge, SymExType, Architecture, Platform
-from libpastis import ClientAgent, BinaryPackage, SASTReport
-from libpastis.types import SeedType, FuzzingEngineInfo, ExecMode, CoverageMode, SeedInjectLoc, CheckMode, LogLevel, AlertData, FuzzMode -from tritondse.trace import QBDITrace, TraceException +# pastis & tritondse imports +from tritondse.callbacks import ProbeInterface +from tritondse.config import Config +from tritondse.coverage import CoverageSingleRun, CoverageStrategy +from tritondse.loaders import CleLoader, Program, QuokkaProgram +from tritondse.process_state import ProcessState +from tritondse.sanitizers import FormatStringSanitizer, NullDerefSanitizer, UAFSanitizer +from tritondse.seed import CompositeData, Seed, SeedFormat, SeedStatus from tritondse.seed_scheduler import FreshSeedPrioritizerWorklist, WorklistAddressToSet +from tritondse.symbolic_executor import SymbolicExecutor +from tritondse.symbolic_explorator import ExplorationStatus, SymbolicExplorator +from tritondse.types import Addr, AstNode, Edge, SymExType +from tritondse.workspace import Workspace -def to_h(seed: Seed) -> str: - if seed.is_composite(): - if TritonDSEDriver.INPUT_FILE_NAME in seed.content.files: - return md5(seed.content.files[TritonDSEDriver.INPUT_FILE_NAME]).hexdigest() - elif "stdin" in seed.content.files: - return md5(seed.content.files["stdin"]).hexdigest() - else: - raise NameError("can't find main payload in Seed") - else: - return md5(seed.content).hexdigest() +from libpastis import BinaryPackage, ClientAgent, SASTReport +from libpastis.types import AlertData, CheckMode, CoverageMode, ExecMode, FuzzMode, FuzzingEngineInfo, LogLevel, \ + SeedInjectLoc, SeedType class TritonDSEDriver(object): INPUT_FILE_NAME = "input_file" - STAT_FILE = "pastidse-stats.json" + STAT_FILE = "tritondse-stats.json" - RAMDISK = "/mnt/ramdisk" - TMP_SEED = "seed.seed" - TMP_TRACE = "result.trace" + DEFAULT_WS_PATH = "/tmp/tritondse_workspace" def __init__(self, agent: ClientAgent): - self.agent = agent - self._init_callbacks() # register callbacks on the given agent - - self.config = Config() - self.config.workspace = "" # Reset workspace so that it will computed in start_received - self.dse = None - self.program = None - self._stop = False - self.sast_report= None - self._last_id = None - self._last_id_pc = None - self._seed_received = set() + + # Internal objects + self._agent = agent + + self._workspace = None + + # Parameters received through start_received + self._check_mode = None + self._report = None + self._seed_inj = None + + self.__setup_agent() + + # Runtime data + self._seed_recvs = set() # Seeds received from the broker. + self._seed_recvs_queue = Queue() # Seeds received from the broker that are pending to process (that is, + # to be directly added to the DSE or replayed and added). + # --- + + self._config = None + self._dse = None + self._last_alert_id = None + self._last_alert_id_pc = None # Last ID previous program counter. self._probes = [] - self._chkmode = None - self._seedloc = None + self._program = None self._program_slice = None + self._seeds_sent_count = 0 # Number of seed sent to the broker. + self._stop = False self._tracing_enabled = False - # local attributes for telemetry - self.nb_to, self.nb_crash = 0, 0 + # Local attributes for telemetry + self._crash_count = 0 self._cur_cov_count = 0 - self._last_cov_update = time.time() - self._seed_queue = queue.Queue() - self._sending_count = 0 - self.seeds_merged = 0 - self.seeds_rejected = 0 + self._last_cov_update = 0 + self._seeds_merged = 0 # Number of seeds merged into the coverage after replaying them. 
+        self._seeds_rejected = 0    # Number of seeds discarded after replaying them.
+        self._timeout_count = 0
 
         # Timing stats
         self._start_time = 0
-        self._replay_acc = 0
-
-        self.replay_trace_file, self.replay_seed_file = self._initialize_tmp_files()
-
-    def _initialize_tmp_files(self) -> Tuple[Path, Path]:
-        ramdisk = Path(self.RAMDISK)
-        pid = os.getpid()
-        if ramdisk.exists():  # there is a ramdisk available
-            dir = ramdisk / f"triton_{pid}"
-            dir.mkdir()
-            logging.info(f"tmp directory set to: {dir}")
-            return dir / self.TMP_TRACE, dir / self.TMP_SEED
-        else:
-            logging.info(f"tmp directory set to: /tmp")
-            return Path(f"/tmp/triton_{pid}.trace"), Path(f"/tmp/triton_{pid}.seed")
+        self._replay_time_acc = 0
 
-    def add_probe(self, probe: ProbeInterface):
-        self._probes.append(probe)
+        # Misc
+        self._replayer = None
+        self._alert_validator = None
+        self._thread = None
 
-    def _init_callbacks(self):
-        self.agent.register_seed_callback(self.seed_received)
-        self.agent.register_stop_callback(self.stop_received)
+    def start(self):
+        self._thread = threading.Thread(target=self.run, daemon=True)
+        self._thread.start()
 
-    def init_agent(self, remote: str = "localhost", port: int = 5555):
-        self.agent.register_start_callback(self.start_received)  # register start because launched manually
-        self.agent.connect(remote, port)
-        self.agent.start()
-        self.agent.send_hello([FuzzingEngineInfo("TRITON", pastistritondse.__version__, "pastistritondse.addon")])
+    def stop(self):
+        if self._dse:
+            self._dse.stop_exploration()
 
-    def start(self):
-        self._th = threading.Thread(target=self.run, daemon=True)
-        self._th.start()
+        self.__save_stats()
+
+        self._stop = True
 
     def reset(self):
         """ Reset the current DSE to be able to restart from fresh settings """
-        self.dse = None  # remove DSE object
-        self.config = Config()
-        self.config.workspace = ""  # Reset workspace so that it will computed in start_received
-        self._last_id_pc = None
-        self._last_id = None
-        self.sast_report = None
+        self._check_mode = None
+        self._report = None
+        self._seed_inj = None
+
+        self._seed_recvs = set()
+
+        self._config = None
+        self._dse = None
+        self._last_alert_id = None
+        self._last_alert_id_pc = None
+        self._program = None
         self._program_slice = None
-        self._seed_received = set()
-        self.program = None
+        self._seeds_sent_count = 0
         self._stop = False
-        self._chkmode = None
-        self._seedloc = None
-        self.nb_to, self.nb_crash = 0, 0
-        self._cur_cov_count = 0
-        self._last_cov_update = time.time()
         self._tracing_enabled = False
-        self._sending_count = 0
-        self.seeds_merged = 0
-        self.seeds_rejected = 0
+
+        # local attributes for telemetry
+        self._crash_count = 0
+        self._cur_cov_count = 0
+        self._last_cov_update = 0
+        self._seeds_merged = 0
+        self._seeds_rejected = 0
+        self._timeout_count = 0
+
+        # Timing stats
+        self._replay_time_acc = 0
         self._start_time = 0
-        self._replay_acc = 0
+
         logging.info("DSE Ready")
 
-    def run(self, online: bool, debug_pp: bool=False):
+    @property
+    def started(self):
+        return self._dse is not None
+
+    def add_initial_seed(self, file: Union[str, Path]):
+        p = Path(file)
+        logging.info(f"Add initial seed {p.name}")
+        self.__add_seed(p.read_bytes())
+
+    def add_probe(self, probe: ProbeInterface):
+        self._probes.append(probe)
+
+    def init_agent(self, remote: str = "localhost", port: int = 5555):
+        self._agent.register_start_callback(self.start_received)
+        self._agent.connect(remote, port)
+        self._agent.start()
+        self._agent.send_hello([FuzzingEngineInfo("TRITON", pastistritondse.__version__, "pastistritondse.addon")])
+
+    def run(self, online: bool = True, debug_pp: bool = False):
+        if online:
+            self.__run_online(debug_pp=debug_pp)
+        else:
+            self.__run_offline(debug_pp=debug_pp)
+
+    def __run_online(self, debug_pp: bool = False):
+        def cb_debug(se: SymbolicExecutor, _: ProcessState):
+            se.debug_pp = True
 
         # Run while we are not instructed to stop
         while not self._stop:
-
-            if online:  # in offline start_received, seed_received will already have been called
-                self.reset()
+            self.reset()
 
             # Just wait until the broker says let's go
-            while self.dse is None:
+            while self._dse is None:
                 time.sleep(0.10)
 
             if debug_pp:
-                def cb_debug(se, _):
-                    se.debug_pp = True
-                self.dse.callback_manager.register_pre_execution_callback(cb_debug)
+                self._dse.callback_manager.register_pre_execution_callback(cb_debug)
 
-            if not self.run_one(online):
+            if not self.__run_one_online():
                 break
 
-        self.agent.stop()
+        self._agent.stop()
 
-    def run_one(self, online: bool):
+    def __run_one_online(self):
         # Run while we are not instructed to stop
         while not self._stop:
+            self.__wait_seed_event()
 
-            # wait for seed event
-            self._wait_seed_event()
             self._start_time = time.time()
-            st = self.dse.explore()
-
-            if not online:
-                return False  # in offline whatever the status we stop
-
-            else:  # ONLINE
-                if st == ExplorationStatus.STOPPED:  # if the exploration stopped just return
-                    logging.info("exploration stopped")
-                    return False
-                elif st == ExplorationStatus.TERMINATED:
-                    self.agent.send_stop_coverage_criteria()
-                    return True  # Reset and wait for further instruction from the broker
-                elif st == ExplorationStatus.IDLE:  # no seed
-                    if self._chkmode == CheckMode.ALERT_ONE:
-                        self.agent.send_stop_coverage_criteria()  # Warn: the broker we explored the whole search space and did not validated the target
-                        return True  # Make ourself ready to receive a new one
-                    else:  # wait for seed of peers
-                        logging.info("exploration idle (worklist empty)")
-                        self.agent.send_log(LogLevel.INFO, "exploration idle (worklist empty)")
-                else:
-                    logging.error(f"explorator not meant to be in state: {st}")
-                    return False
+            status = self._dse.explore()
+
+            self.__save_stats()
+
+            if status == ExplorationStatus.STOPPED:  # If the exploration stopped just return.
+                logging.info("Exploration stopped")
+                return False  # This will cause the agent to stop.
+            elif status == ExplorationStatus.TERMINATED:
+                self._agent.send_stop_coverage_criteria()  # This will make the broker relaunch all clients.
+                return True  # Reset and wait for further instruction from the broker.
+            elif status == ExplorationStatus.IDLE:  # No more seeds available.
+                if self._check_mode == CheckMode.ALERT_ONE:
+                    self._agent.send_stop_coverage_criteria()  # Warn the broker we explored the whole search space and
+                                                               # did not validate the target.
+                                                               # This will make the broker relaunch all clients.
+                    return True  # Make ourselves ready to receive a new one.
+                else:  # Wait for seeds from peers.
+                    logging.info("Exploration idle (worklist empty)")
+                    self._agent.send_log(LogLevel.INFO, "exploration idle (worklist empty)")
+            else:
+                logging.error(f"Explorator not meant to be in state: {status}")
+                return False  # This will cause the agent to stop.
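For context, a minimal sketch of how the refactored driver is meant to be driven in online mode, mirroring the __main__.py changes above (the zero-argument ClientAgent() construction and the localhost:5555 endpoint are assumptions for illustration, not part of this patch):

    from libpastis import ClientAgent
    from pastistritondse.driver import TritonDSEDriver

    agent = ClientAgent()                 # assumed broker-client construction
    driver = TritonDSEDriver(agent)
    driver.init_agent("localhost", 5555)  # connect and send_hello to the broker
    try:
        driver.run(online=True)           # loops: reset, wait for start_received, explore
    except KeyboardInterrupt:
        driver.stop()                     # stops exploration and saves stats

Each iteration of __run_online() blocks until start_received() has built the DSE, then delegates to __run_one_online() until the broker terminates the session.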
- # Finished an exploration batch - self.save_stats() # Save stats + def __run_offline(self, debug_pp: bool = False): + def cb_debug(se: SymbolicExecutor, _: ProcessState): + se.debug_pp = True - def _wait_seed_event(self): - logging.info("wait to receive seeds") - while not self.dse.seeds_manager.seeds_available() and not self._stop: - self.try_process_seed_queue() - time.sleep(0.5) + if not self._stop: + # Just wait until the broker says let's go + while self._dse is None: + time.sleep(0.10) + if debug_pp: + self._dse.callback_manager.register_pre_execution_callback(cb_debug) - def cb_post_execution(self, se: SymbolicExecutor, state: ProcessState): - """ - This function is called after each execution. In this function we verify - the ABV_GENERAL alert when a crash occurred during the last execution. + self.__wait_seed_event() - :param se: The current symbolic executor - :param state: The current processus state of the execution - :return: None - """ - # Send seed that have been executed - mapper = {SeedStatus.OK_DONE: SeedType.INPUT, SeedStatus.CRASH: SeedType.CRASH, SeedStatus.HANG: SeedType.HANG} - seed = se.seed - if seed.status == SeedStatus.NEW: - logging.warning(f"seed is not meant to be NEW in post execution current:{seed.status.name}") - elif seed.status in [SeedStatus.CRASH, SeedStatus.HANG]: # The stats is new send it back again - if seed not in self._seed_received: # Do not send back a seed that already came from broker - self.agent.send_seed(mapper[seed.status], seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.content) - else: # INPUT - pass # Do not send it back again + self._start_time = time.time() - # Update some stats - if se.seed.status == SeedStatus.CRASH: - self.nb_crash += 1 - elif se.seed.status == SeedStatus.HANG: - self.nb_to += 1 + self._dse.explore() - # Handle CRASH and ABV_GENERAL - if se.seed.status == SeedStatus.CRASH and self._last_id: - alert = self.sast_report.get_alert(self._last_id) - if alert.type == "ABV_GENERAL": - logging.info(f'A crash occured with an ABV_GENERAL encountered just before.') - self.dual_log(LogLevel.INFO, f"Alert [{alert.id}] in {alert.file}:{alert.line}: {alert.type} validation [SUCCESS]") - alert.validated = True - self.agent.send_alert_data(AlertData(alert.id, alert.covered, alert.validated, se.seed.content, self._last_id_pc)) + self.__save_stats() - # Process all the seed received - self.try_process_seed_queue() + self._agent.stop() - # Print stats - if self.sast_report: - cov, va, tot = self.sast_report.get_stats() - logging.info(f"SAST stats: defaults: [covered:{cov}/{tot}] [validated:{va}/{tot}]") + def __setup_agent(self): + # Register callbacks. 
+ self._agent.register_seed_callback(self.__seed_received_cb) + self._agent.register_stop_callback(self.__stop_received_cb) - def try_process_seed_queue(self): + def __wait_seed_event(self): + logging.info("Waiting to receive seeds") + while not self._dse.seeds_manager.seeds_available() and not self._stop: + self.__try_process_seed_queue() + time.sleep(0.5) - while not self._seed_queue.empty() and not self._stop: - seed, typ = self._seed_queue.get() - self._process_seed_received(typ, seed) + def __try_process_seed_queue(self): + while not self._seed_recvs_queue.empty() and not self._stop: + seed = self._seed_recvs_queue.get() + self.__process_seed_received(seed) - def cb_telemetry(self, dse: SymbolicExplorator): + def dual_log(self, level: LogLevel, message: str) -> None: """ - Callback called after each execution to send telemetry to the broker + Helper function to log message both in the local log system and also + to the broker. - :param dse: SymbolicExplorator + :param level: LogLevel message type + :param message: string message to log :return: None """ - new_count = dse.coverage.unique_covitem_covered - - if new_count != self._cur_cov_count: # Coverage has been updated - self._cur_cov_count = new_count # update count - self._last_cov_update = time.time() # update last coverage update to be now - - if dse.coverage.strategy == CoverageStrategy.PREFIXED_EDGE: - new_count = len(set(x[1] for x in dse.coverage.covered_items.keys())) # For prefixed-edge only count edge - - self.agent.send_telemetry(exec_per_sec=int(dse.execution_count / (time.time()-dse.ts)), - total_exec=dse.execution_count, - timeout=self.nb_to, - coverage_block=dse.coverage.unique_instruction_covered, - coverage_edge=new_count if dse.coverage in [CoverageStrategy.EDGE, CoverageStrategy.PREFIXED_EDGE] else 0, - coverage_path=new_count if dse.coverage.strategy == CoverageStrategy.PATH else 0, - last_cov_update=int(self._last_cov_update)) - - def cb_on_solving(self, dse: SymbolicExplorator, pstate: ProcessState, edge: Edge, typ: SymExType) -> bool: - # Only consider conditional and dynamic jumps. - if typ in [SymExType.SYMBOLIC_READ, SymExType.SYMBOLIC_WRITE]: - return True + log_level_mapper = { + LogLevel.DEBUG: "debug", + LogLevel.INFO: "info", + LogLevel.CRITICAL: "critical", + LogLevel.WARNING: "warning", + LogLevel.ERROR: "error" + } + log_fn = getattr(logging, log_level_mapper[level]) + log_fn(message) - # Unpack edge. - src, dst = edge + self._agent.send_log(level, message) - # Find the function which holds the basic block of the destination. - dst_fn = self.program.find_function_from_addr(dst) - if dst_fn is None: - logging.warning("Solving edge ({src:#x} -> {dst:#x}) not in a function") - return True - else: - if dst_fn.start in self._program_slice: - return True - else: - logging.info( - f"Slicer: reject edge ({src:#x} -> {dst:#x} ({dst_fn.name}) not in slice!") - return False + # --- + # ClientAgent Callbacks + # --- def start_received(self, fname: str, binary: bytes, engine: FuzzingEngineInfo, exmode: ExecMode, fuzzmode: FuzzMode, chkmode: CheckMode, - covmode: CoverageMode, seed_inj: SeedInjectLoc, engine_args: str, argv: List[str], sast_report: str=None): + covmode: CoverageMode, seed_inj: SeedInjectLoc, engine_args: str, argv: List[str], sast_report: str = None): """ - This function is called when the broker says to start the fuzzing session. Here, we receive all information about - the program to fuzz and the configuration. + This function is called when the broker says to start the fuzzing session. 
Here, we receive all information + about the program to fuzz and the configuration. :param fname: The name of the binary to explore :param binary: The content of the binary to explore @@ -310,102 +290,88 @@ def start_received(self, fname: str, binary: bytes, engine: FuzzingEngineInfo, e :param sast_report: The SAST report :return: None """ - logging.info(f"[BROKER] [START] bin:{fname} engine:{engine.name} exmode:{exmode.name} cov:{covmode.name} chk:{chkmode.name}") + logging.info(f"[START] bin:{fname} engine:{engine.name} exmode:{exmode.name} covmod:{covmode.name} " + f"seedloc:{seed_inj.name} chk:{chkmode.name}") + + if self._dse is not None: + self.dual_log(LogLevel.CRITICAL, "Instance already started!") + return - if self.dse is not None: - logging.warning("DSE already instanciated (override it)") + if engine.name != "TRITON": + logging.error(f"Wrong fuzzing engine received {engine.name} while I am Triton") + self.dual_log(LogLevel.ERROR, f"Invalid fuzzing engine received {engine.name} can't do anything") + return if engine.version != pastistritondse.__version__: - logging.error(f"Pastis-DSE mismatch with one from the server {engine.version} (local: {pastistritondse.__version__})") + logging.error(f"Wrong fuzzing engine version {engine.version} received") + self.dual_log(LogLevel.ERROR, f"Invalid fuzzing engine version {engine.version} do nothing") return - self._seedloc = seed_inj + self._seed_inj = seed_inj + self._check_mode = chkmode - # ------- Create the TritonDSE configuration file --------- - if engine_args: - self.config = Config.from_json(engine_args) - else: - self.config = Config() # Empty configuration - # Use argv ONLY if no configuration provided - self.config.program_argv = [f"./{fname}"] - if argv: - self.config.program_argv.extend(argv) # Preprend the binary to argv + self.__initialize_config(fname, argv, engine_args, seed_inj, covmode) - """ - Actions taken depending on seed format & co: - Config | Inject | Result - RAW STDIN / - COMPOSITE STDIN / (but need 'stdin' in files) - RAW ARGV change to COMPOSITE to be able to inject on argv (and convert seeds on the fly) - COMPOSITE ARGV / (but need 'input_file' in files) - """ - if seed_inj == SeedInjectLoc.ARGV: # Make sure we inject input on argv - if self.config.is_format_raw(): - logging.warning("injection is ARGV thus switch config seed format to COMPOSITE") - self.config.seed_format = SeedFormat.COMPOSITE - if "@@" in self.config.program_argv: - idx = self.config.program_argv.index("@@") - self.config.program_argv[idx] = self.INPUT_FILE_NAME - else: - logging.warning(f"seed inject {self._seedloc.name} but no '@@' found in argv (will likely not work!)") - else: # SeedInjectLoc.STDIN - if engine_args: - if self.config.is_format_composite(): - self.dual_log(LogLevel.WARNING, "injecting on STDIN but seed format is COMPOSITE") - else: # no config was provided just override - self.config.seed_format = SeedFormat.RAW - pass # nothing to do ? - # ---------------------------------------------------------- - - # If a workspace is given keep it other generate new unique one - if not self.config.workspace: - ws = f"/tmp/triton_workspace/{int(time.time())}" - logging.info(f"Configure workspace to be: {ws}") - self.config.workspace = ws - - # Create the workspace object in advance (to directly save the binary inside - workspace = Workspace(self.config.workspace) - workspace.initialize(flush=False) + self.__initialize_workspace() + # Retrieve package out of the binary received. 
try: - pkg = BinaryPackage.from_binary(fname, binary, workspace.get_binary_directory()) + pkg = BinaryPackage.from_binary(fname, binary, self._workspace.get_binary_directory()) except FileNotFoundError: - logging.error("Invalid package data") + self.dual_log(LogLevel.ERROR, "Invalid package data") return - if pkg.is_quokka(): - logging.info(f"load QuokkaProgram: {pkg.quokka.name}") - self.program = QuokkaProgram(pkg.quokka, pkg.executable_path) - else: - logging.info(f"load Program: {pkg.executable_path.name} [{self._seedloc.name}]") - program = Program(pkg.executable_path) - - # Make sure the Program is compatible with the local platform - if self.is_compatible_with_local(program): - self.program = CleLoader(pkg.executable_path) - else: - self.program = program + self._program = self.__initialize_program(pkg) - if self.program is None: + if self._program is None: self.dual_log(LogLevel.CRITICAL, f"LIEF was not able to parse the binary file {fname}") - self.agent.stop() return - # Enable local tracing if the binary is compatible with local architecture - self._tracing_enabled = self.is_compatible_with_local(self.program) - logging.info(f"Local arch and program arch matching: {self._tracing_enabled}") + if sast_report: + logging.info("Loading SAST report") + self._report = SASTReport.from_json(sast_report) + logging.info(f"SAST report alerts: {len(list(self._report.iter_alerts()))}") - # Update the coverage strategy in the current config (it overrides the config file one) + # Initialize the dynamic symbolic executor. try: - self.config.coverage_strategy = CoverageStrategy(covmode.value) # names are meant to match + self._dse = self.__initialize_dse(chkmode, self._workspace, self._probes) except Exception as e: - logging.info(f"Invalid covmode. Not supported by the tritondse library {e}") - self.agent.stop() + self.dual_log(LogLevel.CRITICAL, f"Unexpected error while initializing the DSE: {e}") return - if sast_report: - self.sast_report = SASTReport.from_json(sast_report) - logging.info(f"SAST report loaded: alerts:{len(list(self.sast_report.iter_alerts()))}") + def __seed_received_cb(self, typ: SeedType, seed: bytes): + """ + This function is called when we receive a seed from the broker. + + :param typ: The type of the seed + :param seed: The sequence of bytes representing the seed + :return: None + """ + self.__add_seed(seed) + + def __stop_received_cb(self): + """ + This function is called when the broker says stop. (Called from the agent thread) + """ + logging.info(f"[STOP]") + + self.stop() + + # --- + # DSE Initialization + # --- + + def __initialize_dse(self, chkmode: CheckMode, workspace: Workspace, probes: List[ProbeInterface]): + self.__instantiate_argv(self._seed_inj) + + # Enable local tracing if the binary is compatible with local architecture + self._tracing_enabled = is_compatible_with_local(self._program) + logging.info(f"Local arch and program arch matching: {self._tracing_enabled}") + + if self._tracing_enabled: + self._replayer = Replayer(self._program, self._config, self._seed_inj) + + self._alert_validator = AlertValidator() # Set seed scheduler based on whether tracing is enabled. 
if self._tracing_enabled: @@ -413,267 +379,268 @@ def start_received(self, fname: str, binary: bytes, engine: FuzzingEngineInfo, e else: seed_scheduler_class = FreshSeedPrioritizerWorklist - dse = SymbolicExplorator(self.config, self.program, workspace=workspace, seed_scheduler_class=seed_scheduler_class) + dse = SymbolicExplorator(self._config, self._program, workspace=workspace, + seed_scheduler_class=seed_scheduler_class) - # Register common callbacks - dse.callback_manager.register_new_input_callback(self.send_seed_to_broker) # must be the second cb - dse.callback_manager.register_post_execution_callback(self.cb_post_execution) - dse.callback_manager.register_exploration_step_callback(self.cb_telemetry) + # Register common callbacks. + dse.callback_manager.register_new_input_callback(self.__send_input_seed_cb) + dse.callback_manager.register_post_execution_callback(self.__post_execution_cb) + dse.callback_manager.register_exploration_step_callback(self.__send_telemetry_cb) - for probe in self._probes: + # Register user-provided probes. + for probe in probes: dse.callback_manager.register_probe(probe) - # Save check mode - self._chkmode = chkmode - + # Set up the dse instance according to the check mode parameter. if chkmode == CheckMode.CHECK_ALL: - dse.callback_manager.register_probe(UAFSanitizer()) - dse.callback_manager.register_probe(NullDerefSanitizer()) - dse.callback_manager.register_probe(FormatStringSanitizer()) - #dse.callback_manager.register_probe(IntegerOverflowSanitizer()) - # TODO Buffer overflow - + dse.callback_manager.register_probe(UAFSanitizer()) + dse.callback_manager.register_probe(NullDerefSanitizer()) + dse.callback_manager.register_probe(FormatStringSanitizer()) + # dse.callback_manager.register_probe(IntegerOverflowSanitizer()) + # TODO Buffer overflow elif chkmode == CheckMode.ALERT_ONLY: - dse.callback_manager.register_function_callback('__sast_alert_placeholder', self.intrinsic_callback) - + # TODO: Refactor out into a probe (IntrinsicsProbe). + dse.callback_manager.register_function_callback('__sast_alert_placeholder', self.__intrinsic_cb) elif chkmode == CheckMode.ALERT_ONE: # targeted approach - if not isinstance(self.program, QuokkaProgram): - logging.error(f"Targeted mode [{chkmode.name}] requires a Quokka program") - self.agent.stop() - return + self.__setup_slice_mode(chkmode, dse) - target_addr = self.config.custom['target'] # retrieve the target address to reach - dse.callback_manager.register_post_addr_callback(target_addr, self.intrinsic_callback) + return dse - # NOTE Target address must be the starting address of a basic block. - slice_from = self.program.find_function_addr('main') - slice_to = target_addr + def __setup_slice_mode(self, chkmode, dse): + if not isinstance(self._program, QuokkaProgram): + logging.error(f"Targeted mode [{chkmode.name}] requires a Quokka program") + raise Exception(f"Targeted mode [{chkmode.name}] requires a Quokka program") - if slice_from and slice_to: - # Find the functions that correspond to the from and to addresses. - slice_from_fn = self.program.find_function_from_addr(slice_from) - slice_to_fn = self.program.find_function_from_addr(slice_to) - logging.info(f"launching exploration in targeted mode on: 0x{target_addr:08x} in {slice_to_fn.name}") + # Retrieve the target address to reach, and set the callback. 
+ target_addr = self._config.custom['target'] + dse.callback_manager.register_post_addr_callback(target_addr, self.__intrinsic_cb) - if slice_from_fn and slice_to_fn: - # NOTE Generate call graph with backedges so when we do the - # slice it also includes functions that are called in - # the path from the source to the destination of the - # slice. - call_graph = self.program.get_call_graph(backedge_on_ret=True) + # NOTE Target address must be the starting address of a basic block. + slice_from = self._program.find_function_addr('main') + slice_to = target_addr - logging.info(f'Slicing program from {slice_from:#x} ({slice_from_fn.name}) to {slice_to:#x} ({slice_to_fn.name})') + if slice_from and slice_to: + # Find the functions that correspond to the from and to addresses. + slice_from_fn = self._program.find_function_from_addr(slice_from) + slice_to_fn = self._program.find_function_from_addr(slice_to) + logging.info(f"Launching exploration in targeted mode on: 0x{target_addr:08x} in {slice_to_fn.name}") - self._program_slice = QuokkaProgram.get_slice(call_graph, slice_from_fn.start, slice_to_fn.start) + if slice_from_fn and slice_to_fn: + # NOTE Generate call graph with backedges so when we do the + # slice it also includes functions that are called in + # the path from the source to the destination of the + # slice. + call_graph = self._program.get_call_graph(backedge_on_ret=True) - logging.info(f'Call graph (full): #nodes: {len(call_graph.nodes)}, #edges: {len(call_graph.edges)}') - logging.info(f'Call graph (sliced): #nodes: {len(self._program_slice.nodes)}, #edges: {len(self._program_slice.edges)}') + logging.info(f'Slicing program from {slice_from:#x} ({slice_from_fn.name}) to {slice_to:#x} ({slice_to_fn.name})') - dse.callback_manager.register_on_solving_callback(self.cb_on_solving) - else: - logging.warning(f'Invalid source or target function, skipping slicing!') + self._program_slice = QuokkaProgram.get_slice(call_graph, slice_from_fn.start, slice_to_fn.start) + + logging.info(f'Call graph (full): #nodes: {len(call_graph.nodes)}, #edges: {len(call_graph.edges)}') + logging.info(f'Call graph (sliced): #nodes: {len(self._program_slice.nodes)}, #edges: {len(self._program_slice.edges)}') + + dse.callback_manager.register_on_solving_callback(self.__on_solving_cb) else: - logging.warning(f'Invalid source or target addresses, skipping slicing!') + logging.warning(f'Invalid source or target function, skipping slicing!') + else: + logging.warning(f'Invalid source or target addresses, skipping slicing!') - # will trigger the dse to start has another thread is waiting for self.dse to be not None - self.dse = dse + def __initialize_program(self, package: BinaryPackage): + if package.is_quokka(): + logging.info(f"Load QuokkaProgram: {package.quokka.name}") + program = QuokkaProgram(package.quokka, package.executable_path) + else: + logging.info(f"Load Program: {package.executable_path.name} [{self._seed_inj.name}]") + program = Program(package.executable_path) - def _get_seed(self, raw_seed: bytes) -> Seed: - # Convert seed to CompositeData - seed = Seed.from_bytes(raw_seed) + # Make sure the program is compatible with the local platform + if is_compatible_with_local(program): + program = CleLoader(package.executable_path) - if self.config.is_format_composite() and seed.is_raw() and self._seedloc == SeedInjectLoc.ARGV: - logging.debug("convert raw seed to composite") - return Seed(CompositeData(files={self.INPUT_FILE_NAME: seed.content})) + return program - elif 
self.config.is_format_composite() and seed.is_raw() and self._seedloc == SeedInjectLoc.STDIN: - logging.warning("Seed is RAW but format is COMPOSITE with injection on STDIN") - return Seed(CompositeData(files={"stdin": seed.content})) + def __initialize_workspace(self): + workspace_path = self._config.workspace - else: - return seed + # If a workspace is given keep it, otherwise generate a new unique one. + if not workspace_path: + workspace_path = self.DEFAULT_WS_PATH + f"/{int(time.time())}" - def seed_received(self, typ: SeedType, seed: bytes): - """ - This function is called when we receive a seed from the broker. + logging.info(f"Configure workspace to be: {workspace_path}") - :param typ: The type of the seed - :param seed: The seed - :return: None - """ - seed = self._get_seed(seed) + # Create the workspace object in advance (to directly save the binary inside) + self._workspace = Workspace(workspace_path) + self._workspace.initialize(flush=False) - if seed in self._seed_received: - logging.warning(f"receiving seed already known: {to_h(seed)} (dropped)") - return + def __initialize_config(self, program_name: str, program_argv: List[str], config_json: str, seed_inj: SeedInjectLoc, covmode: CoverageMode): + # Load or create configuration. + if config_json: + logging.info(f"Loading existing configuration") + self._config = Config.from_json(config_json) else: - self._seed_queue.put((seed, typ)) - logging.info(f"seed received {to_h(seed)} (pool: {self._seed_queue.qsize()})") + logging.info(f"Creating empty configuration") + # Set seed format according to the injection location. + seed_format = SeedFormat.COMPOSITE if seed_inj == SeedInjectLoc.ARGV else SeedFormat.RAW - def _process_seed_received(self, typ: SeedType, seed: Seed): - """ - This function is called when we receive a seed from the broker. + # Create empty configuration and assign default settings. + self._config = Config(program_argv=[f"./{program_name}"] + program_argv, + seed_format=seed_format) - :param typ: The type of the seed - :param seed: The seed - :return: None - """ + self.__check_and_fix_seed_format(seed_inj) + + # Update the coverage strategy in the current config (it overrides the config file one) try: - if not self._tracing_enabled: - # Accept all seeds - self.dse.add_input_seed(seed) - - else: # Try running the seed to know whether to keep it - # NOTE: re-run the seed regardless of its status - coverage = None - logging.info(f"process seed received {to_h(seed)} (pool: {self._seed_queue.qsize()})") - - data = seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.bytes() - self.replay_seed_file.write_bytes(data) - # Adjust injection location before calling QBDITrace - if self._seedloc == SeedInjectLoc.STDIN: - stdin_file = str(self.replay_seed_file) - argv = self.config.program_argv - else: # SeedInjectLoc.ARGV - stdin_file = None - try: - # Replace 'input_file' in argv with the temporary file name created - argv = self.config.program_argv[:] - idx = argv.index(self.INPUT_FILE_NAME) - argv[idx] = str(self.replay_seed_file) - except ValueError: - logging.error(f"seed injection {self._seedloc.name} but can't find 'input_file' on program argv") - return - - t0 = time.time() - try: - # Run the seed and determine whether it improves our current coverage. 
- if QBDITrace.run(self.config.coverage_strategy, - str(self.program.path.resolve()), - argv[1:] if len(argv) > 1 else [], - output_path=str(self.replay_trace_file), - stdin_file=stdin_file, - cwd=Path(self.program.path).parent, - timeout=60): - coverage = QBDITrace.from_file(str(self.replay_trace_file)).coverage - else: - logging.warning("Cannot load the coverage file generated (maybe had crashed?)") - coverage = None - except FileNotFoundError: - logging.warning("Cannot load the coverage file generated (maybe had crashed?)") - except TraceException: - logging.warning('There was an error while trying to re-run the seed') - replay_time = time.time() - t0 - logging.info(f'replay time for seed {to_h(seed)}: {int(replay_time)}s') - self._replay_acc += replay_time # Save time spent replaying inputs - - if not coverage: - logging.warning(f"coverage not found after replaying: {to_h(seed)} [{typ.name}] (add it anyway)") - # Add the seed anyway, if it was not possible to re-run the seed. - # TODO Set seed.coverage_objectives as "empty" (use ellipsis - # object). Modify WorklistAddressToSet to support it. - self.seeds_merged += 1 - self.dse.add_input_seed(seed) - else: - # Check whether the seed improves the current coverage. - if self.dse.coverage.improve_coverage(coverage): - logging.info(f"seed added {to_h(seed)} [{typ.name}] (coverage merged)") - self.seeds_merged += 1 - self.dse.coverage.merge(coverage) - self.dse.seeds_manager.worklist.update_worklist(coverage) - - seed.coverage_objectives = self.dse.coverage.new_items_to_cover(coverage) - self.dse.add_input_seed(seed) - else: - logging.info(f"seed archived {to_h(seed)} [{typ.name}] (NOT merging coverage)") - self.seeds_rejected += 1 - #self.dse.seeds_manager.archive_seed(seed) - # logging.info(f"seed archived {seed.hash} [{typ.name}]") - - self._seed_received.add(seed) # Remember seed received not to send them back - except FileNotFoundError as e: - # NOTE If reset() is call during the execution of this function, - # self.dse will be set to None and an AttributeError will occur. - logging.warning(f"receiving seeds while the DSE is not instantiated {e}") + self._config.coverage_strategy = CoverageStrategy(covmode.value) # Names are meant to match. + except Exception as e: + logging.critical(f"Invalid covmode (not supported by TritonDSE): {e}") + + def __check_and_fix_seed_format(self, seed_inj: SeedInjectLoc) -> None: + # Actions taken depending on seed format and injection method: + # | Config | Inject | Action | + # RAW STDIN None + # COMPOSITE STDIN None (seed needs 'stdin' in files) + # RAW ARGV Fix (switch to COMPOSITE to be able to inject on argv (and convert seeds on the fly)) + # COMPOSITE ARGV None (seed needs 'input_file' in files) + if seed_inj == SeedInjectLoc.STDIN: + if self._config.is_format_raw(): + # Nothing to do. 
+ pass + else: # self._config.is_format_composite() + logging.warning("Injecting on STDIN but seed format is COMPOSITE") + else: # SeedInjectLoc.ARGV + if self._config.is_format_raw(): + logging.warning("Injection is ARGV thus switch config seed format to COMPOSITE") + self._config.seed_format = SeedFormat.COMPOSITE + else: # self._config.is_format_composite() + if "@@" not in self._config.program_argv: + logging.warning("Injection is ARGV but there is no injection marker (@@)") + + def __instantiate_argv(self, seed_inj: SeedInjectLoc): + if seed_inj == SeedInjectLoc.ARGV: + if "@@" in self._config.program_argv: + idx = self._config.program_argv.index("@@") + self._config.program_argv[idx] = self.INPUT_FILE_NAME + else: + logging.warning(f"Seed inject location {self._seed_inj.name} but no '@@' found in argv (will likely not work!)") - rcv = len(self._seed_received) - logging.info(f"seeds recv: {rcv} | merged {self.seeds_merged} [{(self.seeds_merged/rcv) * 100:.2f}%]" - f" rejected {self.seeds_rejected} [{(self.seeds_rejected/rcv) * 100:.2f}%]") + # --- + # SymbolicExecutor Callbacks + # --- - def stop_received(self): + def __post_execution_cb(self, se: SymbolicExecutor, pstate: ProcessState): """ - This function is called when the broker says stop. (Called from the agent thread) + This function is called after each execution. In this function we verify + the ABV_GENERAL alert when a crash occurred during the last execution. + + :param se: The current symbolic executor + :param pstate: The current process state of the execution + :return: None """ - logging.info(f"[BROKER] [STOP]") + # Process seed + if se.seed.status == SeedStatus.NEW: + logging.warning(f"Seed is not meant to be NEW in post execution: {se.seed.status.name}") + elif se.seed.status in [SeedStatus.CRASH, SeedStatus.HANG]: + # The status changed, send it back again. + self.__send_seed(se.seed) + else: + # If se.seed.status in [SeedStatus.FAIL, SeedStatus.OK_DONE], do + # not send it back again. 
+ pass - if self.dse: - self.dse.stop_exploration() + # Update some stats + if se.seed.status == SeedStatus.CRASH: + self._crash_count += 1 + elif se.seed.status == SeedStatus.HANG: + self._timeout_count += 1 - self.save_stats() # Save stats + # Handle CRASH and ABV_GENERAL + if se.seed.status == SeedStatus.CRASH and self._last_alert_id: + alert = self._report.get_alert(self._last_alert_id) + if alert.type == "ABV_GENERAL": + logging.info(f'A crash occurred with an ABV_GENERAL encountered just before.') + self.dual_log(LogLevel.INFO, f"Alert [{alert.id}] in {alert.file}:{alert.line}: {alert.type} validation [SUCCESS]") + alert.validated = True + self.__send_alert_data(alert, se.seed, self._last_alert_id_pc) - self._stop = True - # self.agent.stop() # Can't call it here as this function executed from within agent thread + # Print stats + if self._report: + cov, va, tot = self._report.get_stats() + logging.info(f"SAST stats: defaults: [covered:{cov}/{tot}] [validated:{va}/{tot}]") - def save_stats(self): - stat_file = self.dse.workspace.get_metadata_file_path(self.STAT_FILE) - data = { - "total_time": time.time() - self._start_time, - "emulation_time": self.dse.total_emulation_time, # Note: includes replay time but not solving - "solving_time": self.dse.seeds_manager.total_solving_time, - "replay_time": self._replay_acc, - "seed_accepted": self.seeds_merged, - "seed_rejected": self.seeds_rejected, - "seed_received": self.seeds_merged + self.seeds_rejected - } - stat_file.write_text(json.dumps(data)) + # Process enqueued seeds + self.__try_process_seed_queue() - def dual_log(self, level: LogLevel, message: str) -> None: + def __send_telemetry_cb(self, dse: SymbolicExplorator): """ - Helper function to log message both in the local log system and also - to the broker. 
+ Callback called after each execution to send telemetry to the broker - :param level: LogLevel message type - :param message: string message to log + :param dse: SymbolicExplorator :return: None """ - mapper = {LogLevel.DEBUG: "debug", - LogLevel.INFO: "info", - LogLevel.CRITICAL: "critical", - LogLevel.WARNING: "warning", - LogLevel.ERROR: "error"} - log_f = getattr(logging, mapper[level]) - log_f(message) - self.agent.send_log(level, message) - - def send_seed_to_broker(self, se: SymbolicExecutor, state: ProcessState, seed: Seed): - if seed not in self._seed_received: # Do not send back a seed that already came from broker - self._sending_count += 1 - logging.info(f"Sending new: {to_h(seed)} [{self._sending_count}]") - bytes = seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.content - self.agent.send_seed(SeedType.INPUT, bytes) - - def intrinsic_callback(self, se: SymbolicExecutor, state: ProcessState, addr: Addr): + new_count = dse.coverage.unique_covitem_covered + + if new_count != self._cur_cov_count: # Coverage has been updated + self._cur_cov_count = new_count # Update count + self._last_cov_update = time.time() # Update last coverage update to be now + + if dse.coverage.strategy == CoverageStrategy.PREFIXED_EDGE: + new_count = len(set(x[1] for x in dse.coverage.covered_items.keys())) # For prefixed-edge only count edge + + self._agent.send_telemetry(exec_per_sec=int(dse.execution_count / (time.time() - dse.ts)), + total_exec=dse.execution_count, + timeout=self._timeout_count, + coverage_block=dse.coverage.unique_instruction_covered, + coverage_edge=new_count if dse.coverage in [CoverageStrategy.EDGE, CoverageStrategy.PREFIXED_EDGE] else 0, + coverage_path=new_count if dse.coverage.strategy == CoverageStrategy.PATH else 0, + last_cov_update=int(self._last_cov_update)) + + def __on_solving_cb(self, se: SymbolicExplorator, pstate: ProcessState, edge: Edge, typ: SymExType, + astnode: AstNode, astnode_list: List[AstNode]) -> bool: + # Only consider conditional and dynamic jumps. + if typ in [SymExType.SYMBOLIC_READ, SymExType.SYMBOLIC_WRITE]: + return True + + # Unpack edge. + src, dst = edge + + # Find the function which holds the basic block of the destination. + dst_fn = self._program.find_function_from_addr(dst) + if dst_fn is None: + logging.warning("Solving edge ({src:#x} -> {dst:#x}) not in a function") + return True + else: + if dst_fn.start in self._program_slice: + return True + else: + logging.info(f"Slicer: reject edge ({src:#x} -> {dst:#x} ({dst_fn.name}) not in slice!") + return False + + def __send_input_seed_cb(self, se: SymbolicExecutor, pstate: ProcessState, seed: Seed): + self.__send_seed(seed) + + def __intrinsic_cb(self, se: SymbolicExecutor, pstate: ProcessState, addr: Addr): """ This function is called when an intrinsic call occurs in order to verify defaults and vulnerabilities. 
:param se: The current symbolic executor - :param state: The current processus state of the execution + :param pstate: The current process state of the execution :param addr: The instruction address of the intrinsic call :return: None """ - alert_id = state.get_argument_value(0) - self._last_id = alert_id - self._last_id_pc = se.previous_pc + alert_id = pstate.get_argument_value(0) + + self._last_alert_id = alert_id + self._last_alert_id_pc = se.previous_pc def status_changed(a, cov, vld): return a.covered != cov or a.validated != vld - if self.sast_report: + if self._report: # Retrieve the SASTAlert object from the report try: - alert = self.sast_report.get_alert(alert_id) + alert = self._report.get_alert(alert_id) cov, vld = alert.covered, alert.validated except IndexError: logging.warning(f"Intrinsic id {alert_id} not found in report (ignored)") @@ -684,7 +651,7 @@ def status_changed(a, cov, vld): alert.covered = True # that might also set validated to true! if not alert.validated: # If of type VULNERABILITY and not yet validated - res = self.check_alert_dispatcher(alert.code, se, state, addr) + res = self._alert_validator.validate(alert.code, se, pstate, addr) if res: alert.validated = True self.dual_log(LogLevel.INFO, f"Alert [{alert.id}] in {alert.file}:{alert.line}: {alert.type} validation [SUCCESS]") @@ -696,190 +663,212 @@ def status_changed(a, cov, vld): if status_changed(alert, cov, vld): # If either coverage or validation were improved print stats # Send updates to the broker - self.agent.send_alert_data(AlertData(alert.id, alert.covered, alert.validated, se.seed.content, se.previous_pc)) - cov, vals, tot = self.sast_report.get_stats() + self.__send_alert_data(alert, se.seed, se.previous_pc) + cov, vals, tot = self._report.get_stats() logging.info(f"SAST stats: defaults: [covered:{cov}/{tot}] [validated:{vals}/{tot}]") - if self.sast_report.all_alerts_validated() or (self._chkmode == CheckMode.ALERT_ONE and alert.validated): - self._do_stop_all_alerts_validated() + if self._report.all_alerts_validated() or (self._check_mode == CheckMode.ALERT_ONE and alert.validated): + self.__do_stop_all_alerts_validated() else: # Kind of autonomous mode. Try to check it even it is not bound to a report # Retrieve alert type from parameters alert_type = se.pstate.get_string_argument(1) try: - if self.check_alert_dispatcher(alert_type, se, state, addr): + if self._alert_validator.validate(alert_type, se, pstate, addr): logging.info(f"Alert {alert_id} of type {alert_type} [VALIDATED]") else: logging.info(f"Alert {alert_id} of type {alert_type} [NOT VALIDATED]") except KeyError: logging.error(f"Alert type {alert_type} not recognized") - - def check_alert_dispatcher(self, type: str, se: SymbolicExecutor, state: ProcessState, addr: Addr) -> bool: + def __do_stop_all_alerts_validated(self) -> None: """ - This function is called by intrinsic_callback in order to verify defaults - and vulnerabilities. - - :param type: Type of the alert as a string - :param se: The current symbolic executor - :param state: The current processus state of the execution - :param addr: The instruction address of the intrinsic call - :return: True if a vulnerability has been verified + Function called if all alerts have been covered and validated. 
All data are meant to + have been transmitted to the broker, but writes down locally the CSV anyway + :return: None """ - # BUFFER_OVERFLOW related alerts - if type == "SV_STRBO_UNBOUND_COPY": - size = se.pstate.get_argument_value(2) - ptr = se.pstate.get_argument_value(3) - - # Runtime check - if len(se.pstate.get_memory_string(ptr)) >= size: - # FIXME: Do we have to define the seed as CRASH even if there is no crash? - # FIXME: Maybe we have to define a new TAG like BUG or VULN or whatever - return True + logging.info("All defaults and vulnerability have been covered !") - # Symbolic check - actx = se.pstate.actx - predicate = [se.pstate.tt_ctx.getPathPredicate()] + # Write the final CSV in the workspace directory + out_file = self._dse.workspace.get_metadata_file_path("klocwork_coverage_results.csv") + self._report.write_csv(out_file) - # For each memory cell, try to proof that they can be different from \0 - for i in range(size + 1): # +1 in order to proof that we can at least do an off-by-one - cell = se.pstate.tt_ctx.getMemoryAst(MemoryAccess(ptr + i, CPUSIZE.BYTE)) - predicate.append(cell != 0) + # Stop the dse exploration + self._dse.terminate_exploration() - # FIXME: Maybe we can generate models until unsat in order to find the bigger string + def __save_stats(self): + stat_file = self._workspace.get_metadata_file_path(self.STAT_FILE) + data = { + "total_time": time.time() - self._start_time, + "emulation_time": self._dse.total_emulation_time, # Note: includes replay time but not solving + "solving_time": self._dse.seeds_manager.total_solving_time, + "replay_time": self._replay_time_acc, + "seed_accepted": self._seeds_merged, + "seed_rejected": self._seeds_rejected, + "seed_received": self._seeds_merged + self._seeds_rejected + } + stat_file.write_text(json.dumps(data)) - model = se.pstate.tt_ctx.getModel(actx.land(predicate)) - if model: - crash_seed = mk_new_crashing_seed(se, model) - se.workspace.save_seed(crash_seed) - logging.info(f'Model found for a seed which may lead to a crash ({crash_seed.filename})') - return True + # --- + # Auxiliary methods + # --- - return False + def __seed_hash(self, seed: Seed) -> str: + if seed.is_composite(): + if self.INPUT_FILE_NAME in seed.content.files: + content = seed.content.files[self.INPUT_FILE_NAME] + elif "stdin" in seed.content.files: + content = seed.content.files["stdin"] + else: + raise NameError("Can't find main payload in the seed") + else: + content = seed.content - ###################################################################### + return md5(content).hexdigest() - # BUFFER_OVERFLOW related alerts - elif type == "SV_STRBO_BOUND_COPY_OVERFLOW": - dst_size = se.pstate.get_argument_value(2) - ptr_inpt = se.pstate.get_argument_value(3) - max_size = se.pstate.get_argument_value(4) + def __from_raw_seed(self, raw_seed: bytes) -> Seed: + """ + Convert a raw seed (sequence of bytes) into a seed whose type is + consistent with the configuration's seed format. + """ + seed = Seed.from_bytes(raw_seed) - # Runtime check - if max_size >= dst_size and len(se.pstate.get_memory_string(ptr_inpt)) >= dst_size: - # FIXME: Do we have to define the seed as CRASH even if there is no crash? 
-        ######################################################################

-        # BUFFER_OVERFLOW related alerts
-        elif type == "SV_STRBO_BOUND_COPY_OVERFLOW":
-            dst_size = se.pstate.get_argument_value(2)
-            ptr_inpt = se.pstate.get_argument_value(3)
-            max_size = se.pstate.get_argument_value(4)
+    def __from_raw_seed(self, raw_seed: bytes) -> Seed:
+        """
+        Convert a raw seed (sequence of bytes) into a seed whose type is
+        consistent with the configuration's seed format.
+        """
+        seed = Seed.from_bytes(raw_seed)

-            # Runtime check
-            if max_size >= dst_size and len(se.pstate.get_memory_string(ptr_inpt)) >= dst_size:
-                # FIXME: Do we have to define the seed as CRASH even if there is no crash?
-                # FIXME: Maybe we have to define a new TAG like BUG or VULN or whatever
-                return True
+        if not seed.is_raw():
+            raise Exception("A raw seed was expected")
+
+        if self._config.is_format_composite():
+            logging.debug("Converting RAW seed to COMPOSITE")
+            if self._seed_inj == SeedInjectLoc.ARGV:
+                return Seed(CompositeData(files={self.INPUT_FILE_NAME: seed.content}))
+            else:   # SeedInjectLoc.STDIN
+                return Seed(CompositeData(files={"stdin": seed.content}))
+        else:   # _config.is_raw()
+            if self._seed_inj == SeedInjectLoc.ARGV:
+                raise Exception("Invalid combination of seed injection and seed format")
+            else:   # SeedInjectLoc.STDIN
+                return seed

-            # Symbolic check
-            actx = se.pstate.actx
-            max_size_s = se.pstate.get_argument_symbolic(4).getAst()
-            predicate = [se.pstate.tt_ctx.getPathPredicate(), max_size_s >= dst_size]
+    def __to_raw_seed(self, seed: Seed) -> bytes:
+        """
+        Convert a seed (RAW or COMPOSITE) into a raw seed (sequence of bytes).
+        """
+        if self._seed_inj == SeedInjectLoc.ARGV:
+            return seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.bytes()

+        if self._seed_inj == SeedInjectLoc.STDIN:
+            return seed.content.files["stdin"] if seed.is_composite() else seed.bytes()
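Together with `__from_raw_seed`, this gives a lossless round-trip between the broker's byte-level view and tritondse's seed objects. For the STDIN/COMPOSITE combination, for example (a sketch under the same API assumptions as above):

    from tritondse import CompositeData, Seed

    raw_in = b"fuzz-payload"
    seed = Seed(CompositeData(files={"stdin": raw_in}))  # broker bytes -> COMPOSITE seed
    raw_out = seed.content.files["stdin"]                # COMPOSITE seed -> broker bytes
    assert raw_out == raw_in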
-            # For each memory cell, try to proof that they can be different from \0
-            for i in range(dst_size + 1):  # +1 in order to proof that we can at least do an off-by-one
-                cell = se.pstate.tt_ctx.getMemoryAst(MemoryAccess(ptr_inpt + i, CPUSIZE.BYTE))
-                predicate.append(cell != 0)
+    def __process_seed_received(self, seed: Seed) -> None:
+        """
+        This function is called when we receive a seed from the broker.

-            # FIXME: Maybe we can generate models until unsat in order to find the bigger string
+        :param seed: The seed
+        :return: None
+        """
+        logging.info(f"Process seed received {self.__seed_hash(seed)} (pool: {self._seed_recvs_queue.qsize()})")

-            model = se.pstate.tt_ctx.getModel(actx.land(predicate))
-            if model:
-                crash_seed = mk_new_crashing_seed(se, model)
-                se.workspace.save_seed(crash_seed)
-                logging.info(f'Model found for a seed which may lead to a crash ({crash_seed.filename})')
-                return True
+        try:
+            if not self._tracing_enabled:
+                # Accept all seeds.
+                self._dse.add_input_seed(seed)
+            else:
+                # Try running the seed to know whether to keep it. Note that the
+                # seed is re-run regardless of its status.
+                coverage = self.__replay_seed(seed)

-            return False
+                self.__process_seed_coverage(coverage, seed, SeedType.INPUT)
+        except AttributeError as e:
+            # NOTE: If reset() is called during the execution of this function,
+            # self._dse will be set to None and an AttributeError will occur.
+            logging.warning(f"Receiving seeds while the DSE is not instantiated {e}")

-        ######################################################################
+        seed_receive_count = len(self._seed_recvs)
+        seed_merge_rate = (self._seeds_merged / seed_receive_count) * 100
+        seed_reject_rate = (self._seeds_rejected / seed_receive_count) * 100
+        seed_processed = self._seeds_merged + self._seeds_rejected
+        seed_pending = seed_receive_count - seed_processed
+        seed_pending_rate = (seed_pending / seed_receive_count) * 100

-        # BUFFER_OVERFLOW related alerts
-        elif type == "ABV_GENERAL":
-            logging.warning(f'ABV_GENERAL encounter but can not check the issue. This issue will be handle if the program will crash.')
-            return False
+        logging.info(f"Seeds received: {seed_receive_count} | "
+                     f"merged {self._seeds_merged} [{seed_merge_rate:.2f}%] "
+                     f"rejected {self._seeds_rejected} [{seed_reject_rate:.2f}%] "
+                     f"pending {seed_pending} [{seed_pending_rate:.2f}%]")

-        ######################################################################
+    def __replay_seed(self, seed: Seed) -> Optional[CoverageSingleRun]:
+        coverage, replay_time = self._replayer.run(self.__to_raw_seed(seed))

-        # All INTEGER_OVERFLOW related alerts
-        elif type == "NUM_OVERFLOW":
-            return IntegerOverflowSanitizer.check(se, state, state.current_instruction)
+        # Save time spent replaying inputs.
+        self._replay_time_acc += replay_time

-        ######################################################################
+        logging.info(f'Replay time for seed {self.__seed_hash(seed)}: {replay_time:.02f}s')

-        # All USE_AFTER_FREE related alerts
-        elif type in ["UFM_DEREF_MIGHT", "UFM_FFM_MUST", "UFM_FFM_MIGHT"]:
-            ptr = se.pstate.get_argument_value(2)
-            return UAFSanitizer.check(se, state, ptr, f'UAF detected at {ptr:#x}')
+        return coverage

-        ######################################################################
+    def __process_seed_coverage(self, coverage: Optional[CoverageSingleRun], seed: Seed, typ: SeedType) -> None:
+        if not coverage:
+            logging.warning(f"Coverage not found after replaying: {self.__seed_hash(seed)} [{typ.name}] (add it anyway)")

-        # All FORMAT_STRING related alerts
-        elif type in ["SV_TAINTED_FMTSTR", "SV_FMTSTR_GENERIC"]:
-            ptr = se.pstate.get_argument_value(2)
-            return FormatStringSanitizer.check(se, state, addr, ptr)
+            # Add the seed anyway if it was not possible to re-run it.
+            # TODO Set seed.coverage_objectives as "empty" (use ellipsis
+            # object). Modify WorklistAddressToSet to support it.
+            self._dse.add_input_seed(seed)

-        ######################################################################
+            self._seeds_merged += 1
+        else:
+            # Check whether the seed improves the current coverage.
+            if self._dse.coverage.improve_coverage(coverage):
+                logging.info(f"Seed added {self.__seed_hash(seed)} [{typ.name}] (coverage merged)")

-        # All INVALID_MEMORY related alerts
-        # FIXME: NPD_CHECK_MIGHT and NPD_CONST_CALL are not supported by klocwork-alert-inserter
-        elif type in ["NPD_FUNC_MUST", "NPD_FUNC_MIGHT", "NPD_CHECK_MIGHT", "NPD_CONST_CALL"]:
-            ptr = se.pstate.get_argument_value(2)
-            return NullDerefSanitizer.check(se, state, ptr, f'Invalid memory access at {ptr:#x}')
+                self._dse.coverage.merge(coverage)
+                self._dse.seeds_manager.worklist.update_worklist(coverage)

-        ######################################################################
+                seed.coverage_objectives = self._dse.coverage.new_items_to_cover(coverage)

-        elif type == "MISRA_ETYPE_CATEGORY_DIFFERENT_2012":
-            expr = se.pstate.get_argument_symbolic(2).getAst()
+                self._dse.add_input_seed(seed)

-            # Runtime check
-            if expr.isSigned():
-                # FIXME: Do we have to define the seed as CRASH even if there is no crash?
-                # FIXME: Maybe we have to define a new TAG like BUG or VULN or whatever
-                return True
+                self._seeds_merged += 1
+            else:
+                logging.info(f"Seed rejected {self.__seed_hash(seed)} [{typ.name}] (NOT merging coverage)")

-            # Symbolic check
-            actx = se.pstate.tt_ctx.getAstContext()
-            size = expr.getBitvectorSize() - 1
-            predicate = [se.pstate.tt_ctx.getPathPredicate(), actx.extract(size - 1, size - 1, expr) == 1]
+                self._seeds_rejected += 1
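The acceptance policy above reduces to: keep a replayed seed only when its trace adds something the global coverage does not already have, and fall back to accepting it when replay failed. A condensed sketch, assuming the same `improve_coverage`/`merge` interface used above:

    def accept_seed(global_cov, run_cov) -> bool:
        # No coverage means the replay failed: accept conservatively.
        if run_cov is None:
            return True
        # Merge immediately so later candidates are compared against fresh state.
        if global_cov.improve_coverage(run_cov):
            global_cov.merge(run_cov)
            return True
        return False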
-            model = se.pstate.tt_ctx.getModel(actx.land(predicate))
-            if model:
-                crash_seed = mk_new_crashing_seed(se, model)
-                se.workspace.save_seed(crash_seed)
-                logging.info(f'Model found for a seed which may lead to a crash ({crash_seed.filename})')
-                return True
-            return False
+    def __add_seed(self, raw_seed: bytes):
+        seed = self.__from_raw_seed(raw_seed)
+        seed_hash = self.__seed_hash(seed)

-        else:
-            logging.error(f"Unsupported alert kind {type}")
+        if seed_hash in self._seed_recvs:
+            logging.warning(f"Receiving already known seed: {self.__seed_hash(seed)} (dropped)")
+            return

+        # Remember seeds received, so we do not send them back to the broker.
+        self._seed_recvs.add(seed_hash)

-    def _do_stop_all_alerts_validated(self) -> None:
-        """
-        Function called if all alerts have been covered and validated. All data are meant to
-        have been transmitted to the broker, but writes down locally the CSV anyway
-        :return: None
-        """
-        logging.info("All defaults and vulnerability have been covered !")
+        # Enqueue seed to be processed (either added directly to the dse or
+        # replayed).
+        self._seed_recvs_queue.put(seed)

-        # Write the final CSV in the workspace directory
-        out_file = self.dse.workspace.get_metadata_file_path("klocwork_coverage_results.csv")
-        self.sast_report.write_csv(out_file)
+        logging.info(f"Seed received {self.__seed_hash(seed)} (pool: {self._seed_recvs_queue.qsize()})")
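Note that `__add_seed` only deduplicates and enqueues; the potentially slow replay happens later, when the driver drains `_seed_recvs_queue` through `__process_seed_received`. A consumer loop of that shape might look like the following (illustrative only; the actual drain point is not shown in this hunk):

    import queue

    def drain(pending: "queue.Queue", process) -> None:
        # Handle every seed currently enqueued without blocking the caller.
        while True:
            try:
                seed = pending.get_nowait()
            except queue.Empty:
                break
            process(seed)  # e.g. __process_seed_received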
-        # Stop the dse exploration
-        self.dse.terminate_exploration()
+    def __send_seed(self, seed: Seed) -> None:
+        status_mapper = {
+            SeedStatus.CRASH: SeedType.CRASH,
+            SeedStatus.FAIL: SeedType.HANG,
+            SeedStatus.HANG: SeedType.HANG,
+            SeedStatus.NEW: SeedType.INPUT,
+            SeedStatus.OK_DONE: SeedType.INPUT,
+        }
+        seed_hash = self.__seed_hash(seed)

-    def is_compatible_with_local(self, program: Program) -> bool:
-        """
-        Checks whether the given program is compatible with the current architecture
-        and platform.
+        # Do not send back a seed that already came from the broker.
+        if seed_hash not in self._seed_recvs:
+            if seed.status == SeedStatus.NEW:
+                logging.info(f"Sending new seed: {self.__seed_hash(seed)} [{self._seeds_sent_count}]")

-        :param program: Program
-        :return: True if the program can be run locally
-        """
-        arch_m = {"i386": Architecture.X86, "x86_64": Architecture.X86_64, "armv7l": Architecture.ARM32, "aarch64": Architecture.AARCH64}
-        plfm_m = {"Linux": Platform.LINUX, "Windows": Platform.WINDOWS, "MacOS": Platform.MACOS, "iOS": Platform.IOS}
-        local_arch, local_plfm = arch_m[platform.machine()], plfm_m[platform.system()]
-        return program.architecture == local_arch and program.platform == local_plfm
+                # Only count new seeds.
+                self._seeds_sent_count += 1
+
+            self._agent.send_seed(status_mapper[seed.status], self.__to_raw_seed(seed))
+
+    def __send_alert_data(self, alert, seed, address):
+        self._agent.send_alert_data(AlertData(alert.id, alert.covered, alert.validated, seed.content, address))
diff --git a/engines/pastistritondse/replayer.py b/engines/pastistritondse/replayer.py
new file mode 100644
index 0000000..3666a99
--- /dev/null
+++ b/engines/pastistritondse/replayer.py
@@ -0,0 +1,102 @@
+import logging
+import os
+import time
+from pathlib import Path
+from typing import Optional, Tuple
+
+from tritondse import CoverageSingleRun
+from tritondse.trace import QBDITrace, TraceException
+
+from libpastis.types import SeedInjectLoc
+
+
+class Replayer(object):
+
+    INPUT_FILE_NAME = "input_file"
+
+    RAMDISK_PATH = "/mnt/ramdisk"
+
+    DEFAULT_WS_DIR = "tritondse_replayer_workspace"
+
+    SEED_FILE = "tritondse.seed"
+    TRACE_FILE = "tritondse.trace"
+
+    def __init__(self, program, config, seed_inj):
+        self.__config = config
+        self.__program = program
+        self.__seed_inj = seed_inj
+
+        self.__coverage_strategy = self.__config.coverage_strategy
+        self.__program_argv = None
+        self.__program_cwd: Path = Path(self.__program.path).parent
+        self.__program_path: Path = self.__program.path.resolve()
+        self.__replay_timeout = 60
+        self.__seed_file = None
+        self.__stdin_file = None
+        self.__trace_file = None
+
+        self.__initialize_files()
+
+        self.__initialize_stdin()
+        self.__initialize_argv()
+
+    def __initialize_files(self):
+        ramdisk_dir = Path(self.RAMDISK_PATH)
+
+        tmp_dir = ramdisk_dir if ramdisk_dir.exists() else Path("/tmp")
+        tmp_dir = tmp_dir / self.DEFAULT_WS_DIR / f"{int(time.time())}"
+        tmp_dir.mkdir(parents=True)
+
+        logging.info(f"tmp directory for replayer set to: {tmp_dir}")
+
+        self.__trace_file = tmp_dir / self.TRACE_FILE
+        self.__seed_file = tmp_dir / self.SEED_FILE
+
+    def __initialize_stdin(self):
+        if self.__seed_inj == SeedInjectLoc.STDIN:
+            self.__stdin_file = str(self.__seed_file)
+        else:
+            self.__stdin_file = None
+
+    def __initialize_argv(self):
+        # Copy program_argv as we might modify it.
+        argv = self.__config.program_argv[:]
+
+        if self.__seed_inj == SeedInjectLoc.ARGV:
+            try:
+                # Replace 'input_file' in argv with the temporary file name created
+                idx = argv.index(self.INPUT_FILE_NAME)
+                argv[idx] = str(self.__seed_file)
+            except ValueError:
+                logging.error(f"seed injection {self.__seed_inj.name} but can't find '{self.INPUT_FILE_NAME}' in program argv")
+                raise Exception(f"No '{self.INPUT_FILE_NAME}' in program argv.")
+
+        self.__program_argv = argv[1:] if len(argv) > 1 else []
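With `__init__` doing all of the set-up above, using the class reduces to one call per input; `run` (below) writes the bytes to the temporary seed file and traces the target. A hypothetical call site, where `program` and `config` stand for the loader and configuration objects the driver already holds:

    from libpastis.types import SeedInjectLoc

    replayer = Replayer(program, config, SeedInjectLoc.STDIN)  # hypothetical wiring
    coverage, replay_time = replayer.run(b"some raw input")
    if coverage is None:
        print(f"replay failed after {replay_time:.2f}s")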
+    def run(self, seed: bytes) -> Tuple[Optional[CoverageSingleRun], float]:
+        start_time = time.time()
+
+        self.__seed_file.write_bytes(seed)
+
+        coverage = None
+        try:
+            # Run the seed and obtain the coverage.
+            rv = QBDITrace.run(self.__coverage_strategy,
+                               str(self.__program_path),
+                               self.__program_argv,
+                               output_path=str(self.__trace_file),
+                               stdin_file=self.__stdin_file,
+                               cwd=str(self.__program_cwd),
+                               timeout=self.__replay_timeout)
+            if rv:
+                coverage = QBDITrace.from_file(str(self.__trace_file)).coverage
+        except TraceException:
+            logging.warning('There was an error while trying to re-run the seed')
+        except FileNotFoundError:
+            logging.warning("Cannot load the generated coverage file (maybe the target crashed?)")
+        except Exception as e:
+            logging.warning(f'Unexpected error occurred while trying to re-run the seed: {e}')
+
+        replay_time = time.time() - start_time
+
+        return coverage, replay_time
diff --git a/engines/pastistritondse/utils.py b/engines/pastistritondse/utils.py
new file mode 100644
index 0000000..a3809ea
--- /dev/null
+++ b/engines/pastistritondse/utils.py
@@ -0,0 +1,31 @@
+import platform
+
+from tritondse.loaders import Program
+from tritondse.types import Architecture, Platform
+
+
+def is_compatible_with_local(program: Program) -> bool:
+    """
+    Checks whether the given program is compatible with the current architecture
+    and platform.
+
+    :param program: Program
+    :return: True if the program can be run locally
+    """
+    arch_m = {
+        "i386": Architecture.X86,
+        "x86_64": Architecture.X86_64,
+        "armv7l": Architecture.ARM32,
+        "aarch64": Architecture.AARCH64
+    }
+
+    plfm_m = {
+        "Linux": Platform.LINUX,
+        "Windows": Platform.WINDOWS,
+        "MacOS": Platform.MACOS,
+        "iOS": Platform.IOS
+    }
+
+    local_arch, local_plfm = arch_m[platform.machine()], plfm_m[platform.system()]
+
+    return program.architecture == local_arch and program.platform == local_plfm
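A typical call site would guard local execution with this check before instantiating the DSE (a sketch; it assumes the package is importable as `pastistritondse` and the target path is illustrative):

    from tritondse.loaders import Program

    from pastistritondse.utils import is_compatible_with_local

    program = Program("./target")  # hypothetical binary
    if not is_compatible_with_local(program):
        raise SystemExit("target built for a different architecture/platform")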