forked from 18F/domain-scan
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathgather
executable file
·207 lines (158 loc) · 6.88 KB
/
gather
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python3
import os
import glob
import sys
import re
import csv
import requests
import logging
import importlib
from utils import utils
# Metadata about the scan itself, captured once when the module loads.
start_time = utils.local_now()
start_command = " ".join(sys.argv)

# Prefix-stripping patterns used to clean up gathered hostnames.
# strip_www is only applied if --ignore-www is enabled; the remaining
# three patterns are applied to all domains.
strip_www = re.compile(r"^www\.")
strip_protocol = re.compile(r"^https?://")
strip_wildcard = re.compile(r"^(\*\.)+")
strip_redacted = re.compile(r"^(\?\.)+")
def run(options=None, cache_dir="./cache", results_dir="./results"):
    """Gather hostnames from every configured source and write them to disk.

    Existing ``*.csv`` files in ``results_dir`` are deleted first, to avoid
    inconsistent data. Each gatherer named in ``options["gatherers"]`` is
    then loaded and asked for hostnames; the names are filtered, cleaned up
    and de-duplicated, and finally written to ``<results_dir>/gathered.csv``
    with one boolean column per source. Scan metadata is written to
    ``<results_dir>/meta.json``.

    Args:
        options: Mapping of parsed options. ``"gatherers"`` is required;
            ``"suffix"``, ``"include_parents"``, ``"ignore_www"``,
            ``"parents"`` and ``"sort"`` are read when present, as is a key
            named after any source that is not a registered gatherer
            module. Despite the ``None`` default, a mapping must be passed:
            it is indexed immediately.
        cache_dir: Directory handed to ``get_parent_domains`` for caching
            a downloaded parents CSV.
        results_dir: Directory that receives ``gathered.csv`` and
            ``meta.json``.

    Raises:
        SystemExit: If a source is neither an importable gatherer module
            nor backed by an option of the same name.
    """
    sources = options["gatherers"]
    suffixes = options.get("suffix")
    suffix_pattern = utils.suffix_pattern(suffixes)

    # Clear out existing result CSVs, to avoid inconsistent data.
    for stale_csv in glob.glob("%s/*.csv" % results_dir):
        os.remove(stale_csv)

    # Opt in to include parent (second-level) domains.
    include_parents = options.get("include_parents", False)

    # Opt into stripping www. prefixes from hostnames, effectively
    # collapsing www.[host] and [host] into one record.
    ignore_www = options.get("ignore_www", False)

    # --parents should be a CSV whose first column is parent domains
    # that will act as a whitelist for which subdomains to gather.
    parents = get_parent_domains(options, cache_dir=cache_dir)
    # Membership is tested once per gathered hostname, so use a set.
    # An empty or missing whitelist disables the filter entirely.
    parent_whitelist = set(parents) if parents else None

    # De-duping hostnames: maps each hostname to the list of sources it
    # was seen in. This holds all hostnames in memory at once.
    hostnames_cache = {}

    for source in sources:
        gatherer = _load_gatherer(source, suffixes, options)

        for domain in gatherer.gather():
            # Always apply the suffix filter to returned names.
            # NOTE(review): this runs before whitespace is stripped, so a
            # padded name may be dropped if the suffix pattern is anchored
            # at the end of the string -- confirm against
            # utils.suffix_pattern before reordering.
            if not suffix_pattern.search(domain):
                continue

            domain = _clean_domain(domain, ignore_www)
            base = utils.base_domain_for(domain)

            # Unless --include-parents is specified, exclude them.
            # Always ignore www prefixes for base domains.
            if not include_parents and domain in (base, "www.%s" % base):
                continue

            # Apply --parent domain whitelist, if present.
            if parent_whitelist and base not in parent_whitelist:
                continue

            # Use hostname cache to de-dupe, if seen before.
            seen_in = hostnames_cache.setdefault(domain, [])
            if source not in seen_in:
                seen_in.append(source)

    # Now that every source has been processed, write all hostnames to
    # disk with one column per source.
    headers = ["Domain", "Base Domain"] + list(sources)
    gathered_filename = "%s/%s.csv" % (results_dir, "gathered")

    # The context manager closes the file even if a row fails to write.
    with open(gathered_filename, 'w', newline='') as gathered_file:
        gathered_writer = csv.writer(gathered_file)
        gathered_writer.writerow(headers)

        for hostname in sorted(hostnames_cache):
            seen_in = hostnames_cache[hostname]
            row = [hostname, utils.base_domain_for(hostname)]
            row += [source in seen_in for source in sources]
            gathered_writer.writerow(row)

    # If sort requested, sort in place by domain.
    if options.get("sort"):
        utils.sort_csv(gathered_filename)

    logging.warning("Results written to CSV.")

    # Save metadata.
    end_time = utils.local_now()
    metadata = {
        'start_time': utils.utc_timestamp(start_time),
        'end_time': utils.utc_timestamp(end_time),
        'command': start_command
    }
    utils.write(utils.json_for(metadata), "%s/meta.json" % results_dir)


