Skip to content

Commit

Permalink
Merge pull request #404 from blacklanternsecurity/dev
Browse files Browse the repository at this point in the history
Push new tests to main
  • Loading branch information
liquidsec authored Jan 18, 2024
2 parents 34acfab + 10058b4 commit 58c0e75
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 261 deletions.
3 changes: 2 additions & 1 deletion baddns/lib/whoismanager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import whois
import logging
import asyncio
import tldextract
from datetime import date, datetime, timedelta
from dateutil import parser as date_parser
Expand All @@ -17,7 +18,7 @@ async def dispatchWHOIS(self):
log.debug(f"Extracted base domain [{ext.registered_domain}] from [{self.target}]")
log.debug(f"Submitting WHOIS query for {ext.registered_domain}")
try:
w = whois.whois(ext.registered_domain)
w = await asyncio.to_thread(whois.whois, ext.registered_domain)
log.debug(f"Got response to whois request for {ext.registered_domain}")
self.whois_result = {"type": "response", "data": w}
except whois.parser.PywhoisError as e:
Expand Down
6 changes: 3 additions & 3 deletions baddns/modules/nsec.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def nsec_walk(self, domain):
next_domain = await self.get_nsec_record(current_domain)
if next_domain is None or next_domain[0] in self.nsec_chain:
break
log.debug(f"Found additiona NSEC record: {next_domain}")
log.debug(f"Found additional NSEC record: {next_domain}")
if not next_domain[0].startswith("\\"):
self.nsec_chain.append(next_domain[0])
current_domain = next_domain[0]
Expand All @@ -44,7 +44,7 @@ async def dispatch(self):

self.nsec_chain.append(self.target)
self.infomsg(f"NSEC Records detected, attempting NSEC walk against domain [{self.target}]")
await self.nsec_walk(self.target_dnsmanager.answers["NSEC"][0])
await self.nsec_walk(self.target)
return True

