Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

requirement: Implement --fix for RequirementSource #225

Merged
merged 16 commits into from
Jan 24, 2022
Merged
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions pip_audit/_dependency_source/requirement.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@
Collect dependencies from one or more `requirements.txt`-formatted files.
"""

import logging
import shutil
from contextlib import ExitStack
from pathlib import Path
from typing import Iterator, List, Set, cast
from tempfile import TemporaryFile
from typing import IO, Iterator, List, Set, cast

from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet
from pip_api import parse_requirements
from pip_api.exceptions import PipError

from pip_audit._dependency_source import (
DependencyFixError,
DependencyResolver,
DependencyResolverError,
DependencySource,
Expand All @@ -20,6 +26,8 @@
from pip_audit._service.interface import ResolvedDependency, SkippedDependency
from pip_audit._state import AuditState

logger = logging.getLogger(__name__)


class RequirementSource(DependencySource):
"""
Expand Down Expand Up @@ -79,14 +87,77 @@ def collect(self) -> Iterator[Dependency]:
except DependencyResolverError as dre:
raise RequirementSourceError("dependency resolver raised an error") from dre

def fix(self, fix_version: ResolvedFixVersion) -> None: # pragma: no cover
def fix(self, fix_version: ResolvedFixVersion) -> None:
"""
Fixes a dependency version for this `RequirementSource`.
"""
raise NotImplementedError
with ExitStack() as stack:
# Make temporary copies of the existing requirements files. If anything goes wrong, we
# want to copy them back into place and undo any partial application of the fix.
tmp_files: List[IO[str]] = [
stack.enter_context(TemporaryFile(mode="w")) for _ in self.filenames
]
for (filename, tmp_file) in zip(self.filenames, tmp_files):
with open(filename, "r") as f:
tetsuo-cpp marked this conversation as resolved.
Show resolved Hide resolved
shutil.copyfileobj(f, tmp_file)

try:
# Now fix the files inplace
for filename in self.filenames:
self.state.update_state(
f"Fixing dependency {fix_version.dep.name} ({fix_version.dep.version} => "
f"{fix_version.version})"
)
self._fix_file(filename, fix_version)
except Exception as e:
logger.warning(
f"encountered an exception while applying fixes, recovering original files: {e}"
)
self._recover_files(tmp_files)
raise e

def _fix_file(self, filename: Path, fix_version: ResolvedFixVersion) -> None:
# Reparse the requirements file. We want to rewrite each line to the new requirements file
# and only modify the lines that we're fixing.
try:
reqs = parse_requirements(filename=filename)
except PipError as pe:
raise RequirementFixError(f"requirement parsing raised an error: {filename}") from pe

# Convert requirements types from pip-api's vendored types to our own
req_list: List[Requirement] = [Requirement(str(req)) for req in reqs.values()]

# Now write out the new requirements file
with open(filename, "w") as f:
for req in req_list:
if (
req.name == fix_version.dep.name
and req.specifier.contains(fix_version.dep.version)
and not req.specifier.contains(fix_version.version)
and (req.marker is None or req.marker.evaluate())
):
req.specifier = SpecifierSet(f"=={fix_version.version}")
f.write(str(req))

def _recover_files(self, tmp_files: List[IO[str]]) -> None:
for (filename, tmp_file) in zip(self.filenames, tmp_files):
try:
with open(filename, "w") as f:
tetsuo-cpp marked this conversation as resolved.
Show resolved Hide resolved
shutil.copyfileobj(tmp_file, f)
except Exception as e:
# Not much we can do at this point since we're already handling an exception. Just
# log the error and try to recover the rest of the files.
logger.warning(f"encountered an exception during file recovery: {e}")
continue
tetsuo-cpp marked this conversation as resolved.
Show resolved Hide resolved


class RequirementSourceError(DependencySourceError):
"""A requirements-parsing specific `DependencySourceError`."""

pass


class RequirementFixError(DependencyFixError):
"""A requirements-fixing specific `DependencyFixError`."""

pass