def _load_gatherer(source, suffixes, options):
    """Import and instantiate the gatherer for *source*, or exit on failure."""
    extra = {}
    try:
        gatherer_module = importlib.import_module("gatherers.%s" % source)
        return gatherer_module.Gatherer(suffixes, options, extra)
    except ImportError as err:
        # If it's not a registered module, allow it to be "hot registered"
        # as long as the user gave us a flag with that name that can be
        # used as the --url option to the URL module.
        if not options.get(source):
            logging.error("[%s] Gatherer not found, or had an error during loading.\n\tERROR: %s\n\t%s" % (source, type(err), err))
            # sys.exit rather than the site-provided exit(), which is not
            # guaranteed to exist in every interpreter configuration.
            sys.exit(1)

        gatherer_module = importlib.import_module("gatherers.url")
        extra['name'] = source
        return gatherer_module.Gatherer(suffixes, options, extra)


def _clean_domain(domain, ignore_www):
    """Strip whitespace, protocol, wildcard and redaction prefixes from a name."""
    # Strip off whitespace before pre-processing.
    domain = domain.strip()
    # Cut off protocols, if present.
    domain = strip_protocol.sub("", domain)
    # Cut naive wildcard prefixes out. (from certs)
    domain = strip_wildcard.sub("", domain)
    # Cut off any redaction markers from names. (from certs)
    domain = strip_redacted.sub("", domain)
    # Strip www. prefixes from hostnames, effectively
    # collapsing www.[host] and [host] into one record.
    if ignore_www:
        domain = strip_www.sub("", domain)
    # Strip off whitespace after pre-processing.
    return domain.strip()
# Read in parent domains from the first column of a given CSV.
def get_parent_domains(options, cache_dir="./cache"):
    """Return the lowercased parent domains listed in the --parents CSV.

    Args:
        options: Mapping of parsed options; only ``"parents"`` is read. It
            may be a local file path or an ``http:``/``https:`` URL.
        cache_dir: Directory where a downloaded CSV is saved as
            ``parents.csv``. Only used when ``"parents"`` is a URL.

    Returns:
        ``None`` when no ``"parents"`` option is set; otherwise a list of
        the first-column values, lowercased, in file order. Blank rows,
        rows with an empty first cell, and header cells equal to
        ``domain`` or ``domain name`` (case-insensitive) are skipped.

    Raises:
        SystemExit: If the URL cannot be downloaded or cached.
    """
    parents = options.get("parents")
    if not parents:
        return None

    # If --parents is a URL, we want to download it now,
    # and then adjust the value to be the path of the cached download.
    if parents.startswith(("http:", "https:")):
        # Though it's saved in cache/, it will be downloaded every time.
        parents_path = os.path.join(cache_dir, "parents.csv")

        try:
            response = requests.get(parents)
            # Without this, an HTTP error page would be cached and then
            # parsed as if it were the parents CSV.
            response.raise_for_status()
            utils.write(response.text, parents_path)
        except Exception:
            # Not a bare except: Ctrl-C and SystemExit must propagate.
            logging.error("Parent domains URL not downloaded successfully.")
            print(utils.format_last_exception())
            sys.exit(1)

        parents = parents_path

    header_cells = ("domain", "domain name")
    parent_domains = []
    with open(parents, encoding='utf-8', newline='') as csvfile:
        for row in csv.reader(csvfile):
            # csv.reader yields an empty list for a blank line.
            if not row:
                continue
            name = row[0].lower()
            if not name or name in header_cells:
                continue
            parent_domains.append(name)

    return parent_domains
if __name__ == '__main__':
    # Parse the command-line options and set up logging from them.
    options = utils.options_for_gather()
    utils.configure_logging(options)

    # Support --output flag for changing where cache/ and results/ go.
    cache_dir, results_dir = utils.cache_dir(options), utils.results_dir(options)

    # Make sure both directories exist before gathering.
    for directory in (cache_dir, results_dir):
        utils.mkdir_p(directory)

    run(options, cache_dir=cache_dir, results_dir=results_dir)