def analyze(self):
Expand All @@ -55,7 +55,7 @@ def analyze(self):
Finding(
{
"target": self.target_dnsmanager.target,
"description": f"DNSSEC NSEC Zone Walking Enabled for domain: {self.target}",
"description": f"DNSSEC NSEC Zone Walking Enabled for domain: [{self.target}]",
"confidence": "CONFIRMED",
"signature": "N/A",
"indicator": "NSEC Records",
Expand Down
1 change: 1 addition & 0 deletions baddns/modules/zonetransfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async def zone_transfer(self, nameserver, domain):
ns_ip = ns_ips[0]
try:
zone = await asyncio.to_thread(dns.zone.from_xfr, dns.query.xfr(ns_ip, domain, timeout=10))

except dns.exception.Timeout:
log.debug("TimeoutError attempting zone transfer")
return False
Expand Down
509 changes: 255 additions & 254 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ line-length = 119

[tool.poetry.scripts]
baddns = 'baddns.cli:main'

[tool.pytest.ini_options]
filterwarnings = [
"ignore::DeprecationWarning"
]
3 changes: 0 additions & 3 deletions tests/mx_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ async def test_mx_unregistered(fs, mock_dispatch_whois, configure_mock_resolver,
"trigger": "mail2.worse.dns",
"module": "MX",
}
print("!!!!!!!!SDFSDFDSF")

print(findings[0].to_dict())
assert any(expected == finding.to_dict() for finding in findings)


Expand Down
34 changes: 34 additions & 0 deletions tests/nsec_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest
from baddns.modules.nsec import BadDNS_nsec
from .helpers import mock_signature_load


@pytest.mark.asyncio
async def test_nsec_match(fs, mock_dispatch_whois, configure_mock_resolver):
mock_data = {
"bad.dns": {"NSEC": ["asdf.bad.dns"]},
"asdf.bad.dns": {"NSEC": ["zzzz.bad.dns"]},
"zzzz.bad.dns": {"NSEC": ["xyz.bad.dns"]},
}
mock_resolver = configure_mock_resolver(mock_data)
target = "bad.dns"
mock_signature_load(fs, "nucleitemplates_azure-takeover-detection.yml")

baddns_nsec = BadDNS_nsec(target, signatures_dir="/tmp/signatures", dns_client=mock_resolver)

findings = None
if await baddns_nsec.dispatch():
findings = baddns_nsec.analyze()

assert findings
expected = {
"target": "bad.dns",
"description": "DNSSEC NSEC Zone Walking Enabled for domain: [bad.dns]",
"confidence": "CONFIRMED",
"signature": "N/A",
"indicator": "NSEC Records",
"trigger": "bad.dns",
"module": "NSEC",
"found_domains": ["bad.dns", "asdf.bad.dns", "zzzz.bad.dns", "xyz.bad.dns"],
}
assert any(expected == finding.to_dict() for finding in findings)
29 changes: 29 additions & 0 deletions tests/txt_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest
from baddns.modules.txt import BadDNS_txt
from .helpers import mock_signature_load


@pytest.mark.asyncio
async def test_txt_match(fs, mock_dispatch_whois, configure_mock_resolver):
mock_data = {"bad.dns": {"TXT": ["baddns.azurewebsites.net"]}, "_NXDOMAIN": ["baddns.azurewebsites.net"]}
mock_resolver = configure_mock_resolver(mock_data)
target = "bad.dns"
mock_signature_load(fs, "nucleitemplates_azure-takeover-detection.yml")

baddns_txt = BadDNS_txt(target, signatures_dir="/tmp/signatures", dns_client=mock_resolver)

findings = None
if await baddns_txt.dispatch():
findings = baddns_txt.analyze()

assert findings
expected = {
"target": "bad.dns",
"description": "Vulnerable Host in TXT Record. Original Event: [Dangling CNAME, probable subdomain takeover (NXDOMAIN technique)]",
"confidence": "PROBABLE",
"signature": "Microsoft Azure Takeover Detection",
"indicator": "azurewebsites.net",
"trigger": "bad.dns",
"module": "TXT",
}
assert any(expected == finding.to_dict() for finding in findings)
51 changes: 51 additions & 0 deletions tests/zonetransfer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import pytest
import dns


from baddns.modules.zonetransfer import BadDNS_zonetransfer
from .helpers import mock_signature_load


def from_xfr(*args, **kwargs):
zone_text = """
@ 600 IN SOA ns.bad.dns. admin.bad.dns. (
1 ; Serial
3600 ; Refresh
900 ; Retry
604800 ; Expire
86400 ) ; Minimum TTL
@ 600 IN NS ns.bad.dns.
@ 600 IN A 127.0.0.1
asdf 600 IN A 127.0.0.1
zzzz 600 IN AAAA dead::beef
"""
zone = dns.zone.from_text(zone_text, origin="blacklanternsecurity.fakedomain.")
return zone


@pytest.mark.asyncio
async def test_zonetransfer_discovery(fs, configure_mock_resolver, monkeypatch):
mock_signature_load(fs, "nucleitemplates_azure-takeover-detection.yml")
mock_data = {"bad.dns": {"NS": ["ns1.bad.dns."]}, "ns1.bad.dns": {"A": ["127.0.0.1"]}}
mock_resolver = configure_mock_resolver(mock_data)
target = "bad.dns"
baddns_zonetransfer = BadDNS_zonetransfer(target, signatures_dir="/tmp/signatures", dns_client=mock_resolver)

monkeypatch.setattr("dns.zone.from_xfr", from_xfr)

findings = None
if await baddns_zonetransfer.dispatch():
findings = baddns_zonetransfer.analyze()

assert findings
expected = {
"target": "bad.dns",
"description": "Successful Zone Transfer",
"confidence": "CONFIRMED",
"signature": "N/A",
"indicator": "Successful XFR Request",
"trigger": "ns1.bad.dns",
"module": "zonetransfer",
"found_domains": ["bad.dns", "asdf.bad.dns", "zzzz.bad.dns"],
}
assert any(expected == finding.to_dict() for finding in findings)

0 comments on commit 58c0e75

Please sign in to comment.