diff --git a/cli_isolated/main.py b/cli_isolated/main.py index f01dfa0b..fb6a3001 100644 --- a/cli_isolated/main.py +++ b/cli_isolated/main.py @@ -2,12 +2,17 @@ from safety_schemas.models import FileType from read_dependency_files import read_dependency_files +from vulnerability_checker import check_vulnerabilities +from rich.console import Console +console = Console() file_path = Path("/Users/dylanpulver/Repos/pyup/safety/test_requirements.txt") file_type = FileType.REQUIREMENTS_TXT # Call the function and consume the generator +# Process and check vulnerabilities for path, inspectable_file in read_dependency_files([file_path], [file_type]): - print(f"Processed file: {path}") - print(f"Inspectable File: {inspectable_file}") + console.print(f"Processed file: {path}") + file_model = check_vulnerabilities(path, inspectable_file, console) + console.print(f"File Model: {file_model}") diff --git a/cli_isolated/vulnerability_checker.py b/cli_isolated/vulnerability_checker.py new file mode 100644 index 00000000..dc026ee6 --- /dev/null +++ b/cli_isolated/vulnerability_checker.py @@ -0,0 +1,67 @@ +from typing import Generator, Tuple +from pathlib import Path +from safety_schemas.models import FileModel, Vulnerability, VulnerabilitySeverityLabels +from rich.console import Console + +# Sort vulnerabilities by severity +def sort_vulns_by_score(vuln: Vulnerability) -> int: + if vuln.severity and vuln.severity.cvssv3: + return vuln.severity.cvssv3.get("base_score", 0) + return 0 + +def check_vulnerabilities( + file_path: Path, + inspectable_file, + console: Console +) -> FileModel: + """ + Checks for vulnerabilities in the given InspectableFile and prepares a FileModel. + + Args: + file_path (Path): The path to the processed file. + inspectable_file: The processed InspectableFile. + console (Console): The console for logging. + + Returns: + FileModel: A structured model with vulnerabilities and results. + """ + dependency_results = inspectable_file.dependency_results + affected_specifications = dependency_results.get_affected_specifications() + + # Sort vulnerabilities by severity + def sort_vulns_by_score(vuln: Vulnerability) -> int: + if vuln.severity and vuln.severity.cvssv3: + return vuln.severity.cvssv3.get("base_score", 0) + return 0 + + if affected_specifications: + console.print(f"Vulnerabilities found in {file_path}:") + + for spec in affected_specifications: + vulns_to_report = sorted( + [v for v in spec.vulnerabilities if not v.ignored], + key=sort_vulns_by_score, + reverse=True, + ) + critical_vulns = sum( + 1 + for v in vulns_to_report + if v.severity + and v.severity.cvssv3 + and v.severity.cvssv3.get("base_severity", "").lower() + == VulnerabilitySeverityLabels.CRITICAL.value.lower() + ) + console.print(f"- {spec.name}: {len(vulns_to_report)} vulnerabilities") + if critical_vulns: + console.print(f" Critical: {critical_vulns}") + + else: + console.print(f"No vulnerabilities found in {file_path}.") + + # Prepare and return a FileModel + return FileModel( + location=file_path, + file_type=inspectable_file.file_type, + results=dependency_results, + ) +