Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

requirement, pypi: Add a --require-hashes flag #229

Merged
merged 11 commits into from
Feb 6, 2022
11 changes: 10 additions & 1 deletion pip_audit/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ def _parser() -> argparse.ArgumentParser:
action="store_true",
help="automatically upgrade dependencies with known vulnerabilities",
)
parser.add_argument(
"--require-hashes",
action="store_true",
help="require a hash to check each requirement against, for repeatable audits; this option "
"is implied when any package in a requirements file has a --hash option.",
tetsuo-cpp marked this conversation as resolved.
Show resolved Hide resolved
)
return parser


Expand Down Expand Up @@ -272,7 +278,10 @@ def audit() -> None:
if args.requirements is not None:
req_files: List[Path] = [Path(req.name) for req in args.requirements]
source = RequirementSource(
req_files, ResolveLibResolver(args.timeout, args.cache_dir, state), state
req_files,
ResolveLibResolver(args.timeout, args.cache_dir, state),
args.require_hashes,
state,
)
else:
source = PipSource(local=args.local, paths=args.paths, state=state)
Expand Down
42 changes: 40 additions & 2 deletions pip_audit/_dependency_source/requirement.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@

import logging
import os
import re
import shutil
from contextlib import ExitStack
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import IO, Iterator, List, Set, cast
from typing import IO, Iterator, List, Set, Union, cast

from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet
from pip_api import parse_requirements
from packaging.version import Version
from pip_api import ParsedRequirement, UnparsedRequirement, parse_requirements
from pip_api.exceptions import PipError

from pip_audit._dependency_source import (
Expand All @@ -29,6 +31,8 @@

logger = logging.getLogger(__name__)

# Pattern for a `==` version pin: captures the text between `==` and the end of the
# string as the named group `version`.
PINNED_SPECIFIER_RE = re.compile(r"==(?P<version>.+?)$", re.VERBOSE)


class RequirementSource(DependencySource):
"""
Expand All @@ -39,6 +43,7 @@ def __init__(
self,
filenames: List[Path],
resolver: DependencyResolver,
require_hashes: bool = False,
state: AuditState = AuditState(),
) -> None:
"""
Expand All @@ -52,6 +57,7 @@ def __init__(
"""
self.filenames = filenames
self.resolver = resolver
self.require_hashes = require_hashes
self.state = state

def collect(self) -> Iterator[Dependency]:
Expand All @@ -67,6 +73,12 @@ def collect(self) -> Iterator[Dependency]:
except PipError as pe:
raise RequirementSourceError("requirement parsing raised an error") from pe

# If we're requiring hashes, we skip dependency resolution and check that each
# requirement is accompanied by a hash and is pinned
if self.require_hashes:
yield from self._collect_hashed_deps(iter(reqs.values()))
continue

# Invoke the dependency resolver to turn requirements into dependencies
req_values: List[Requirement] = [Requirement(str(req)) for req in reqs.values()]
try:
Expand Down Expand Up @@ -153,6 +165,32 @@ def _recover_files(self, tmp_files: List[IO[str]]) -> None:
logger.warning(f"encountered an exception during file recovery: {e}")
continue

def _collect_hashed_deps(
    self, reqs: Iterator[Union[ParsedRequirement, UnparsedRequirement]]
) -> Iterator[Dependency]:
    """
    Turn parsed requirements into dependencies for `--require-hashes` mode.

    No dependency resolution is performed. A requirement that carries a hash and
    is pinned to a single exact version with `==` is yielded as a
    `ResolvedDependency` (with its hash attached). Any other requirement is
    yielded as a `SkippedDependency` whose reason is also logged at debug level.

    :param reqs: the requirements parsed from a requirements file
    :return: an iterator over resolved or skipped dependencies, one per requirement
    """
    for req in reqs:
        req = cast(ParsedRequirement, req)

        if req.hash is None:
            skip_reason = (
                f"requirement {req.name} does not contain a hash with "
                f"`--require-hashes`: {str(req)}"
            )
            logger.debug(skip_reason)
            yield SkippedDependency(req.name, skip_reason)
            continue

        # Inspect the specifier set itself rather than pattern-matching its string
        # form: the string form joins every clause with commas, so inputs such as
        # `==1.0,>=0.5`, `===1.0` or `==1.*` would look like a pin and then make
        # `Version(...)` raise instead of being reported as unpinned.
        clauses = list(req.specifier) if req.specifier is not None else []
        if len(clauses) == 1:
            pin = clauses[0]
            # A wildcard (`==1.*`) names a range of versions, not a single one.
            if pin.operator == "==" and not pin.version.endswith(".*"):
                # Yield a dependency with the hash
                yield ResolvedDependency(req.name, Version(pin.version), req.hash)
                continue

        skip_reason = (
            f"requirement {req.name} is not pinned with `--require-hashes`: {str(req)}"
        )
        logger.debug(skip_reason)
        yield SkippedDependency(req.name, skip_reason)


class RequirementSourceError(DependencySourceError):
"""A requirements-parsing specific `DependencySourceError`."""
Expand Down
3 changes: 2 additions & 1 deletion pip_audit/_service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator, List, Tuple
from typing import Iterator, List, Optional, Tuple

from packaging.utils import canonicalize_name
from packaging.version import Version
Expand Down Expand Up @@ -54,6 +54,7 @@ class ResolvedDependency(Dependency):
"""

version: Version
hash: Optional[str] = None
tetsuo-cpp marked this conversation as resolved.
Show resolved Hide resolved


@dataclass(frozen=True)
Expand Down
25 changes: 25 additions & 0 deletions pip_audit/_service/pypi.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,31 @@ def query(self, spec: Dependency) -> Tuple[Dependency, List[VulnerabilityResult]
response_json = response.json()
results: List[VulnerabilityResult] = []

# If the dependency has a hash explicitly listed, check it against the PyPI data
if spec.hash is not None:
releases = response_json["releases"]
release = releases.get(str(spec.version))
if release is None:
raise ServiceError(
"Could not find release to compare hashes: "
f"{spec.canonical_name} ({spec.version})"
)
found = False
hash_type, hash_value = spec.hash.split(":", 1)
for dist in release:
digests = dist.get("digests")
if digests is None:
continue
pypi_hash = digests.get(hash_type)
if pypi_hash is not None and pypi_hash == hash_value:
found = True
break
if not found:
raise ServiceError(
f"Mismatched hash for {spec.canonical_name} ({spec.version}): listed "
f"{hash_value} of type {hash_type} could not be found in PyPI releases"
)

vulns = response_json.get("vulnerabilities")

# No `vulnerabilities` key means that there are no vulnerabilities for any version
Expand Down