From 7696e7e5303b23d233097cf26749bc1e4a5e0645 Mon Sep 17 00:00:00 2001
From: Maxim Kurnikov
Date: Sun, 16 Dec 2018 12:16:39 +0300
Subject: [PATCH] Add type annotations for pip._internal.download,
 pip._internal.wheel and pip._internal.pep425tags (#6067)

References https://github.com/pypa/pip/issues/4748,
https://github.com/pypa/pip/pull/6038.
---
 src/pip/_internal/download.py   |  70 ++++++++++++++++---
 src/pip/_internal/pep425tags.py |  57 ++++++++++++---
 src/pip/_internal/wheel.py      | 118 ++++++++++++++++++++++++--------
 3 files changed, 195 insertions(+), 50 deletions(-)

diff --git a/src/pip/_internal/download.py b/src/pip/_internal/download.py
index 4cc0774e7cc..fbd820ad1d2 100644
--- a/src/pip/_internal/download.py
+++ b/src/pip/_internal/download.py
@@ -48,7 +48,13 @@
 from pip._internal.vcs import vcs
 
 if MYPY_CHECK_RUNNING:
-    from typing import Optional  # noqa: F401
+    from typing import (  # noqa: F401
+        Optional, Tuple, Dict, IO, Text, Union
+    )
+    from pip._internal.models.link import Link  # noqa: F401
+    from pip._internal.utils.hashes import Hashes  # noqa: F401
+    # cannot import alias directly here, fixed in mypy==0.641
+    import pip._internal.vcs as vcs_type_aliases  # noqa: F401
 
 try:
     import ssl  # noqa
@@ -139,8 +145,9 @@ def user_agent():
 class MultiDomainBasicAuth(AuthBase):
 
     def __init__(self, prompting=True):
+        # type: (bool) -> None
         self.prompting = prompting
-        self.passwords = {}
+        self.passwords = {}  # type: Dict[str, vcs_type_aliases.AuthInfo]
 
     def __call__(self, req):
         parsed = urllib_parse.urlparse(req.url)
@@ -398,6 +405,7 @@ def request(self, method, url, *args, **kwargs):
 
 
 def get_file_content(url, comes_from=None, session=None):
+    # type: (str, Optional[str], Optional[PipSession]) -> Tuple[str, Text]
     """Gets the content of a file; it may be a filename, file: URL, or
     http: URL. Returns (location, content). Content is unicode.
@@ -448,6 +456,7 @@ def get_file_content(url, comes_from=None, session=None):
 
 
 def is_url(name):
+    # type: (Union[str, Text]) -> bool
     """Returns true if the name looks like a URL"""
     if ':' not in name:
         return False
@@ -456,6 +465,7 @@ def is_url(name):
 
 
 def url_to_path(url):
+    # type: (str) -> str
     """
     Convert a file: URL to a path.
     """
@@ -473,6 +483,7 @@ def url_to_path(url):
 
 
 def path_to_url(path):
+    # type: (Union[str, Text]) -> str
     """
     Convert a path to a file: URL.
     The path will be made absolute and have quoted path parts.
     """
@@ -483,6 +494,7 @@
 
 
 def is_archive_file(name):
+    # type: (str) -> bool
     """Return True if `name` is a considered as an archive file."""
     ext = splitext(name)[1].lower()
     if ext in ARCHIVE_EXTENSIONS:
@@ -503,14 +515,17 @@ def _get_used_vcs_backend(link):
 
 
 def is_vcs_url(link):
+    # type: (Link) -> bool
     return bool(_get_used_vcs_backend(link))
 
 
 def is_file_url(link):
+    # type: (Link) -> bool
     return link.url.lower().startswith('file:')
 
 
 def is_dir_url(link):
+    # type: (Link) -> bool
     """Return whether a file:// Link points to a directory.
 
     ``link`` must not have any other scheme but file://. Call is_file_url()
@@ -525,7 +540,14 @@ def _progress_indicator(iterable, *args, **kwargs):
     return iterable
 
 
-def _download_url(resp, link, content_file, hashes, progress_bar):
+def _download_url(
+    resp,  # type: Response
+    link,  # type: Link
+    content_file,  # type: IO
+    hashes,  # type: Hashes
+    progress_bar  # type: str
+):
+    # type: (...) -> None
     try:
         total_length = int(resp.headers['content-length'])
     except (ValueError, KeyError, TypeError):
@@ -647,8 +669,15 @@ def _copy_file(filename, location, link):
     logger.info('Saved %s', display_path(download_location))
 
 
-def unpack_http_url(link, location, download_dir=None,
-                    session=None, hashes=None, progress_bar="on"):
+def unpack_http_url(
+    link,  # type: Link
+    location,  # type: str
+    download_dir=None,  # type: Optional[str]
+    session=None,  # type: Optional[PipSession]
+    hashes=None,  # type: Optional[Hashes]
+    progress_bar="on"  # type: str
+):
+    # type: (...) -> None
     if session is None:
         raise TypeError(
             "unpack_http_url() missing 1 required keyword argument: 'session'"
@@ -685,7 +714,13 @@ def unpack_http_url(link, location, download_dir=None,
         os.unlink(from_path)
 
 
-def unpack_file_url(link, location, download_dir=None, hashes=None):
+def unpack_file_url(
+    link,  # type: Link
+    location,  # type: str
+    download_dir=None,  # type: Optional[str]
+    hashes=None  # type: Optional[Hashes]
+):
+    # type: (...) -> None
     """Unpack link into location.
 
     If download_dir is provided and link points to a file, make a copy
@@ -798,9 +833,16 @@ def request(self, host, handler, request_body, verbose=False):
             raise
 
 
-def unpack_url(link, location, download_dir=None,
-               only_download=False, session=None, hashes=None,
-               progress_bar="on"):
+def unpack_url(
+    link,  # type: Optional[Link]
+    location,  # type: Optional[str]
+    download_dir=None,  # type: Optional[str]
+    only_download=False,  # type: bool
+    session=None,  # type: Optional[PipSession]
+    hashes=None,  # type: Optional[Hashes]
+    progress_bar="on"  # type: str
+):
+    # type: (...) -> None
     """Unpack link.
        If link is a VCS link:
         if only_download, export into download_dir and ignore location
@@ -840,7 +882,14 @@ def unpack_url(link, location, download_dir=None,
         write_delete_marker_file(location)
 
 
-def _download_http_url(link, session, temp_dir, hashes, progress_bar):
+def _download_http_url(
+    link,  # type: Link
+    session,  # type: PipSession
+    temp_dir,  # type: str
+    hashes,  # type: Hashes
+    progress_bar  # type: str
+):
+    # type: (...) -> Tuple[str, str]
     """Download link url into temp_dir using provided session"""
     target_url = link.url.split('#', 1)[0]
     try:
@@ -900,6 +949,7 @@ def _download_http_url(link, session, temp_dir, hashes, progress_bar):
 
 
 def _check_download_dir(link, download_dir, hashes):
+    # type: (Link, str, Hashes) -> Optional[str]
     """ Check download_dir for previously downloaded file with correct hash
         If a correct file is found return its path else None
     """
diff --git a/src/pip/_internal/pep425tags.py b/src/pip/_internal/pep425tags.py
index 7062d7f5fdd..1e782d1ae8a 100644
--- a/src/pip/_internal/pep425tags.py
+++ b/src/pip/_internal/pep425tags.py
@@ -12,6 +12,14 @@
 
 import pip._internal.utils.glibc
 from pip._internal.utils.compat import get_extension_suffixes
+from pip._internal.utils.typing import MYPY_CHECK_RUNNING
+
+if MYPY_CHECK_RUNNING:
+    from typing import (  # noqa: F401
+        Tuple, Callable, List, Optional, Union, Dict
+    )
+
+    Pep425Tag = Tuple[str, str, str]
 
 logger = logging.getLogger(__name__)
@@ -19,6 +27,7 @@
 
 
 def get_config_var(var):
+    # type: (str) -> Optional[str]
     try:
         return sysconfig.get_config_var(var)
     except IOError as e:  # Issue #1074
@@ -27,6 +36,7 @@ def get_config_var(var):
 
 
 def get_abbr_impl():
+    # type: () -> str
     """Return abbreviated implementation name."""
     if hasattr(sys, 'pypy_version_info'):
         pyimpl = 'pp'
@@ -40,6 +50,7 @@ def get_abbr_impl():
 
 
 def get_impl_ver():
+    # type: () -> str
     """Return implementation version."""
     impl_ver = get_config_var("py_version_nodot")
     if not impl_ver or get_abbr_impl() == 'pp':
@@ -48,17 +59,21 @@ def get_impl_ver():
 
 
 def get_impl_version_info():
+    # type: () -> Tuple[int, ...]
     """Return sys.version_info-like tuple for use in decrementing the minor
     version."""
     if get_abbr_impl() == 'pp':
         # as per https://github.com/pypa/pip/issues/2882
-        return (sys.version_info[0], sys.pypy_version_info.major,
-                sys.pypy_version_info.minor)
+        # attrs exist only on pypy
+        return (sys.version_info[0],
+                sys.pypy_version_info.major,  # type: ignore
+                sys.pypy_version_info.minor)  # type: ignore
     else:
         return sys.version_info[0], sys.version_info[1]
 
 
 def get_impl_tag():
+    # type: () -> str
     """
     Returns the Tag for this specific implementation.
     """
@@ -66,6 +81,7 @@
 
 
 def get_flag(var, fallback, expected=True, warn=True):
+    # type: (str, Callable[..., bool], Union[bool, int], bool) -> bool
     """Use a fallback method for determining SOABI flags if the needed config
     var is unset or unavailable."""
     val = get_config_var(var)
@@ -78,6 +94,7 @@ def get_flag(var, fallback, expected=True, warn=True):
 
 
 def get_abi_tag():
+    # type: () -> Optional[str]
     """Return the ABI tag based on SOABI (if available) or emulate SOABI
     (CPython 2, PyPy)."""
     soabi = get_config_var('SOABI')
@@ -112,10 +129,12 @@ def get_abi_tag():
 
 
 def _is_running_32bit():
+    # type: () -> bool
     return sys.maxsize == 2147483647
 
 
 def get_platform():
+    # type: () -> str
     """Return our platform name 'win32', 'linux_x86_64'"""
     if sys.platform == 'darwin':
         # distutils.util.get_platform() returns the release based on the value
@@ -142,6 +161,7 @@ def get_platform():
 
 
 def is_manylinux1_compatible():
+    # type: () -> bool
     # Only Linux, and only x86-64 / i686
     if get_platform() not in {"linux_x86_64", "linux_i686"}:
         return False
@@ -159,6 +179,7 @@ def is_manylinux1_compatible():
 
 
 def is_manylinux2010_compatible():
+    # type: () -> bool
     # Only Linux, and only x86-64 / i686
     if get_platform() not in {"linux_x86_64", "linux_i686"}:
         return False
@@ -176,12 +197,14 @@ def is_manylinux2010_compatible():
 
 
 def get_darwin_arches(major, minor, machine):
+    # type: (int, int, str) -> List[str]
     """Return a list of supported arches (including group arches) for
     the given major, minor and machine architecture of an macOS machine.
     """
     arches = []
 
     def _supports_arch(major, minor, arch):
+        # type: (int, int, str) -> bool
         # Looking at the application support for macOS versions in the chart
         # provided by https://en.wikipedia.org/wiki/OS_X#Versions it appears
         # our timeline looks roughly like:
@@ -222,7 +245,7 @@ def _supports_arch(major, minor, arch):
         ("intel", ("x86_64", "i386")),
         ("fat64", ("x86_64", "ppc64")),
         ("fat32", ("x86_64", "i386", "ppc")),
-    ])
+    ])  # type: Dict[str, Tuple[str, ...]]
 
     if _supports_arch(major, minor, machine):
         arches.append(machine)
@@ -236,8 +259,24 @@ def _supports_arch(major, minor, arch):
     return arches
 
 
-def get_supported(versions=None, noarch=False, platform=None,
-                  impl=None, abi=None):
+def get_all_minor_versions_as_strings(version_info):
+    # type: (Tuple[int, ...]) -> List[str]
+    versions = []
+    major = version_info[:-1]
+    # Support all previous minor Python versions.
+    for minor in range(version_info[-1], -1, -1):
+        versions.append(''.join(map(str, major + (minor,))))
+    return versions
+
+
+def get_supported(
+    versions=None,  # type: Optional[List[str]]
+    noarch=False,  # type: bool
+    platform=None,  # type: Optional[str]
+    impl=None,  # type: Optional[str]
+    abi=None  # type: Optional[str]
+):
+    # type: (...) -> List[Pep425Tag]
     """Return a list of supported tags for each version specified in
     `versions`.
@@ -254,16 +293,12 @@ def get_supported(versions=None, noarch=False, platform=None,
 
     # Versions must be given with respect to the preference
     if versions is None:
-        versions = []
         version_info = get_impl_version_info()
-        major = version_info[:-1]
-        # Support all previous minor Python versions.
-        for minor in range(version_info[-1], -1, -1):
-            versions.append(''.join(map(str, major + (minor,))))
+        versions = get_all_minor_versions_as_strings(version_info)
 
     impl = impl or get_abbr_impl()
 
-    abis = []
+    abis = []  # type: List[str]
 
     abi = abi or get_abi_tag()
     if abi:
diff --git a/src/pip/_internal/wheel.py b/src/pip/_internal/wheel.py
index d645ea557c6..93b4768622a 100644
--- a/src/pip/_internal/wheel.py
+++ b/src/pip/_internal/wheel.py
@@ -41,7 +41,22 @@
 from pip._internal.utils.ui import open_spinner
 
 if MYPY_CHECK_RUNNING:
-    from typing import Dict, List, Optional  # noqa: F401
+    from typing import (  # noqa: F401
+        Dict, List, Optional, Sequence, Mapping, Tuple, IO, Text, Any,
+        Union, Iterable
+    )
+    from pip._vendor.packaging.requirements import Requirement  # noqa: F401
+    from pip._internal.req.req_install import InstallRequirement  # noqa: F401
+    from pip._internal.download import PipSession  # noqa: F401
+    from pip._internal.index import PackageFinder  # noqa: F401
+    from pip._internal.operations.prepare import (  # noqa: F401
+        RequirementPreparer
+    )
+    from pip._internal.cache import WheelCache  # noqa: F401
+    from pip._internal.pep425tags import Pep425Tag  # noqa: F401
+
+    InstalledCSVRow = Tuple[str, Union[str, Text], str]
+
 
 VERSION_COMPATIBLE = (1, 0)
@@ -50,6 +65,7 @@
 
 
 def rehash(path, blocksize=1 << 20):
+    # type: (str, int) -> Tuple[str, str]
     """Return (hash, length) for path using hashlib.sha256()"""
     h = hashlib.sha256()
     length = 0
@@ -60,20 +76,23 @@ def rehash(path, blocksize=1 << 20):
     digest = 'sha256=' + urlsafe_b64encode(
         h.digest()
     ).decode('latin1').rstrip('=')
-    return (digest, length)
+    # unicode/str python2 issues
+    return (digest, str(length))  # type: ignore
 
 
 def open_for_csv(name, mode):
+    # type: (str, Text) -> IO
     if sys.version_info[0] < 3:
-        nl = {}
+        nl = {}  # type: Dict[str, Any]
         bin = 'b'
     else:
-        nl = {'newline': ''}
+        nl = {'newline': ''}  # type: Dict[str, Any]
         bin = ''
     return open(name, mode + bin, **nl)
 
 
 def replace_python_tag(wheelname, new_tag):
+    # type: (str, str) -> str
     """Replace the Python tag in a wheel file name with a new value.
     """
     parts = wheelname.split('-')
@@ -82,6 +101,7 @@ def replace_python_tag(wheelname, new_tag):
 
 
 def fix_script(path):
+    # type: (str) -> Optional[bool]
     """Replace #!python with #!/path/to/python
     Return True if file was changed."""
     # XXX RECORD hashes will need to be updated
@@ -97,6 +117,7 @@ def fix_script(path):
             script.write(firstline)
             script.write(rest)
         return True
+    return None
 
 
 dist_info_re = re.compile(r"""^(?P<namever>(?P<name>.+?)(-(?P<ver>.+?))?)
@@ -104,6 +125,7 @@
 
 
 def root_is_purelib(name, wheeldir):
+    # type: (str, str) -> bool
     """
     Return True if the extracted wheel in wheeldir should go into purelib.
     """
@@ -120,6 +142,7 @@ def root_is_purelib(name, wheeldir):
 
 
 def get_entrypoints(filename):
+    # type: (str) -> Tuple[Dict[str, str], Dict[str, str]]
     if not os.path.exists(filename):
         return {}, {}
@@ -151,7 +174,7 @@ def _split_ep(s):
 
 
 def message_about_scripts_not_on_PATH(scripts):
-    # type: (List[str]) -> Optional[str]
+    # type: (Sequence[str]) -> Optional[str]
     """Determine if any scripts are not on PATH and format a warning.
 
     Returns a warning message if one or more scripts are not on PATH,
@@ -212,6 +235,7 @@ def message_about_scripts_not_on_PATH(scripts):
 
 
 def sorted_outrows(outrows):
+    # type: (Iterable[InstalledCSVRow]) -> List[InstalledCSVRow]
     """
     Return the given rows of a RECORD file in sorted order.
@@ -231,9 +255,20 @@ def sorted_outrows(outrows):
     return sorted(outrows, key=lambda row: tuple(str(x) for x in row))
 
 
-def move_wheel_files(name, req, wheeldir, user=False, home=None, root=None,
-                     pycompile=True, scheme=None, isolated=False, prefix=None,
-                     warn_script_location=True):
+def move_wheel_files(
+    name,  # type: str
+    req,  # type: Requirement
+    wheeldir,  # type: str
+    user=False,  # type: bool
+    home=None,  # type: Optional[str]
+    root=None,  # type: Optional[str]
+    pycompile=True,  # type: bool
+    scheme=None,  # type: Optional[Mapping[str, str]]
+    isolated=False,  # type: bool
+    prefix=None,  # type: Optional[str]
+    warn_script_location=True  # type: bool
+):
+    # type: (...) -> None
     """Install a wheel"""
     # TODO: Investigate and break this up.
     # TODO: Look into moving this into a dedicated class for representing an
@@ -250,7 +285,7 @@ def move_wheel_files(name, req, wheeldir, user=False, home=None, root=None,
     else:
         lib_dir = scheme['platlib']
 
-    info_dir = []
+    info_dir = []  # type: List[str]
     data_dirs = []
     source = wheeldir.rstrip(os.path.sep) + os.path.sep
@@ -258,9 +293,9 @@ def move_wheel_files(name, req, wheeldir, user=False, home=None, root=None,
     # installed = files copied from the wheel to the destination
     # changed = files changed while installing (scripts #! line typically)
     # generated = files newly generated during the install (script wrappers)
-    installed = {}
+    installed = {}  # type: Dict[str, str]
     changed = set()
-    generated = []
+    generated = []  # type: List[str]
 
     # Compile all of the pyc files that we're going to be installing
     if pycompile:
@@ -418,8 +453,9 @@ def _get_script_text(entry):
             "import_name": entry.suffix.split(".")[0],
             "func": entry.suffix,
         }
-
-    maker._get_script_text = _get_script_text
+    # ignore type, because mypy disallows assigning to a method,
+    # see https://github.com/python/mypy/issues/2427
+    maker._get_script_text = _get_script_text  # type: ignore
     maker.script_template = r"""# -*- coding: utf-8 -*-
 import re
 import sys
@@ -523,24 +559,29 @@ def _get_script_text(entry):
             shutil.move(temp_installer, installer)
         generated.append(installer)
 
+    def get_csv_rows_for_installed(old_csv_rows):
+        # type: (Iterable[List[str]]) -> List[InstalledCSVRow]
+        installed_rows = []  # type: List[InstalledCSVRow]
+        for fpath, digest, length in old_csv_rows:
+            fpath = installed.pop(fpath, fpath)
+            if fpath in changed:
+                digest, length = rehash(fpath)
+            installed_rows.append((fpath, digest, str(length)))
+        for f in generated:
+            digest, length = rehash(f)
+            installed_rows.append((normpath(f, lib_dir), digest, str(length)))
+        for f in installed:
+            installed_rows.append((installed[f], '', ''))
+        return installed_rows
+
     # Record details of all files installed
     record = os.path.join(info_dir[0], 'RECORD')
     temp_record = os.path.join(info_dir[0], 'RECORD.pip')
     with open_for_csv(record, 'r') as record_in:
         with open_for_csv(temp_record, 'w+') as record_out:
             reader = csv.reader(record_in)
+            outrows = get_csv_rows_for_installed(reader)
             writer = csv.writer(record_out)
-            outrows = []
-            for row in reader:
-                row[0] = installed.pop(row[0], row[0])
-                if row[0] in changed:
-                    row[1], row[2] = rehash(row[0])
-                outrows.append(tuple(row))
-            for f in generated:
-                digest, length = rehash(f)
-                outrows.append((normpath(f, lib_dir), digest, length))
-            for f in installed:
-                outrows.append((installed[f], '', ''))
             # Sort to simplify testing.
             for row in sorted_outrows(outrows):
                 writer.writerow(row)
@@ -548,10 +589,11 @@ def _get_script_text(entry):
 
 
 def wheel_version(source_dir):
+    # type: (Optional[str]) -> Optional[Tuple[int, ...]]
     """
     Return the Wheel-Version of an extracted wheel, if possible.
 
-    Otherwise, return False if we couldn't parse / extract it.
+    Otherwise, return None if we couldn't parse / extract it.
     """
     try:
         dist = [d for d in pkg_resources.find_on_path(None, source_dir)][0]
@@ -563,10 +605,11 @@ def wheel_version(source_dir):
         version = tuple(map(int, version.split('.')))
         return version
     except Exception:
-        return False
+        return None
 
 
 def check_compatibility(version, name):
+    # type: (Optional[Tuple[int, ...]], str) -> None
     """
     Raises errors or warns if called with an incompatible Wheel-Version.
@@ -609,6 +652,7 @@ class Wheel(object):
     )
 
     def __init__(self, filename):
+        # type: (str) -> None
         """
         :raises InvalidWheelFilename: when the filename is invalid for a wheel
         """
@@ -634,6 +678,7 @@ def __init__(self, filename):
         }
 
     def support_index_min(self, tags=None):
+        # type: (Optional[List[Pep425Tag]]) -> Optional[int]
         """
         Return the lowest index that one of the wheel's file_tag combinations
         achieves in the supported_tags list e.g. if there are 8 supported tags,
@@ -646,6 +691,7 @@ def support_index_min(self, tags=None):
         return min(indexes) if indexes else None
 
     def supported(self, tags=None):
+        # type: (Optional[List[Pep425Tag]]) -> bool
         """Is this wheel supported on this system?"""
         if tags is None:  # for mock
             tags = pep425tags.get_supported()
@@ -664,8 +710,16 @@ def _contains_egg_info(
 class WheelBuilder(object):
     """Build wheels from a RequirementSet."""
 
-    def __init__(self, finder, preparer, wheel_cache,
-                 build_options=None, global_options=None, no_clean=False):
+    def __init__(
+        self,
+        finder,  # type: PackageFinder
+        preparer,  # type: RequirementPreparer
+        wheel_cache,  # type: WheelCache
+        build_options=None,  # type: Optional[List[str]]
+        global_options=None,  # type: Optional[List[str]]
+        no_clean=False  # type: bool
+    ):
+        # type: (...) -> None
         self.finder = finder
         self.preparer = preparer
         self.wheel_cache = wheel_cache
@@ -773,7 +827,13 @@ def _clean_one(self, req):
             logger.error('Failed cleaning build dir for %s', req.name)
             return False
 
-    def build(self, requirements, session, autobuilding=False):
+    def build(
+        self,
+        requirements,  # type: Iterable[InstallRequirement]
+        session,  # type: PipSession
+        autobuilding=False  # type: bool
+    ):
+        # type: (...) -> List[InstallRequirement]
         """Build wheels.
 
         :param unpack: If True, replace the sdist we built from with the
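
All three files follow the same pattern: comment-style (PEP 484) annotations plus typing imports guarded by pip's MYPY_CHECK_RUNNING flag, so the imports never execute at runtime and the syntax stays valid on Python 2. Below is a minimal, self-contained sketch of that pattern; the flag is inlined here instead of being imported from pip._internal.utils.typing, and the function and variable names are invented for illustration, not taken from the patch.

# Illustrative sketch only -- not part of the patch above.
MYPY_CHECK_RUNNING = False  # pip imports this flag from pip._internal.utils.typing

if MYPY_CHECK_RUNNING:
    # Skipped at runtime (the flag is False); mypy still analyzes the block,
    # so these names are available for the type comments below.
    from typing import Dict, Optional, Tuple  # noqa: F401


def split_name(filename, sep="-"):
    # type: (str, str) -> Tuple[str, Optional[str]]
    """Comment-style annotation: valid syntax on both Python 2 and 3."""
    head, _, tail = filename.partition(sep)
    return head, (tail or None)


seen = {}  # type: Dict[str, str]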