-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathtriage_crash.py
115 lines (106 loc) · 3.89 KB
/
triage_crash.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import config
# import threading
from browser_selenium import get_browser
# from playwright.sync_api import sync_playwright
import os
# import shutil
import json
import logging
from tqdm import tqdm
from datetime import datetime
def process_log(log: str) -> [str]:
res = set()
splited = log.splitlines()
for i, line in enumerate(splited):
if "SUMMARY" in line:
res.add(line)
elif "WTFCrash" in line:
root = splited[i + 1].split(" ")[-2]
res.add(f"{line}\n{root}")
if len(res) == 0:
return ["normal_crash"]
else:
return res
def multiple_mode(options):
res_dic = {}
browser = get_browser(0, options["browser"], options["timeout"])
crash_dir = options["crash_dir"]
loop_number = int(os.getenv("LOOP_NUMBER") if "LOOP_NUMBER" in os.environ else 1)
time = 0
cnt = 0
for file in tqdm(os.listdir(crash_dir)):
crash_path = os.path.abspath(os.path.join(crash_dir, file))
split_name = file.split(".")
if len(split_name) > 2:
logging.info(f"strange thing: {split_name}")
if split_name[-1] == "log": # if it is a log file
with open(crash_path, "r") as f:
log = f.read()
logging.info(log)
keys = process_log(log)
for key in keys:
if key in res_dic:
res_dic[key].append(split_name[0])
else:
res_dic[key] = [split_name[0]]
logging.info(f"{keys}, {split_name[0]}")
continue
if os.path.exists(crash_path + ".log"): # if it has been processed
continue
# then it is a crash file without processing
for _ in range(loop_number):
browser.ready()
start = datetime.now()
res = browser.fuzz(crash_path)
end = datetime.now()
time += (end - start).microseconds
cnt += 1
if not res:
log = browser.message()
with open(crash_path + ".log", "w") as f:
f.write(log)
logging.info(log)
keys = process_log(log)
for key in keys:
if key in res_dic:
res_dic[key].append(split_name[0])
else:
res_dic[key] = [split_name[0]]
logging.info(f"{keys}, {split_name[0]}")
else:
key = "false-positive"
if key in res_dic:
res_dic[key].append(split_name[0])
else:
res_dic[key] = [split_name[0]]
logging.info(f"{key}, {split_name[0]}")
logging.info(f"average time: {time / cnt} us")
with open("res.json", "w") as f:
json.dump(res_dic, f)
def single_mode(options):
browser = get_browser(0, options["browser"], options["timeout"])
crash_path = os.path.abspath(options["crash_dir"])
file_name = crash_path.split("/")[-1]
loop_number = int(os.getenv("LOOP_NUMBER") if "LOOP_NUMBER" in os.environ else 1)
for i in range(loop_number):
logging.info(f"iteration: {i}")
browser.ready()
res = browser.fuzz(crash_path)
if not res:
log = browser.message()
logging.info(log)
if not os.path.exists(crash_path + ".log"):
with open(crash_path + ".log", "w") as f:
f.write(log)
keys = process_log(log)
logging.info(f"{keys}, {file_name}")
else:
key = "false-positive"
logging.info(f"{key}, {file_name}")
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
options = config.get_triage_option()
if options["mode"] == "multiple":
multiple_mode(options)
elif options["mode"] == "single":
single_mode(options)