#!/usr/bin/env python3
import json
import socket
import sys
from argparse import ArgumentParser, SUPPRESS
from csv import DictWriter
from datetime import datetime, timezone
from time import sleep
from urllib.parse import urlencode
from urllib.request import urlopen

try:
    from OpenSSL import SSL
    from json2html import *
except ImportError:
    print('Please install required modules: pip install -r requirements.txt')
    sys.exit(1)
class Clr:
    """ANSI SGR escape sequences used to colour terminal output.

    RST resets only the foreground colour (SGR 39), so it pairs with the
    three colour codes below without disturbing other attributes.
    """

    RST = '\x1b[39m'     # default foreground
    RED = '\x1b[31m'
    GREEN = '\x1b[32m'
    YELLOW = '\x1b[33m'
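class SSLChecker:
    """Collect certificate details for a list of hosts and report them.

    The ``total_*`` tallies start as class attributes; the first ``+=`` on
    an instance rebinds them as instance attributes, so each checker keeps
    its own counts.
    """

    total_valid = 0
    total_expired = 0
    total_failed = 0
    total_warning = 0

    def get_cert(self, host, port, socks_host=None, socks_port=None):
        """Connect to ``host:port`` over TLS and return ``(cert, resolved_ip)``.

        Args:
            host: Server name; used for the TCP connect and sent as SNI.
            port: TCP port, str or int.
            socks_host: Optional SOCKS5 proxy host (``socks_port`` is its port).

        Returns:
            The leaf certificate the peer presented (pyOpenSSL ``X509``) and
            the IP ``host`` resolves to.

        Raises:
            socket / SSL errors on connect or handshake failure; the caller
            (``show_result``) reports them per host.

        Notes:
            No ``set_verify`` is configured, so the handshake accepts any
            certificate: ``cert`` is what the peer *presented*, not a
            validated chain. ``TLSv1_2_METHOD`` negotiates TLS 1.2 only, so
            a host offering only TLS 1.3 fails the handshake. The local
            ``gethostbyname`` lookup runs even when a SOCKS proxy is used.
            The proxied socket is now built directly instead of replacing
            the process-wide ``socket.socket``; as a consequence the
            ``analyze_ssl`` API requests are not routed through the proxy.
        """
        if socks_host:
            import socks  # PySocks; only required for -s/--socks
            socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, socks_host,
                                  int(socks_port), True)
            sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        tls_ctx = SSL.Context(SSL.TLSv1_2_METHOD)
        try:
            sock.settimeout(5)
            sock.connect((host, int(port)))
            sock.settimeout(None)
            tls_conn = SSL.Connection(tls_ctx, sock)
            tls_conn.set_tlsext_host_name(host.encode())  # SNI
            tls_conn.set_connect_state()
            tls_conn.do_handshake()
            cert = tls_conn.get_peer_certificate()
            resolved_ip = socket.gethostbyname(host)
        finally:
            # Close on failure too; the original leaked the socket when
            # connect()/do_handshake() raised.
            sock.close()
        return cert, resolved_ip

    def border_msg(self, message):
        """Print ``message`` inside a single-line ASCII box."""
        edge = '+' + '-' * len(message) + '+'
        print('\n'.join((edge, '|' + message + '|', edge)))

    def analyze_ssl(self, host, context, user_args):
        """Add the SSL Labs (api.ssllabs.com v3) assessment to ``context[host]``.

        Polls ``analyze`` every 5 s while the status is DNS/IN_PROGRESS, then
        reads ``getEndpointData`` for the first endpoint and records ``grade``
        and the poodle/heartbleed/heartbeat/freak/logjam/drownVulnerable
        flags. ``context`` is returned unchanged when SSL Labs reports the
        certificate does not match the host name, when the assessment ends
        in ERROR or another unexpected status (the original looped without
        sleeping in that case), or when no endpoint is listed.

        The hostname is sent to a third-party service; it is URL-encoded so
        unusual input cannot alter the query string. Network/JSON errors
        propagate to the caller.
        """
        api_url = 'https://api.ssllabs.com/api/v3/'

        def fetch(call, **params):
            # One GET against the API, decoded as JSON.
            with urlopen(api_url + call + '?' + urlencode(params)) as resp:
                return json.loads(resp.read().decode('utf-8'))

        def note(message):
            if user_args.verbose:
                print('{}{}{}\n'.format(Clr.YELLOW, message, Clr.RST))

        while True:
            note('Requesting analyze to {}'.format(api_url))
            main_request = fetch('analyze', host=host)
            status = main_request.get('status')
            if status in ('DNS', 'IN_PROGRESS'):
                note('Analyze waiting for reports to be finished (5 secs)')
                sleep(5)
                continue
            if status == 'READY':
                note('Analyze is ready')
                break
            # ERROR or an unknown status: give up instead of spinning.
            note('Analyze ended with status {}: {}'.format(
                status, main_request.get('statusMessage', '')))
            return context

        endpoints = main_request.get('endpoints') or []
        if not endpoints:
            return context
        endpoint_data = fetch('getEndpointData', host=host,
                              s=endpoints[0]['ipAddress'])
        status_message = endpoint_data.get('statusMessage')
        note('Analyze report message: {}'.format(status_message))
        if status_message == 'Certificate not valid for domain name':
            return context

        details = endpoint_data.get('details') or {}
        info = context[host]
        info['grade'] = endpoints[0].get('grade')
        info['poodle_vuln'] = details.get('poodle')
        info['heartbleed_vuln'] = details.get('heartbleed')
        info['heartbeat_vuln'] = details.get('heartbeat')
        info['freak_vuln'] = details.get('freak')
        info['logjam_vuln'] = details.get('logjam')
        info['drownVulnerable'] = details.get('drownVulnerable')
        return context

    def get_cert_sans(self, x509cert):
        """Return the certificate's subjectAltName extension as one string.

        Commas in the extension text are replaced by ';' so the value does
        not break CSV output. Returns '' when the extension is absent.
        Adapted from https://stackoverflow.com/users/4547691/anatolii-chmykhalo
        """
        san = ''
        for index in range(x509cert.get_extension_count()):
            ext = x509cert.get_extension(index)
            if 'subjectAltName' in str(ext.get_short_name()):
                san = str(ext).replace(',', ';')
        return san

    def get_cert_info(self, host, cert, resolved_ip):
        """Build the per-host info dict from a pyOpenSSL ``X509`` and update tallies.

        ``cert_valid`` reflects only ``has_expired()`` — no chain, revocation
        or hostname validation is performed. Dates are parsed as UTC (the
        ``Z`` suffix) and compared with a UTC "now"; the original compared
        them with local time. ``cert_sha256`` is added alongside the SHA-1
        fingerprint. An expired certificate also counts towards
        ``total_warning`` because its days-to-expire is <= 15.
        """
        subject = cert.get_subject()
        issuer = cert.get_issuer()
        context = {
            'host': host,
            'resolved_ip': resolved_ip,
            'issued_to': subject.CN,
            'issued_o': subject.O,
            'issuer_c': issuer.countryName,
            'issuer_o': issuer.organizationName,
            'issuer_ou': issuer.organizationalUnitName,
            'issuer_cn': issuer.commonName,
            'cert_sn': str(cert.get_serial_number()),
            'cert_sha1': cert.digest('sha1').decode(),
            'cert_sha256': cert.digest('sha256').decode(),
            'cert_alg': cert.get_signature_algorithm().decode(),
            'cert_ver': cert.get_version(),
            'cert_sans': self.get_cert_sans(cert),
        }
        expired = cert.has_expired()
        context['cert_exp'] = expired
        context['cert_valid'] = not expired

        asn1_fmt = '%Y%m%d%H%M%SZ'
        valid_from = datetime.strptime(cert.get_notBefore().decode('ascii'),
                                       asn1_fmt).replace(tzinfo=timezone.utc)
        valid_till = datetime.strptime(cert.get_notAfter().decode('ascii'),
                                       asn1_fmt).replace(tzinfo=timezone.utc)
        now = datetime.now(timezone.utc)
        context['valid_from'] = valid_from.strftime('%Y-%m-%d')
        context['valid_till'] = valid_till.strftime('%Y-%m-%d')
        context['validity_days'] = (valid_till - valid_from).days
        context['days_left'] = (valid_till - now).days
        # Measured from midnight of the expiry date, as the original did, so
        # this can differ from days_left by one.
        till_midnight = datetime.strptime(context['valid_till'],
                                          '%Y-%m-%d').replace(tzinfo=timezone.utc)
        context['valid_days_to_expire'] = (till_midnight - now).days

        if expired:
            self.total_expired += 1
        else:
            self.total_valid += 1
        if context['valid_days_to_expire'] <= 15:
            self.total_warning += 1
        return context

    def print_status(self, host, context, analyze=False):
        """Print the human-readable report for one host from ``context[host]``.

        With ``analyze`` the SSL Labs fields are printed too; they are read
        with ``.get()`` because ``analyze_ssl`` can return without setting
        them (e.g. when SSL Labs rejects the host name), which previously
        raised KeyError and caused the host to be recorded as failed.
        """
        info = context[host]
        mark = Clr.GREEN if info['cert_valid'] else Clr.RED
        print('\t{}[\u2713]{} {}\n\t{}'.format(mark, Clr.RST, host,
                                               '-' * (len(host) + 5)))
        rows = [
            ('Issued domain', info['issued_to']),
            ('Issued to', info['issued_o']),
            ('Issued by', '{} ({})'.format(info['issuer_o'], info['issuer_c'])),
            ('Server IP', info['resolved_ip']),
            ('Valid from', info['valid_from']),
            ('Valid to', '{} ({} days left)'.format(info['valid_till'],
                                                   info['valid_days_to_expire'])),
            ('Validity days', info['validity_days']),
            ('Certificate valid', info['cert_valid']),
            ('Certificate S/N', info['cert_sn']),
            ('Certificate SHA1 FP', info['cert_sha1']),
            ('Certificate SHA256 FP', info.get('cert_sha256')),
            ('Certificate version', info['cert_ver']),
            ('Certificate algorithm', info['cert_alg']),
        ]
        if analyze:
            rows += [
                ('Certificate grade', info.get('grade', 'N/A')),
                ('Poodle vulnerability', info.get('poodle_vuln', 'N/A')),
                ('Heartbleed vulnerability', info.get('heartbleed_vuln', 'N/A')),
                ('Heartbeat vulnerability', info.get('heartbeat_vuln', 'N/A')),
                ('Freak vulnerability', info.get('freak_vuln', 'N/A')),
                ('Logjam vulnerability', info.get('logjam_vuln', 'N/A')),
                ('Drown vulnerability', info.get('drownVulnerable', 'N/A')),
            ]
        rows.append(('Expired', info['cert_exp']))
        for label, value in rows:
            print('\t\t{}: {}'.format(label, value))
        print('\t\tCertificate SANs: ')
        for san in info['cert_sans'].split(';'):
            print('\t\t \\_ {}'.format(san.strip()))
        print('\n')

    def show_result(self, user_args):
        """Check every host in ``user_args.hosts`` and emit the requested outputs.

        Builds ``context`` (host -> info dict, or the string 'failed'),
        prints per-host status unless -j/-S, then handles the CSV/HTML/JSON
        outputs. Returns ``json.dumps(context)`` when called from an
        importing module (``__name__ != '__main__'``), otherwise None.
        Ctrl-C exits the process with status 1.
        """
        context = {}
        start_time = datetime.now()
        hosts = user_args.hosts
        quiet = user_args.json_true or user_args.summary_true

        if not quiet:
            self.border_msg(' Analyzing {} host(s) '.format(len(hosts)))
        if not user_args.json_true and user_args.analyze:
            print('{}Warning: -a/--analyze is enabled. It takes more time...{}\n'.format(
                Clr.YELLOW, Clr.RST))

        socks_host = socks_port = None
        if user_args.socks:
            socks_host, socks_port = self.filter_hostname(user_args.socks)

        for raw_host in hosts:
            if user_args.verbose:
                print('{}Working on host: {}{}\n'.format(Clr.YELLOW, raw_host, Clr.RST))
            host, port = self.filter_hostname(raw_host)
            if host in context:  # duplicate input, already handled
                continue
            try:
                if user_args.socks:
                    if user_args.verbose:
                        print('{}Socks proxy enabled, connecting via proxy{}\n'.format(
                            Clr.YELLOW, Clr.RST))
                    cert, resolved_ip = self.get_cert(host, port, socks_host, socks_port)
                else:
                    cert, resolved_ip = self.get_cert(host, port)
                info = self.get_cert_info(host, cert, resolved_ip)
                info['tcp_port'] = int(port)
                context[host] = info
                if user_args.analyze:
                    context = self.analyze_ssl(host, context, user_args)
                if not quiet:
                    self.print_status(host, context, user_args.analyze)
            except KeyboardInterrupt:
                print('{}Canceling script...{}\n'.format(Clr.YELLOW, Clr.RST))
                sys.exit(1)
            except SSL.SysCallError:
                context[host] = 'failed'
                self.total_failed += 1
                if not user_args.json_true:
                    print('\t{}[\u2717]{} {:<20s} Failed: Misconfigured SSL/TLS\n'.format(
                        Clr.RED, Clr.RST, host))
            except Exception as error:
                # Any other failure (DNS, timeout, parsing) is reported per
                # host so the remaining hosts are still checked.
                context[host] = 'failed'
                self.total_failed += 1
                if not user_args.json_true:
                    print('\t{}[\u2717]{} {:<20s} Failed: {}\n'.format(
                        Clr.RED, Clr.RST, host, error))

        if not user_args.json_true:
            self.border_msg(
                ' Successful: {} | Failed: {} | Valid: {} | Warning: {} | Expired: {} | Duration: {} '.format(
                    len(hosts) - self.total_failed, self.total_failed, self.total_valid,
                    self.total_warning, self.total_expired, datetime.now() - start_time))
        if user_args.summary_true:
            return
        if user_args.csv_enabled:
            self.export_csv(context, user_args.csv_enabled, user_args)
        if user_args.html_true:
            self.export_html(context)
        if __name__ != '__main__':
            # Module usage: hand the result back instead of printing it.
            return json.dumps(context)
        if user_args.json_true:
            print(json.dumps(context))
        if user_args.json_save_true:
            for host, info in context.items():
                with open(host + '.json', 'w', encoding='UTF-8') as fp:
                    json.dump(info, fp)

    def export_csv(self, context, filename, user_args):
        """Write every successfully checked host in ``context`` to ``filename`` as CSV.

        Hosts whose value is the string 'failed' are skipped (the original
        raised when the first, or any, host had failed). Columns are the
        union of keys across rows in first-seen order, so rows with and
        without -a/--analyze data can share one file; missing cells are
        left empty. Nothing is written when no host succeeded.
        """
        if user_args.verbose:
            print('{}Generating CSV export{}\n'.format(Clr.YELLOW, Clr.RST))
        rows = [info for info in context.values() if isinstance(info, dict)]
        if not rows:
            return
        fieldnames = []
        for row in rows:
            for key in row:
                if key not in fieldnames:
                    fieldnames.append(key)
        # newline='' is what the csv module expects for the file object.
        with open(filename, 'w', newline='', encoding='utf-8') as csv_file:
            writer = DictWriter(csv_file, fieldnames)
            writer.writeheader()
            writer.writerows(rows)

    def export_html(self, context):
        """Render ``context`` with json2html into a timestamped ``.html`` file."""
        # Values come from remote servers; json2html's default escaping is
        # relied on here, so do not pass escape=False.
        rendered = json2html.convert(json=context)
        stamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        with open('{}.html'.format(stamp), 'w', encoding='utf-8') as html_file:
            html_file.write(rendered)

    def filter_hostname(self, host):
        """Strip scheme and path from ``host`` and split off an explicit port.

        Returns ``(hostname, port)``; ``port`` is the int 443 when none was
        given, otherwise the string after the last ':' (callers ``int()`` it).
        Surrounding whitespace is dropped and anything after the first '/'
        following the host is discarded — the original deleted every '/'
        and glued the path onto the hostname.
        """
        host = host.strip()
        for scheme in ('https://', 'http://'):
            if host.startswith(scheme):
                host = host[len(scheme):]
                break
        host = host.split('/', 1)[0]
        port = 443
        if ':' in host:
            host, port = host.rsplit(':', 1)
        return host, port

    def get_args(self, json_args=None):
        """Build the argument namespace from the CLI or from ``json_args``.

        ``json_args`` (a dict with a ``hosts`` list) is the module-usage
        path: it yields a namespace equivalent to ``-j -H <hosts>`` with
        every other switch off, without reading ``sys.argv``.
        ``summary_true`` is set on this path too — ``show_result`` reads it,
        and the original left it unset, raising AttributeError after the
        host loop. On the CLI path, hosts read from ``-f`` are stripped and
        blank lines dropped; an empty host list prints help and exits 0.
        """
        parser = ArgumentParser(prog='ssl_checker.py', add_help=False,
                                description="""Collects useful information about the given host's SSL certificates.""")
        if json_args:
            args = parser.parse_args([])
            defaults = (
                ('json_true', True),
                ('verbose', False),
                ('csv_enabled', False),
                ('html_true', False),
                ('json_save_true', False),
                ('summary_true', False),
                ('socks', False),
                ('analyze', False),
            )
            for name, value in defaults:
                setattr(args, name, value)
            args.hosts = json_args['hosts']
            return args

        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument('-H', '--host', dest='hosts', nargs='*',
                           required=False, help='Hosts as input separated by space')
        group.add_argument('-f', '--host-file', dest='host_file',
                           required=False, help='Hosts as input from a file')
        parser.add_argument('-s', '--socks', dest='socks',
                            default=False, metavar='HOST:PORT',
                            help='Enable SOCKS proxy for connection')
        parser.add_argument('-c', '--csv', dest='csv_enabled',
                            default=False, metavar='FILENAME.CSV',
                            help='Enable CSV file export')
        parser.add_argument('-j', '--json', dest='json_true',
                            action='store_true', default=False,
                            help='Enable JSON in the output')
        parser.add_argument('-S', '--summary', dest='summary_true',
                            action='store_true', default=False,
                            help='Enable summary output only')
        parser.add_argument('-x', '--html', dest='html_true',
                            action='store_true', default=False,
                            help='Enable HTML file export')
        parser.add_argument('-J', '--json-save', dest='json_save_true',
                            action='store_true', default=False,
                            help='Enable JSON export individually per host')
        parser.add_argument('-a', '--analyze', dest='analyze',
                            default=False, action='store_true',
                            help='Enable SSL security analysis on the host')
        parser.add_argument('-v', '--verbose', dest='verbose',
                            default=False, action='store_true',
                            help='Enable verbose to see what is going on')
        parser.add_argument('-h', '--help', default=SUPPRESS,
                            action='help',
                            help='Show this help message and exit')
        args = parser.parse_args()

        if args.host_file:
            with open(args.host_file, encoding='utf-8') as f:
                args.hosts = [line.strip() for line in f if line.strip()]
        if isinstance(args.hosts, list) and not args.hosts:
            parser.print_help()
            sys.exit(0)
        return args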
def main():
    """CLI entry point: parse the command line and run the checker."""
    checker = SSLChecker()
    checker.show_result(checker.get_args(json_args={}))


if __name__ == '__main__':
    main()