We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
使用cdb作为进程debugger,效率一般,但可以满足特定需求,目前在写自己的windows debugger
cdb
debugger
windows debugger
#encoding:utf-8 # ==================================================== # python: 3.5+ # 利用cdb作为debugger的一个简单传统fuzzer # 执行: python3 xfuzzer.py -h 查看帮助选项 # 产生一个文件: # uniq_crash_type_YYYY-mm-dd.txt => 所有crash type结果 # type是根据call stack 最后两个调用函数划分 # fuzz_output_post_fix用于标记不同fuzz程序的结果文件 # example: # "C:\Users\dt\AppData\Local\Programs\Python\Python37\python3.exe" xfuzzer.py -f H:\fuzzing\FuzzPdf\FuzzPdf.exe -o pdf -d H:\pdf_corpus -s H:\poc.pdf # "C:\Users\dt\AppData\Local\Programs\Python\Python37\python3.exe" xfuzzer.py -f H:\fuzzing\FuzzPdf\FuzzPdf.exe -o pdf_xfuzz -d H:\pdf_handled -c H:\pdf_corpus_mutate -s H:\fuzz_cache\poc.pdf # ==================================================== import sys import glob import subprocess import os import datetime import signal from time import monotonic as timer import argparse from threading import Timer import time from multiprocessing import Process,Pool, Manager import signal import shutil import logging kill = lambda process: process.kill() # 调试器位置 debugger = "C:\\Program Files (x86)\\Windows Kits\\10\Debuggers\\x64\\cdb.exe" # 结果存储文件 uniq_crash_type_file = 'uniq_crash_type_{}'.format((datetime.datetime.now()).strftime("%Y-%m-%d")) options = '' class Logger(object): def __init__(self): """ initial """ # log_path = logPath logging.addLevelName(20, "NOTICE:") logging.addLevelName(30, "WARNING:") logging.addLevelName(40, "FATAL:") logging.addLevelName(50, "FATAL:") logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(asctime)s %(filename)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", filename='handle.log', filemode="a") console = logging.StreamHandler() console.setLevel(logging.DEBUG) formatter = logging.Formatter("%(levelname)s %(asctime)s %(filename)s %(message)s") console.setFormatter(formatter) logging.getLogger("").addHandler(console) def debug(self, msg=""): """ output DEBUG level LOG """ logging.debug(str(msg)) def info(self, msg=""): """ output INFO level LOG """ logging.info(str(msg)) def warning(self, msg=""): """ output WARN level LOG """ logging.warning(str(msg)) def exception(self, msg=""): """ output Exception stack LOG """ logging.exception(str(msg)) def error(self, msg=""): """ output ERROR level LOG """ logging.error(str(msg)) def critical(self, msg=""): """ output FATAL level LOG """ logging.critical(str(msg)) logger = Logger() # 根据调用栈最后两个函数判定类型 (Windows) def get_crash_type_win32(stdout): # 获取到crash type i = 0 stdout_lines = stdout.decode().split('\n') crash_type = None for crash_data in stdout_lines: if "RetAddr" in crash_data: break i += 1 if i == len(stdout_lines): return None i += 1 # 得出最后两个调用栈作为划分类型 if len(stdout_lines) > i and len(stdout_lines[i].split(' ')) >= 3: crash_type = stdout_lines[i].split(' ')[2] if len(stdout_lines) > i+1 and len(stdout_lines[i+1].split(' ')) >= 3: crash_type += ' -> ' + stdout_lines[i+1].split(' ')[2] return crash_type # 将crash_types写入文件 def handle_crash_types(crash_types): fp = open(uniq_crash_type_file, 'w') for crash_type in crash_types: if not crash_type: continue fp.write(crash_type + ':\n') for crash_file in crash_types[crash_type]: fp.write(" [+] {}\n".format(crash_file)) fp.close() def prepare_data(corpus_dir): files = [] files.extend(glob.glob(corpus_dir+'/*', recursive=True)) if len(files) == 0: return None return files def handle_result(output_queue): global uniq_crash_type_file uniq_crash_type_file = uniq_crash_type_file+'_{}.txt'.format(options.output_post_fix) print ('total output result:' + str(output_queue.qsize())) crash_types = {} while not output_queue.empty(): data = output_queue.get() crash_type = data[0] # crash/timeout input_file = data[1] stdout = data[2] if crash_type == 'timeout': # 添加timeout文件 if 'timeout' not in crash_types.keys(): crash_types['timeout'] = [input_file] else: crash_types['timeout'].append(input_file) else: crash_type = get_crash_type_win32(stdout) if crash_type == None: continue if crash_type not in crash_types.keys(): crash_types[crash_type] = [input_file] else: crash_types[crash_type].append(input_file) handle_crash_types(crash_types) def fuzz(input_queue, output_queue, fuzz_program, specific_file_arg, dst_dir): while not input_queue.empty(): data = input_queue.get() index = data[0] input_file = data[1] start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) logger.info('[start: {}] handling {} {}'.format(start_time, index, input_file)) # 将crash文件数据写入特定文件 specific_file = None if specific_file_arg: file_name, file_ext = os.path.splitext(specific_file_arg) specific_file = '{}_{}{}'.format(file_name, str(os.getpid()), file_ext) with open(input_file, 'rb') as f: fp_specific = open(specific_file, 'wb') crash_file_data = f.read() fp_specific.write(crash_file_data) fp_specific.close() else: specific_file = input_file stdout = None stderr = None crash_type = None crash_types = {} # 处理windows command = [debugger, "-y", "srv*c:\symbols*https://msdl.microsoft.com/download/symbols", "-c", "g;kp 10;q", fuzz_program, specific_file] process_out = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # timeout code by jfs https://stackoverflow.com/questions/36952245/subprocess-timeout-failure try: stdout, stderr = process_out.communicate(timeout=10) except subprocess.TimeoutExpired: # os.kill(process_out.pid, signal.SIGINT) # send signal to the process group subprocess.call(['taskkill', '/F', '/T', '/PID', str(process_out.pid)]) stdout, stderr = process_out.communicate() output_queue.put(('timeout', input_file, None)) output_queue.put(('crash', input_file, stdout)) crash_type = get_crash_type_win32(stdout) if crash_type != None: logger.info(crash_type) file_basename = os.path.basename(input_file) dst_file = os.path.join(dst_dir, file_basename) if os.path.exists(dst_file): logger.info(dst_file + ' exists!') else: shutil.move(input_file, dst_file) def init_fuzzer(): """ Pool worker initializer for keyboard interrupt on Windows """ signal.signal(signal.SIGINT, signal.SIG_IGN) def main(): global options files = [] if options.corpus_dir: files = prepare_data(options.corpus_dir) else: logger.error("No corpus directory") if not files: logger.error('No corpus files') exit(1) logger.info("total files: " + str(len(files))) # 输出已经处理corpus目录 if not os.path.exists(options.output_directory): os.mkdir(options.output_directory) manager = Manager() input_queue = manager.Queue() output_queue = manager.Queue() count = 0 for file_name in files: input_queue.put((count, file_name)) count += 1 # 准备四个进程 logger.info('start process...') pool = Pool(4, init_fuzzer) worker_num = 60 fuzzer_threads = [] for i in range(worker_num): result = pool.apply_async(fuzz, args=(input_queue, output_queue, options.fuzz_program, options.specific_file, options.output_directory) ) fuzzer_threads.append(result) try: [fuzzer.get() for fuzzer in fuzzer_threads] except: logger.exception("User enter keyboard interupted!") pool.terminate() pool.join() # write result to file logger.info('write data to file...') handle_result(output_queue) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-f", "--fuzz_program", help="fuzz program", required=True) parser.add_argument("-c", "--corpus_dir", help="fuzz corpus directory", required=True) parser.add_argument("-d", "--output_directory", help="fuzz output directory(absolute path)", required=True) parser.add_argument("-o", "--output_post_fix", help="output file post fix") parser.add_argument("-s", "--specific_file", help="write crash file to a specific file") options = parser.parse_args() main()
The text was updated successfully, but these errors were encountered:
功能过于简单,已放弃,目前已经用rust重写,自定义debugger/mutator,丰富变异算法
rust
debugger/mutator
Sorry, something went wrong.
No branches or pull requests
使用
cdb
作为进程debugger
,效率一般,但可以满足特定需求,目前在写自己的windows debugger
The text was updated successfully, but these errors were encountered: