Skip to content

Commit

Permalink
pip_audit: Add skipped dependencies to audit summary (#145)
Browse files Browse the repository at this point in the history
* pip_audit: Add skipped dependencies to audit summary

* doc: Add comment to explain the MyPy directive

* test: Fix type hints

* pip_audit, test: subclass enforcement for Dependency

* pip_audit: prefer cast over isinstance

We enforce a strict type hierarchy for Dependency, so these
casts should always be sound.

Co-authored-by: William Woodruff <[email protected]>
  • Loading branch information
tetsuo-cpp and woodruffw authored Nov 30, 2021
1 parent d1fddb7 commit bfe393e
Show file tree
Hide file tree
Showing 22 changed files with 358 additions and 104 deletions.
10 changes: 8 additions & 2 deletions pip_audit/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
from contextlib import ExitStack
from pathlib import Path
from typing import List, Optional
from typing import List, Optional, cast

from pip_audit import __version__
from pip_audit._audit import AuditOptions, Auditor
Expand All @@ -21,6 +21,7 @@
)
from pip_audit._format import ColumnsFormat, CycloneDxFormat, JsonFormat, VulnerabilityFormat
from pip_audit._service import OsvService, PyPIService, VulnerabilityService
from pip_audit._service.interface import ResolvedDependency, SkippedDependency
from pip_audit._state import AuditSpinner
from pip_audit._util import assert_never

Expand Down Expand Up @@ -211,7 +212,12 @@ def audit() -> None:
vuln_count = 0
for (spec, vulns) in auditor.audit(source):
if state is not None:
state.update_state(f"Auditing {spec.name} ({spec.version})")
if spec.is_skipped():
spec = cast(SkippedDependency, spec)
state.update_state(f"Skipping {spec.name}: {spec.skip_reason}")
else:
spec = cast(ResolvedDependency, spec)
state.update_state(f"Auditing {spec.name} ({spec.version})")
result[spec] = vulns
if len(vulns) > 0:
pkg_count += 1
Expand Down
6 changes: 3 additions & 3 deletions pip_audit/_dependency_source/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from packaging.requirements import Requirement

from pip_audit._service import Dependency
from pip_audit._service import Dependency, ResolvedDependency


class DependencySource(ABC):
Expand Down Expand Up @@ -48,15 +48,15 @@ class DependencyResolver(ABC):
"""

@abstractmethod
def resolve(self, req: Requirement) -> List[Dependency]: # pragma: no cover
def resolve(self, req: Requirement) -> List[ResolvedDependency]: # pragma: no cover
"""
Resolve a single `Requirement` into a list of concrete `Dependency` instances.
"""
raise NotImplementedError

def resolve_all(
self, reqs: Iterator[Requirement]
) -> Iterator[Tuple[Requirement, List[Dependency]]]:
) -> Iterator[Tuple[Requirement, List[ResolvedDependency]]]:
"""
Resolve a collection of `Requirement`s into their respective `Dependency` sets.
Expand Down
13 changes: 8 additions & 5 deletions pip_audit/_dependency_source/pip.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from packaging.version import InvalidVersion, Version

from pip_audit._dependency_source import DependencySource, DependencySourceError
from pip_audit._service import Dependency
from pip_audit._service import Dependency, ResolvedDependency, SkippedDependency
from pip_audit._state import AuditState

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,18 +62,21 @@ def collect(self) -> Iterator[Dependency]:
# We collect them all into a single well-defined error.
try:
for (_, dist) in pip_api.installed_distributions(local=self._local).items():
dep: Dependency
try:
dep = Dependency(name=dist.name, version=Version(str(dist.version)))
dep = ResolvedDependency(name=dist.name, version=Version(str(dist.version)))
if self.state is not None:
self.state.update_state(
f"Collecting {dep.name} ({dep.version})"
) # pragma: no cover
yield dep
except InvalidVersion:
logger.warning(
"Warning: Package has invalid version and could not be audited: "
skip_reason = (
"Package has invalid version and could not be audited: "
f"{dist.name} ({dist.version})"
)
logger.warning(f"Warning: {skip_reason}")
dep = SkippedDependency(name=dist.name, skip_reason=skip_reason)
yield dep
except Exception as e:
raise PipSourceError("failed to list installed distributions") from e

Expand Down
8 changes: 4 additions & 4 deletions pip_audit/_dependency_source/resolvelib/resolvelib.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from resolvelib import BaseReporter, Resolver

from pip_audit._dependency_source import DependencyResolver, DependencyResolverError
from pip_audit._service.interface import Dependency
from pip_audit._service.interface import ResolvedDependency
from pip_audit._state import AuditState

from .pypi_provider import PyPIProvider
Expand All @@ -32,17 +32,17 @@ def __init__(self, timeout: Optional[int] = None, state: Optional[AuditState] =
self.reporter = BaseReporter()
self.resolver: Resolver = Resolver(self.provider, self.reporter)

def resolve(self, req: Requirement) -> List[Dependency]:
def resolve(self, req: Requirement) -> List[ResolvedDependency]:
"""
Resolve the given `Requirement` into a `Dependency` list.
"""
deps: List[Dependency] = []
deps: List[ResolvedDependency] = []
try:
result = self.resolver.resolve([req])
except HTTPError as e:
raise ResolveLibResolverError("failed to resolve dependencies") from e
for name, candidate in result.mapping.items():
deps.append(Dependency(name, candidate.version))
deps.append(ResolvedDependency(name, candidate.version))
return deps


Expand Down
38 changes: 36 additions & 2 deletions pip_audit/_format/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from itertools import zip_longest
from typing import Any, Dict, Iterable, List, Tuple
from typing import Any, Dict, Iterable, List, Tuple, cast

from packaging.version import Version

Expand Down Expand Up @@ -52,6 +52,9 @@ def format(self, result: Dict[service.Dependency, List[service.VulnerabilityResu
header.append("Description")
vuln_data.append(header)
for dep, vulns in result.items():
if dep.is_skipped():
continue
dep = cast(service.ResolvedDependency, dep)
for vuln in vulns:
vuln_data.append(self._format_vuln(dep, vuln))

Expand All @@ -67,9 +70,34 @@ def format(self, result: Dict[service.Dependency, List[service.VulnerabilityResu
columns_string += "\n"
columns_string += row

# Now display the skipped dependencies
skip_data: List[List[Any]] = []
skip_header = ["Name", "Skip Reason"]

skip_data.append(skip_header)
for dep, _ in result.items():
if dep.is_skipped():
dep = cast(service.SkippedDependency, dep)
skip_data.append(self._format_skipped_dep(dep))

# If we only have the header, that means that we haven't skipped any dependencies
# In that case, don't bother printing the header
if len(skip_data) <= 1:
return columns_string

skip_strings, sizes = tabulate(skip_data)

# Create separator for skipped dependencies columns
skip_strings.insert(1, " ".join(map(lambda x: "-" * x, sizes)))

for row in skip_strings:
columns_string += "\n" + row

return columns_string

def _format_vuln(self, dep: service.Dependency, vuln: service.VulnerabilityResult) -> List[Any]:
def _format_vuln(
self, dep: service.ResolvedDependency, vuln: service.VulnerabilityResult
) -> List[Any]:
vuln_data = [
dep.canonical_name,
dep.version,
Expand All @@ -82,3 +110,9 @@ def _format_vuln(self, dep: service.Dependency, vuln: service.VulnerabilityResul

def _format_fix_versions(self, fix_versions: List[Version]) -> str:
return ",".join([str(version) for version in fix_versions])

def _format_skipped_dep(self, dep: service.SkippedDependency) -> List[Any]:
return [
dep.canonical_name,
dep.skip_reason,
]
8 changes: 7 additions & 1 deletion pip_audit/_format/cyclonedx.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import enum
from typing import Dict, List
from typing import Dict, List, cast

from cyclonedx import output
from cyclonedx.model.bom import Bom
Expand All @@ -21,6 +21,12 @@ def __init__(self, result: Dict[service.Dependency, List[service.VulnerabilityRe
super().__init__()

for (dep, vulns) in result.items():
# TODO(alex): Is there anything interesting we can do with skipped dependencies in
# the CycloneDX format?
if dep.is_skipped():
continue
dep = cast(service.ResolvedDependency, dep)

c = Component(name=dep.name, version=str(dep.version))
for vuln in vulns:
c.add_vulnerability(
Expand Down
10 changes: 9 additions & 1 deletion pip_audit/_format/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import json
from typing import Any, Dict, List
from typing import Any, Dict, List, cast

import pip_audit._service as service

Expand Down Expand Up @@ -40,6 +40,14 @@ def format(self, result: Dict[service.Dependency, List[service.VulnerabilityResu
def _format_dep(
self, dep: service.Dependency, vulns: List[service.VulnerabilityResult]
) -> Dict[str, Any]:
if dep.is_skipped():
dep = cast(service.SkippedDependency, dep)
return {
"name": dep.canonical_name,
"skip_reason": dep.skip_reason,
}

dep = cast(service.ResolvedDependency, dep)
return {
"name": dep.canonical_name,
"version": str(dep.version),
Expand Down
2 changes: 2 additions & 0 deletions pip_audit/_service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

from .interface import ( # noqa: F401
Dependency,
ResolvedDependency,
ServiceError,
SkippedDependency,
VulnerabilityResult,
VulnerabilityService,
)
Expand Down
41 changes: 37 additions & 4 deletions pip_audit/_service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
@dataclass(frozen=True)
class Dependency:
"""
Represents a fully resolved Python package.
Represents an abstract Python package.
This class cannot be constructed directly.
"""

name: str
Expand All @@ -23,7 +25,12 @@ class Dependency:
Use the `canonicalized_name` property when a canonicalized form is necessary.
"""
version: Version

def __init__(self, *_args, **_kwargs) -> None:
"""
A stub constructor that always fails.
"""
raise NotImplementedError

# TODO(ww): Use functools.cached_property when supported Python is 3.8+.
@property
Expand All @@ -33,6 +40,30 @@ def canonical_name(self) -> str:
"""
return canonicalize_name(self.name)

def is_skipped(self) -> bool:
"""
Check whether the `Dependency` was skipped by the audit.
"""
return self.__class__ is SkippedDependency


@dataclass(frozen=True)
class ResolvedDependency(Dependency):
"""
Represents a fully resolved Python package.
"""

version: Version


@dataclass(frozen=True)
class SkippedDependency(Dependency):
"""
Represents a Python package that was unable to be audited and therefore, skipped.
"""

skip_reason: str


@dataclass(frozen=True)
class VulnerabilityResult:
Expand Down Expand Up @@ -63,7 +94,9 @@ class VulnerabilityService(ABC):
"""

@abstractmethod
def query(self, spec: Dependency) -> List[VulnerabilityResult]: # pragma: no cover
def query(
self, spec: Dependency
) -> Tuple[Dependency, List[VulnerabilityResult]]: # pragma: no cover
"""
Query the `VulnerabilityService` for information about the given `Dependency`,
returning a list of `VulnerabilityResult`.
Expand All @@ -80,7 +113,7 @@ def query_all(
a more optimized one, if they support batched or bulk requests.
"""
for spec in specs:
yield (spec, self.query(spec))
yield self.query(spec)


class ServiceError(Exception):
Expand Down
19 changes: 14 additions & 5 deletions pip_audit/_service/osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
"""

import json
from typing import List, Optional
from typing import List, Optional, Tuple, cast

import requests
from packaging.version import Version

from .interface import Dependency, ServiceError, VulnerabilityResult, VulnerabilityService
from .interface import (
Dependency,
ResolvedDependency,
ServiceError,
VulnerabilityResult,
VulnerabilityService,
)


class OsvService(VulnerabilityService):
Expand All @@ -26,12 +32,15 @@ def __init__(self, timeout: Optional[int] = None):
"""
self.timeout = timeout

def query(self, spec: Dependency) -> List[VulnerabilityResult]:
def query(self, spec: Dependency) -> Tuple[Dependency, List[VulnerabilityResult]]:
"""
Queries OSV for the given `Dependency` specification.
See `VulnerabilityService.query`.
"""
if spec.is_skipped():
return spec, []
spec = cast(ResolvedDependency, spec)

url = "https://api.osv.dev/v1/query"
query = {
Expand All @@ -58,7 +67,7 @@ def query(self, spec: Dependency) -> List[VulnerabilityResult]:
# In that case, return an empty list
response_json = response.json()
if not response_json:
return results
return spec, results

for vuln in response_json["vulns"]:
id = vuln["id"]
Expand Down Expand Up @@ -87,4 +96,4 @@ def query(self, spec: Dependency) -> List[VulnerabilityResult]:

results.append(VulnerabilityResult(id, description, fix_versions))

return results
return spec, results
Loading

0 comments on commit bfe393e

Please sign in to comment.