Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rockylinux advisories #1535

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
add-rockylinux-advisories and tests
Signed-off-by: ambuj <kulshreshthaak.12@gmail.com>
ambuj-1211 committed Aug 5, 2024
commit 2e0939fc8e7a69e4bd8b1ca8a08dc303229578f1
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
from vulnerabilities.importers import pysec
from vulnerabilities.importers import redhat
from vulnerabilities.importers import retiredotnet
from vulnerabilities.importers import rockylinux
from vulnerabilities.importers import ruby
from vulnerabilities.importers import suse_scores
from vulnerabilities.importers import ubuntu
@@ -71,6 +72,7 @@
oss_fuzz.OSSFuzzImporter,
ruby.RubyImporter,
github_osv.GithubOSVImporter,
rockylinux.RockyLinuxImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
185 changes: 111 additions & 74 deletions vulnerabilities/importers/rockylinux.py
Original file line number Diff line number Diff line change
@@ -13,7 +13,9 @@
from typing import Iterable
from typing import List

import dateparser
import requests
from cwe2.database import Database
from packageurl import PackageURL
from univers.version_range import RpmVersionRange

@@ -35,52 +37,54 @@


def fetch_cves() -> Iterable[List[Dict]]:
page_no = 1
cve_data = None
page_no = 0
cve_data_list = []
while True:
current_url = f"https://access.redhat.com/hydra/rest/securitydata/cve.json?per_page=1000&page={page_no}" # nopep8
current_url = f"https://errata.rockylinux.org/api/v2/advisories?filters.product=&filters.fetchRelated=true&page={page_no}&limit=100"
try:
response = requests_session.get(current_url)
if response.status_code != requests.codes.ok:
logger.error(f"Failed to fetch RedHat CVE results from {current_url}")
break
cve_data = response.json()
cve_data = response.json().get("advisories") or []
cve_data_list.extend(cve_data)
except Exception as e:
logger.error(f"Failed to fetch RedHat CVE results from {current_url} {e}")
logger.error(f"Failed to fetch rockylinux CVE results from {current_url} {e}")
break
if not cve_data:
break
page_no += 1
yield cve_data
return cve_data_list


def get_data_from_url(url):
try:
return requests_session.get(url).json()
except Exception as e:
logger.error(f"Failed to fetch results from {url} {e!r}")
return {}


class RedhatImporter(Importer):
class RockyLinuxImporter(Importer):
ambuj-1211 marked this conversation as resolved.
Show resolved Hide resolved
spdx_license_expression = "CC-BY-4.0"
license_url = "https://access.redhat.com/documentation/en-us/red_hat_security_data_api/1.0/html/red_hat_security_data_api/legal-notice"
importer_name = "RedHat Importer"
license_url = "https://access.redhat.com/security/data"
importer_name = "Rocky Importer"

def advisory_data(self) -> Iterable[AdvisoryData]:
for redhat_cves in fetch_cves():
for redhat_cve in redhat_cves:
yield to_advisory(redhat_cve)

for rockylinux_cve in fetch_cves():
yield to_advisory(rockylinux_cve)


def to_advisory(advisory_data):
ambuj-1211 marked this conversation as resolved.
Show resolved Hide resolved
affected_packages: List[AffectedPackage] = []
for rpm in advisory_data.get("affected_packages") or []:
purl = rpm_to_purl(rpm_string=rpm, namespace="redhat")
aliases = advisory_data.get("name") or ""
date_published = dateparser.parse(advisory_data.get("publishedAt", ""))
ambuj-1211 marked this conversation as resolved.
Show resolved Hide resolved

