Skip to content

Commit

Permalink
Merge branch 'main' into new-signature-nucleitemplates_wix-takeover.yml
Browse files Browse the repository at this point in the history
  • Loading branch information
liquidsec authored Jan 18, 2024
2 parents 8157e62 + 58c0e75 commit f5dfa21
Show file tree
Hide file tree
Showing 86 changed files with 18,989 additions and 599 deletions.
23 changes: 19 additions & 4 deletions .github/workflows/read-sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: Read Sources

on:
workflow_dispatch:
schedule:
- cron: '0 17 * * *'
jobs:
readsources:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -84,9 +86,14 @@ jobs:
sleep 5
PR_TITLE="[SignatureBot] Add or update signature $file"
EXISTING_OPEN_PRS=$(curl -H "Authorization: token $GH_TOKEN" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/${{ github.repository }}/pulls?state=open" | jq --arg PR_TITLE "$PR_TITLE" '.[] | select(.title == $PR_TITLE)')
SEARCH_QUERY="repo:${{ github.repository }} type:pr state:open in:title \"$PR_TITLE\""
if [[ -n "$EXISTING_OPEN_PRS" ]]; then
# Encode the search query
ENCODED_QUERY=$(echo "$SEARCH_QUERY" | jq -sRr @uri)
EXISTING_OPEN_PRS=$(curl -H "Authorization: token $GH_TOKEN" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/search/issues?q=$ENCODED_QUERY" | jq '.items')
if [[ -n "$EXISTING_OPEN_PRS" && "$EXISTING_OPEN_PRS" != "[]" ]]; then
echo "Found an open PR with title '$PR_TITLE', skipping this signature" >> readsources_action.log
continue
fi
Expand Down Expand Up @@ -122,10 +129,18 @@ jobs:
if git ls-remote --heads origin $BRANCH_NAME | grep $BRANCH_NAME; then
# Check for existing PRs with the same title
PR_TITLE="[SignatureBot] Add or update signature $file"
# Construct the search query
SEARCH_QUERY="repo:${{ github.repository }} type:pr state:open in:title \"$PR_TITLE\""
# Encode the search query for the URL
ENCODED_QUERY=$(echo "$SEARCH_QUERY" | jq -sRr @uri)
# Short delay, then check for existing PRs with the Search API
sleep 5
EXISTING_OPEN_PRS=$(curl -H "Authorization: token $GH_TOKEN" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/${{ github.repository }}/pulls?state=open" | jq --arg PR_TITLE "$PR_TITLE" '.[] | select(.title == $PR_TITLE)')
EXISTING_OPEN_PRS=$(curl -H "Authorization: token $GH_TOKEN" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/search/issues?q=$ENCODED_QUERY" | jq '.items')
if [[ -n "$EXISTING_OPEN_PRS" ]]; then
if [[ -n "$EXISTING_OPEN_PRS" && "$EXISTING_OPEN_PRS" != "[]" ]]; then
echo "Found an open PR with title '$PR_TITLE', skipping branch deletion for safety" >> readsources_action.log
continue
fi
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Check subdomains for for subdomain takeovers and other DNS tomfoolery

[![Black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
![License](https://img.shields.io/badge/license-GPLv3-f126ea.svg)
[![tests](https://github.com/blacklanternsecurity/baddns/actions/workflows/tests.yaml/badge.svg)](https://github.com/blacklanternsecurity/baddns/actions/workflows/tests.yaml)

<p align="left"><img width="300" height="300" src="https://github.com/blacklanternsecurity/baddns/assets/24899338/2ca1fe25-e834-4df8-8b02-8bf8f60f6e31"></p>

20 changes: 19 additions & 1 deletion baddns/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,19 @@
from .lib.baddns import BadDNS_cname
import os
import importlib
from pathlib import Path
from .base import BadDNS_base

module_dir = Path(__file__).parent / "modules"
module_files = list(os.listdir(module_dir))
modules_loaded = {}
for file in module_files:
file = module_dir / file
if file.is_file() and file.suffix.lower() == ".py" and file.stem not in ["base", "__init__"]:
modules = importlib.import_module(f"baddns.modules.{file.stem}")
for m in modules.__dict__.keys():
module = getattr(modules, m)
try:
if BadDNS_base in module.__bases__:
modules_loaded[file.stem] = module
except AttributeError:
continue
68 changes: 68 additions & 0 deletions baddns/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os
import yaml
import logging
import pkg_resources

log = logging.getLogger(__name__)

from .lib.signature import BadDNSSignature
from .lib.errors import BadDNSSignatureException


class BadDNS_base:
def __init__(
self,
target,
http_client_class=None,
dns_client=None,
signatures_dir=None,
custom_nameservers=None,
cli=False,
**kwargs,
):
self.http_client_class = http_client_class
self.dns_client = dns_client
self.target = target
self.signatures_dir = signatures_dir
self.signatures = []
self.load_signatures(signatures_dir)
self.custom_nameservers = custom_nameservers
self.parent_class = kwargs.get("parent_class", "self")
self.cli = cli

def infomsg(self, msg):
if self.cli:
log.info(msg)
else:
log.debug(msg)

def load_signatures(self, signatures_dir=None):
if signatures_dir:
if not os.path.exists(signatures_dir):
raise BadDNSSignatureException(f"Signatures directory [{signatures_dir}] does not exist")
else:
signatures_dir = pkg_resources.resource_filename("baddns", "signatures")

log.debug(f"attempting to load signatures from: {signatures_dir}")

for filename in os.listdir(signatures_dir):
if filename.endswith(".yml"):
file_path = os.path.join(signatures_dir, filename)

# Open each file and load the YAML contents
try:
with open(file_path, "r") as file:
signature_data = yaml.safe_load(file)
signature = BadDNSSignature()
signature.initialize(**signature_data)
self.signatures.append(signature)
except BadDNSSignatureException as e:
log.error(f"Error loading signature from {filename}: {e}")
if len(self.signatures) == 0:
raise BadDNSSignatureException(f"No signatures were successfuly loaded from [{signatures_dir}]")
else:
log.debug(f"Loaded [{str(len(self.signatures))}] signatures from [{signatures_dir}]")


def get_all_modules(*args, **kwargs):
return [m for m in BadDNS_base.__subclasses__()]
165 changes: 116 additions & 49 deletions baddns/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,23 @@
import logging
import pkg_resources

from .lib.baddns import BadDNS_cname
from .lib.errors import BadDNSSignatureException
from .lib.errors import BadDNSSignatureException, BadDNSCLIException
from .lib.logging import setup_logging

from baddns.base import get_all_modules

from colorama import Fore, Style, init

init(autoreset=True) # Automatically reset the color to default after each print statement

log = None


class CustomLogFormatter(logging.Formatter):
FORMATS = {
logging.DEBUG: Fore.MAGENTA + "[%(levelname)s] %(message)s" + Style.RESET_ALL,
logging.INFO: Fore.CYAN + "%(message)s" + Style.RESET_ALL,
logging.WARNING: Fore.YELLOW + "[%(levelname)s] %(message)s" + Style.RESET_ALL,
logging.ERROR: Fore.RED + "[%(levelname)s] %(message)s" + Style.RESET_ALL,
logging.CRITICAL: Fore.RED + Style.BRIGHT + "[%(levelname)s] - %(message)s" + Style.RESET_ALL,
}

def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)


def setup_logging():
global log
log = logging.getLogger()
logging.getLogger("httpx").setLevel(logging.WARNING)
log.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setFormatter(CustomLogFormatter())
log.addHandler(ch)


def debug_logging(debug=False):
log = logging.getLogger()
if debug:
log.setLevel(logging.DEBUG)
modules = get_all_modules()


class CustomArgumentParser(argparse.ArgumentParser):
def error(self, message):
self.print_usage()
log.error(message)
self.exit(1)
raise BadDNSCLIException(message)


def print_version():
Expand All @@ -66,45 +37,138 @@ def print_version():


def validate_target(
arg_value, pattern=re.compile(r"^(?:[a-z0-9](?:[a-z0-9-_]{0,61}[a-z0-9])?\.)+[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$")
arg_value, pattern=re.compile(r"^(?:[a-z0-9_](?:[a-z0-9-_]{0,61}[a-z0-9])?\.)+[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$")
):
if not pattern.match(arg_value):
raise argparse.ArgumentTypeError("Target subdomain is not correctly formatted")
return arg_value


def validate_nameservers(
arg_value,
pattern=re.compile(
r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(,((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))*$"
),
):
if not pattern.match(arg_value):
raise argparse.ArgumentTypeError("Nameservers argument is incorrectly formatted")
return arg_value


def validate_modules(arg_value, pattern=re.compile(r"^[a-zA-Z0-9_]+(,[a-zA-Z0-9_]+)*$")):
if not pattern.match(arg_value):
raise argparse.ArgumentTypeError(
"The format of provided modules is incorrect. Use comma-separated values without spaces."
)

modules_provided = [m.upper() for m in arg_value.split(",")]
for m in modules_provided:
if not any(m in module.name.upper() for module in modules):
raise argparse.ArgumentTypeError(
f"'{m}' is not a recognized module. Please check the module name or use '-l' to list available modules."
)
return arg_value


async def execute_module(ModuleClass, target, custom_nameservers, signatures_dir):
findings = None
try:
module_instance = ModuleClass(
target, custom_nameservers=custom_nameservers, signatures_dir=signatures_dir, cli=True
)
except BadDNSSignatureException as e:
log.error(f"Error loading signatures: {e}")
raise BadDNSCLIException(f"Error loading signatures: {e}")

log.info(f"Starting [{module_instance.name}] module with target [{target}]")
if await module_instance.dispatch():
findings = module_instance.analyze()
if findings:
print(f"{Fore.GREEN}{'Vulnerable!'}{Style.RESET_ALL}")
for finding in findings:
print(finding.to_dict())
return findings


async def _main():
setup_logging()
global log
log = logging.getLogger("baddns")

parser = CustomArgumentParser(description="Check subdomains for subdomain takeovers and other DNS tomfoolery")
print(f"{Fore.GREEN}{ascii_art_banner}{Style.RESET_ALL}")
print_version()

parser.add_argument("target", type=validate_target, help="subdomain to analyze")
parser.add_argument("-d", "--debug", action="store_true", help="Enable debug logging")
parser.add_argument(
"-n",
"--custom-nameservers",
type=validate_nameservers,
help="Provide a list of custom nameservers separated by comma.",
)

parser.add_argument(
"-c",
"--custom-signatures",
help="Use an alternate directory for loadings signatures",
)

parser.add_argument(
"-l", "--list-modules", action="store_true", help="List available modules and their descriptions."
)

parser.add_argument(
"-m",
"--modules",
type=validate_modules,
help="Comma separated list of module names to use. Ex: module1,module2,module3",
)

parser.add_argument("-d", "--debug", action="store_true", help="Enable debug logging")

parser.add_argument("target", nargs="?", type=validate_target, help="subdomain to analyze")
args = parser.parse_args()
debug_logging(args.debug)

if not args.target and not args.list_modules:
parser.error("the following arguments are required: target")

if args.list_modules:
r = get_all_modules()
print("Available Modules:")
for m in r:
log.info(f"[{m.name}] - {m.description}")
sys.exit(0)

if args.debug:
log.setLevel(logging.DEBUG)

# Get all available modules
all_modules = get_all_modules()

# If the user provided the -m or --modules argument, filter the modules accordingly
if args.modules:
included_module_names = [name.strip().upper() for name in args.modules.split(",")]
modules_to_execute = [module for module in all_modules if module.name.upper() in included_module_names]
else:
modules_to_execute = all_modules # Default to all modules if -m is not provided
log.info(
f"Running with all modules [{', '.join([module.name for module in modules_to_execute])}] (-m to specify)"
)

if args.custom_signatures:
log.info(f"Using custom signatures directory: [{args.custom_signatures}]")

try:
baddns_cname = BadDNS_cname(args.target, signatures_dir=args.custom_signatures)
except BadDNSSignatureException as e:
log.error(f"Error loading signatures: {e}")
sys.exit(1)
custom_nameservers = None
if args.custom_nameservers:
custom_nameservers = args.custom_nameservers.split(",")
log.info(f"Using custom nameservers: [{', '.join(custom_nameservers)}]")

for ModuleClass in modules_to_execute:
await execute_module(ModuleClass, args.target, custom_nameservers, args.custom_signatures)

if await baddns_cname.dispatch():
finding = baddns_cname.analyze()
if finding:
print(f"{Fore.GREEN}{'Vulnerable!'}{Style.RESET_ALL}")
print(finding)

# BadDNS_base.get_all_modules()
# await execute_module(BadDNS_cname, args.target, custom_nameservers, args.custom_signatures)
# await execute_module(BadDNS_ns, args.target, custom_nameservers, args.custom_signatures)


def main():
Expand All @@ -113,6 +177,9 @@ def main():
except asyncio.CancelledError:
log.error("Got asyncio.CancelledError")

except BadDNSCLIException:
sys.exit(1)

except KeyboardInterrupt:
sys.exit(1)

Expand Down
Loading

0 comments on commit f5dfa21

Please sign in to comment.