summary = advisory_data.get("description") or ""
affected_products = advisory_data.get("affectedProducts") or []
affected_packages = []
for products in affected_products:
packages = advisory_data["rpms"][products]["nvras"]
ambuj-1211 marked this conversation as resolved.
Show resolved Hide resolved
affected_packages.extend(packages)
processed_affected_packages: List[AffectedPackage] = []
for rpm in affected_packages:
purl = rpm_to_purl(rpm_string=rpm.rsplit(".rpm", 1)[0] or "", namespace="rocky-linux")
if purl:
try:
affected_version_range = RpmVersionRange.from_versions(sequence=[purl.version])
affected_packages.append(
processed_affected_packages.append(
AffectedPackage(
package=PackageURL(
type=purl.type,
@@ -96,67 +100,100 @@ def to_advisory(advisory_data):
except Exception as e:
ambuj-1211 marked this conversation as resolved.
Show resolved Hide resolved
logger.error(f"Failed to parse version range {purl.version} for {purl} {e}")

references = []
bugzilla = advisory_data.get("bugzilla")
if bugzilla:
url = "https://bugzilla.redhat.com/show_bug.cgi?id={}".format(bugzilla)
references.append(
Reference(
url=url,
reference_id=bugzilla,
)
references = [
Reference(
severities=[], url=fix.get("sourceLink") or "", reference_id=fix.get("ticket") or ""
)
for fix in advisory_data["fixes"]
]

for rh_adv in advisory_data.get("advisories") or []:
# RH provides 3 types of advisories RHSA, RHBA, RHEA. Only RHSA's contain severity score.
# See https://access.redhat.com/articles/2130961 for more details.
for ref in advisory_data.get("cves") or []:

if not isinstance(rh_adv, str):
logger.error(f"Invalid advisory type {rh_adv}")
name = ref.get("name", "")
if not isinstance(name, str):
logger.error(f"Invalid advisory type {name}")
continue

if "RHSA" in rh_adv.upper():
if "CVE" in name.upper():
severity_vector_pattern = r"CVSS:3\.1/([A-Z:/]+)"
ambuj-1211 marked this conversation as resolved.
Show resolved Hide resolved
severities = VulnerabilitySeverity(
system=severity_systems.CVSSV31,
value=ref.get("cvss3BaseScore", ""),
scoring_elements=re.findall(
severity_vector_pattern, ref.get("cvss3ScoringVector", "")
),
)
references.append(
Reference(
url="https://access.redhat.com/errata/{}".format(rh_adv),
reference_id=rh_adv,
severities=[severities],
url=ref.get("sourceLink", ""),
reference_id=name,
)
)

else:
references.append(Reference(severities=[], url=url, reference_id=rh_adv))

redhat_scores = []
cvssv3_score = advisory_data.get("cvss3_score")
cvssv3_vector = advisory_data.get("cvss3_scoring_vector", "")
if cvssv3_score:
redhat_scores.append(
VulnerabilitySeverity(
system=severity_systems.CVSSV3,
value=cvssv3_score,
scoring_elements=cvssv3_vector,
)
)
cwe_list = []
# cwe_string : CWE-409","CWE-121->CWE-787","(CWE-401|CWE-404)","(CWE-190|CWE-911)->CWE-416"
cwe_string = advisory_data.get("CWE")
if cwe_string:
cwe_list = list(map(get_cwe_id, re.findall("CWE-[0-9]+", cwe_string)))

aliases = []
alias = advisory_data.get("CVE")
if alias:
aliases.append(alias)
resource_url = advisory_data.get("resource_url")
if resource_url:
references.append(Reference(severities=redhat_scores, url=resource_url))
return AdvisoryData(
aliases=aliases,
summary=advisory_data.get("bugzilla_description") or "",
affected_packages=affected_packages,
summary=summary,
affected_packages=processed_affected_packages,
references=references,
weaknesses=cwe_list,
url=resource_url
if resource_url
else "https://access.redhat.com/hydra/rest/securitydata/cve.json",
date_published=date_published,
weaknesses=get_cwes_from_rockylinux_advisory(advisory_data),
url=f"https://errata.rockylinux.org/{aliases}",
)


def get_cwes_from_rockylinux_advisory(advisory_data) -> [int]:
"""
Extract CWE IDs from advisory data and validate them against a database.

:param advisory_data: Dictionary containing CVE information.
:return: List of valid CWE IDs.

>>> advisory_data = {"cves": [
... {
... "name": "CVE-2022-24999",
... "sourceBy": "MITRE",
... "sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24999",
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H",
... "cvss3BaseScore": "7.5",
... "cwe": "CWE-1321"
... },
... {
... "name": "CVE-2022-3517",
... "sourceBy": "MITRE",
... "sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-3517",
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H",
... "cvss3BaseScore": "7.5",
... "cwe": "CWE-400"
... },
... {
... "name": "CVE-2022-43548",
... "sourceBy": "MITRE",
... "sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-43548",
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:N",
... "cvss3BaseScore": "7.5",
... "cwe": "CWE-350"
... }
... ]}
>>> get_cwes_from_rockylinux_advisory(advisory_data)
[1321, 400, 350]
>>> get_cwes_from_rockylinux_advisory({"cves": [{"name": "CVE-1234-1234","cwe": "None"}]})
[]
"""

cwe_ids = []
for cve in advisory_data.get("cves", []):
cwe_pattern = r"CWE-\d+"
cwe_id_list = re.findall(cwe_pattern, cve.get("cwe", ""))
cwe_ids.extend(cwe_id_list)
weaknesses = []
db = Database()
for cwe_string in cwe_ids:
if cwe_string:
cwe_id = get_cwe_id(cwe_string)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")
return weaknesses
1 change: 1 addition & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
valid_versions.RubyImprover,
valid_versions.GithubOSVImprover,
vulnerability_status.VulnerabilityStatusImprover,
valid_versions.RockyLinuxImprover,
]

IMPROVERS_REGISTRY = {x.qualified_name: x for x in IMPROVERS_REGISTRY}
6 changes: 6 additions & 0 deletions vulnerabilities/improvers/valid_versions.py
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
from vulnerabilities.importers.nginx import NginxImporter
from vulnerabilities.importers.npm import NpmImporter
from vulnerabilities.importers.oss_fuzz import OSSFuzzImporter
from vulnerabilities.importers.rockylinux import RockyLinuxImporter
from vulnerabilities.importers.ruby import RubyImporter
from vulnerabilities.importers.ubuntu import UbuntuImporter
from vulnerabilities.improver import MAX_CONFIDENCE
@@ -472,3 +473,8 @@ class RubyImprover(ValidVersionImprover):
class GithubOSVImprover(ValidVersionImprover):
importer = GithubOSVImporter
ignorable_versions = []


class RockyLinuxImprover(ValidVersionImprover):
importer = RockyLinuxImporter
ignorable_versions = []
Loading