From 56697adf9803a7bcecadc737d8b160a9ab22392e Mon Sep 17 00:00:00 2001 From: Zachary Newman Date: Thu, 13 Oct 2022 17:43:47 -0400 Subject: [PATCH 1/5] Add GPG functions to use instead of constants. This way, we (eventually) won't shell out while importing this library. Signed-off-by: Zachary Newman --- securesystemslib/gpg/constants.py | 78 ++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/securesystemslib/gpg/constants.py b/securesystemslib/gpg/constants.py index fd6ec765..6873929e 100644 --- a/securesystemslib/gpg/constants.py +++ b/securesystemslib/gpg/constants.py @@ -15,6 +15,7 @@ aggregates all the constant definitions and lookup structures for signature handling """ +import functools import logging import os @@ -22,8 +23,9 @@ log = logging.getLogger(__name__) - -def is_available_gnupg(gnupg): +@functools.lru_cache(maxsize=3) +def is_available_gnupg(gnupg: str) -> bool: + """Returns whether gnupg points to a gpg binary.""" gpg_version_cmd = gnupg + " --version" try: process.run(gpg_version_cmd, stdout=process.PIPE, stderr=process.PIPE) @@ -32,36 +34,60 @@ def is_available_gnupg(gnupg): return False -GPG_COMMAND = "" -HAVE_GPG = False - GPG_ENV_COMMAND = os.environ.get('GNUPG') GPG2_COMMAND = "gpg2" GPG1_COMMAND = "gpg" -# By default, we allow providing GPG client through the environment -# assuming gpg2 as default value and test if exists. Otherwise, we assume gpg -# exists. -if GPG_ENV_COMMAND: - if is_available_gnupg(GPG_ENV_COMMAND): - GPG_COMMAND = GPG_ENV_COMMAND -elif is_available_gnupg(GPG2_COMMAND): - GPG_COMMAND = GPG2_COMMAND -elif is_available_gnupg(GPG1_COMMAND): - GPG_COMMAND = GPG1_COMMAND - -if GPG_COMMAND: - # Use bool to skip tests or fail early and gracefully if no gpg is available - HAVE_GPG = True - -GPG_VERSION_COMMAND = GPG_COMMAND + " --version" + +def gpg_command() -> str: + """Returns command to run GPG, or ``""``` if not found).""" + # By default, we allow providing GPG client through the environment + # assuming gpg2 as default value and test if exists. Otherwise, we assume gpg + # exists. + if GPG_ENV_COMMAND: + if is_available_gnupg(GPG_ENV_COMMAND): + return GPG_ENV_COMMAND + elif is_available_gnupg(GPG2_COMMAND): + return GPG2_COMMAND + elif is_available_gnupg(GPG1_COMMAND): + return GPG1_COMMAND + return "" + + +GPG_COMMAND = gpg_command() + + +def have_gpg() -> bool: + """Returns True if a gpg_command is available.""" + return bool(gpg_command()) + + +HAVE_GPG = have_gpg() + + +def gpg_version_command() -> str: + """Returns the command to get the current GPG version.""" + return f"{gpg_command()} --version" + +GPG_VERSION_COMMAND = gpg_version_command() FULLY_SUPPORTED_MIN_VERSION = "2.1.0" -NO_GPG_MSG = "GPG support requires a GPG client. 'gpg2' or 'gpg' with version {} or newer is" \ - " fully supported.".format(FULLY_SUPPORTED_MIN_VERSION) +NO_GPG_MSG = ( + f"GPG support requires a GPG client. 'gpg2' or 'gpg' with version " + f"{FULLY_SUPPORTED_MIN_VERSION} or newer is fully supported." +) + +def gpg_sign_command(keyarg: str, homearg: str) -> str: + """Returns the command to use GPG to sign STDIN.""" + return f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}" + +GPG_SIGN_COMMAND = gpg_sign_command("{keyarg}", "{homearg}") + + +def gpg_export_pubkey_command(homearg: str, keyid: str): + """Returns the GPG command to export a public key.""" + return f"{gpg_command()} {homearg} --export {keyid}" -GPG_SIGN_COMMAND = GPG_COMMAND + \ - " --detach-sign --digest-algo SHA256 {keyarg} {homearg}" -GPG_EXPORT_PUBKEY_COMMAND = GPG_COMMAND + " {homearg} --export {keyid}" +GPG_EXPORT_PUBKEY_COMMAND = gpg_export_pubkey_command("{homearg}", "{keyid}") # See RFC4880 section 4.3. Packet Tags for a list of all packet types The # relevant packets defined below are described in sections 5.2 (signature), From cb8e32b8ded5e7e3d1c09c447e175e297ac0d994 Mon Sep 17 00:00:00 2001 From: Zachary Newman Date: Thu, 13 Oct 2022 13:31:16 -0400 Subject: [PATCH 2/5] Check for GPG availability lazily. See #428. This commit is over-complicated, but (mostly) not breaking (you can continue to use `HAVE_GPG` and `GPG_COMMAND`). In this commit: - Add cacheing (`functools.lru_cache`) to `is_available_gnupg`. - Add `gpg_command()` to replace `GPG_COMMAND`. - Add `have_gpg()` to replace `HAVE_GPG`. - Replace `GPG_COMMAND` and `HAVE_GPG` with a `lazy.wrap_thunk()` wrapper for their corresponding functions. - This wrapper lazily runs the underlying thunk and passes through *everything* to the result of calling it. - This is a terrible hack and I'm sorry. - Some things still break, like `StringIO`. - Add a test that imports `securesystemslib.constants.gpg` with a busted `$GNUPG` to make sure that importing the library doesn't try to shell out. This failed before this change. Signed-off-by: Zachary Newman --- mypy.ini | 3 +- securesystemslib/gpg/_lazy.py | 53 +++++++++++++++++++++++++++++++ securesystemslib/gpg/constants.py | 18 +++++++---- securesystemslib/gpg/util.py | 2 +- tests/test_gpg.py | 6 ++-- tox.ini | 10 +++++- 6 files changed, 79 insertions(+), 13 deletions(-) create mode 100644 securesystemslib/gpg/_lazy.py diff --git a/mypy.ini b/mypy.ini index a54e1079..6a13550c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,7 +2,8 @@ warn_unused_configs = True files = securesystemslib/util.py, - securesystemslib/storage.py + securesystemslib/storage.py, + securesystemslib/gpg/constants.py # Supress error messages until enough modules # are type annotated diff --git a/securesystemslib/gpg/_lazy.py b/securesystemslib/gpg/_lazy.py new file mode 100644 index 00000000..8b95d0fb --- /dev/null +++ b/securesystemslib/gpg/_lazy.py @@ -0,0 +1,53 @@ +""" + + _lazy.py + + + Zachary Newman + + + Oct 13, 2022 + + + See LICENSE for licensing information. + + + helpers for backwards compatibility when replacing constants with functions. + + AKA crimes against Python +""" +from typing import Callable, TypeVar, Type, Any +import inspect + +T = TypeVar('T') + +def wrap_thunk(thunk: Callable[[], T]): + """Wraps ``thunk`` in an object that acts like the result of calling it.""" + + called = False + value = None + + def lazy(): + nonlocal called, value + if not called: + value = thunk() + called = True + return value + + superclass: Type[Any] = inspect.signature(thunk).return_annotation + if superclass in (bool,): + superclass = object + + class Wrapper(superclass): # type: ignore + """Object passing through to the result of a lazily-called thunk.""" + + def __getattribute__(self, attr): + return getattr(lazy(), attr) + + def __str__(self): + return str(lazy()) + + def __bool__(self): + return bool(lazy()) + + return Wrapper() diff --git a/securesystemslib/gpg/constants.py b/securesystemslib/gpg/constants.py index 6873929e..3d8b54c9 100644 --- a/securesystemslib/gpg/constants.py +++ b/securesystemslib/gpg/constants.py @@ -19,6 +19,8 @@ import logging import os +import securesystemslib.gpg._lazy as lazy + from securesystemslib import process log = logging.getLogger(__name__) @@ -53,23 +55,23 @@ def gpg_command() -> str: return GPG1_COMMAND return "" - -GPG_COMMAND = gpg_command() +GPG_COMMAND = lazy.wrap_thunk(gpg_command) def have_gpg() -> bool: """Returns True if a gpg_command is available.""" return bool(gpg_command()) - -HAVE_GPG = have_gpg() +HAVE_GPG = lazy.wrap_thunk(have_gpg) def gpg_version_command() -> str: """Returns the command to get the current GPG version.""" return f"{gpg_command()} --version" -GPG_VERSION_COMMAND = gpg_version_command() +GPG_VERSION_COMMAND = lazy.wrap_thunk(gpg_version_command) + + FULLY_SUPPORTED_MIN_VERSION = "2.1.0" NO_GPG_MSG = ( f"GPG support requires a GPG client. 'gpg2' or 'gpg' with version " @@ -80,14 +82,16 @@ def gpg_sign_command(keyarg: str, homearg: str) -> str: """Returns the command to use GPG to sign STDIN.""" return f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}" -GPG_SIGN_COMMAND = gpg_sign_command("{keyarg}", "{homearg}") +GPG_SIGN_COMMAND = lazy.wrap_thunk(lambda: gpg_sign_command("{keyarg}", "{homearg}")) def gpg_export_pubkey_command(homearg: str, keyid: str): """Returns the GPG command to export a public key.""" return f"{gpg_command()} {homearg} --export {keyid}" -GPG_EXPORT_PUBKEY_COMMAND = gpg_export_pubkey_command("{homearg}", "{keyid}") +GPG_EXPORT_PUBKEY_COMMAND = lazy.wrap_thunk( + lambda: gpg_export_pubkey_command("{homearg}", "{keyid}") +) # See RFC4880 section 4.3. Packet Tags for a list of all packet types The # relevant packets defined below are described in sections 5.2 (signature), diff --git a/securesystemslib/gpg/util.py b/securesystemslib/gpg/util.py index c41c03d4..35100efe 100644 --- a/securesystemslib/gpg/util.py +++ b/securesystemslib/gpg/util.py @@ -364,7 +364,7 @@ def get_version() -> Version: if not constants.HAVE_GPG: # pragma: no cover raise exceptions.UnsupportedLibraryError(constants.NO_GPG_MSG) - command = constants.GPG_VERSION_COMMAND + command = constants.gpg_version_command() gpg_process = process.run(command, stdout=process.PIPE, stderr=process.PIPE, universal_newlines=True) diff --git a/tests/test_gpg.py b/tests/test_gpg.py index ab6a4d2e..9a95b5ba 100644 --- a/tests/test_gpg.py +++ b/tests/test_gpg.py @@ -80,11 +80,11 @@ def test_version_utils_return_types(self): self.assertTrue(isinstance(get_version(), Version)) self.assertTrue(isinstance(is_version_fully_supported(), bool)) - @patch('securesystemslib.gpg.constants.GPG_VERSION_COMMAND', 'echo "bad"') def test_version_utils_error(self): """Run dummy tests for coverage. """ - with self.assertRaises(exceptions.UnsupportedLibraryError): - get_version() + with patch('securesystemslib.gpg.constants.HAVE_GPG', False): + with self.assertRaises(exceptions.UnsupportedLibraryError): + get_version() def test_get_hashing_class(self): # Assert return expected hashing class diff --git a/tox.ini b/tox.ini index 97606e91..8bfe0060 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = mypy, py37, py38, py39, py310, purepy38, py38-no-gpg +envlist = mypy, py37, py38, py39, py310, purepy38, py38-no-gpg, py38-test-gpg-fails skipsdist = True [testenv] @@ -33,6 +33,14 @@ setenv = commands = python -m tests.check_public_interfaces_gpg +# This checks that importing securesystemslib.gpg.constants doesn't shell out on +# import. +[testenv:py38-test-gpg-fails] +setenv = + GNUPG = false +commands = + python -c "import securesystemslib.gpg.constants" + [testenv:mypy] commands = mypy From 19ba269dbe35b6abbfb5be949df3b04440a58927 Mon Sep 17 00:00:00 2001 From: Zachary Newman Date: Thu, 13 Oct 2022 17:29:03 -0400 Subject: [PATCH 3/5] Use GPG functions instead of constants internally Signed-off-by: Zachary Newman --- securesystemslib/gpg/common.py | 4 ++-- securesystemslib/gpg/functions.py | 16 ++++++++-------- securesystemslib/gpg/util.py | 6 +++--- securesystemslib/signer.py | 2 +- tests/check_gpg_available.py | 2 +- tests/check_public_interfaces_gpg.py | 4 ++-- tests/test_gpg.py | 20 ++++++++++---------- tests/test_signer.py | 4 ++-- 8 files changed, 29 insertions(+), 29 deletions(-) diff --git a/securesystemslib/gpg/common.py b/securesystemslib/gpg/common.py index 1269ae1a..72ef9592 100644 --- a/securesystemslib/gpg/common.py +++ b/securesystemslib/gpg/common.py @@ -147,7 +147,7 @@ def parse_pubkey_bundle(data): data: - Public key data as written to stdout by GPG_EXPORT_PUBKEY_COMMAND. + Public key data as written to stdout by gpg_export_pubkey_command. securesystemslib.gpg.exceptions.PacketParsingError @@ -500,7 +500,7 @@ def get_pubkey_bundle(data, keyid): data: Public key data as written to stdout by - securesystemslib.gpg.constants.GPG_EXPORT_PUBKEY_COMMAND. + securesystemslib.gpg.constants.gpg_export_pubkey_command. keyid: The keyid of the master key or one of its subkeys expected to be diff --git a/securesystemslib/gpg/functions.py b/securesystemslib/gpg/functions.py index 4c008880..7167fe0f 100644 --- a/securesystemslib/gpg/functions.py +++ b/securesystemslib/gpg/functions.py @@ -26,9 +26,9 @@ CommandError, KeyExpirationError) from securesystemslib.gpg.constants import ( FULLY_SUPPORTED_MIN_VERSION, - GPG_EXPORT_PUBKEY_COMMAND, - GPG_SIGN_COMMAND, - HAVE_GPG, + gpg_export_pubkey_command, + gpg_sign_command, + have_gpg, NO_GPG_MSG, SHA256) from securesystemslib.gpg.handlers import ( @@ -50,7 +50,7 @@ def create_signature(content, keyid=None, homedir=None): identified by the passed keyid from the gpg keyring at the passed homedir. The executed base command is defined in - securesystemslib.gpg.constants.GPG_SIGN_COMMAND. + securesystemslib.gpg.constants.gpg_sign_command. NOTE: On not fully supported versions of GPG, i.e. versions below securesystemslib.gpg.constants.FULLY_SUPPORTED_MIN_VERSION the returned @@ -99,7 +99,7 @@ def create_signature(content, keyid=None, homedir=None): securesystemslib.formats.GPG_SIGNATURE_SCHEMA. """ - if not HAVE_GPG: # pragma: no cover + if not have_gpg(): # pragma: no cover raise exceptions.UnsupportedLibraryError(NO_GPG_MSG) if not CRYPTO: # pragma: no cover @@ -114,7 +114,7 @@ def create_signature(content, keyid=None, homedir=None): if homedir: homearg = "--homedir {}".format(homedir).replace("\\", "/") - command = GPG_SIGN_COMMAND.format(keyarg=keyarg, homearg=homearg) + command = gpg_sign_command(keyarg=keyarg, homearg=homearg) gpg_process = process.run(command, input=content, check=False, stdout=process.PIPE, stderr=process.PIPE) @@ -258,7 +258,7 @@ def export_pubkey(keyid, homedir=None): An OpenPGP public key object in GPG_PUBKEY_SCHEMA format. """ - if not HAVE_GPG: # pragma: no cover + if not have_gpg(): # pragma: no cover raise exceptions.UnsupportedLibraryError(NO_GPG_MSG) if not CRYPTO: # pragma: no cover @@ -276,7 +276,7 @@ def export_pubkey(keyid, homedir=None): # TODO: Consider adopting command error handling from `create_signature` # above, e.g. in a common 'run gpg command' utility function - command = GPG_EXPORT_PUBKEY_COMMAND.format(keyid=keyid, homearg=homearg) + command = gpg_export_pubkey_command(keyid=keyid, homearg=homearg) gpg_process = process.run(command, stdout=process.PIPE, stderr=process.PIPE) key_packet = gpg_process.stdout diff --git a/securesystemslib/gpg/util.py b/securesystemslib/gpg/util.py index 35100efe..d09719f1 100644 --- a/securesystemslib/gpg/util.py +++ b/securesystemslib/gpg/util.py @@ -348,20 +348,20 @@ def get_version() -> Version: Uses `gpg2 --version` to get the version info of the installed gpg2 and extracts and returns the version number. - The executed base command is defined in constants.GPG_VERSION_COMMAND. + The executed base command is defined in constants.gpg_version_command. securesystemslib.exceptions.UnsupportedLibraryError: If the gpg command is not available - Executes a command: constants.GPG_VERSION_COMMAND. + Executes a command: constants.gpg_version_command. Version of GPG. """ - if not constants.HAVE_GPG: # pragma: no cover + if not constants.have_gpg(): # pragma: no cover raise exceptions.UnsupportedLibraryError(constants.NO_GPG_MSG) command = constants.gpg_version_command() diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py index 5032e013..c595eb3f 100644 --- a/securesystemslib/signer.py +++ b/securesystemslib/signer.py @@ -237,7 +237,7 @@ def sign(self, payload: bytes) -> GPGSignature: homedir. The executed base command is defined in - securesystemslib.gpg.constants.GPG_SIGN_COMMAND. + securesystemslib.gpg.constants.gpg_sign_command. Arguments: payload: The bytes to be signed. diff --git a/tests/check_gpg_available.py b/tests/check_gpg_available.py index 325b4d39..4a38f2d8 100644 --- a/tests/check_gpg_available.py +++ b/tests/check_gpg_available.py @@ -36,7 +36,7 @@ class TestGpgAvailable(unittest.TestCase): def test_gpg_available(self): """Test that GPG is available.""" - self.assertTrue(securesystemslib.gpg.constants.HAVE_GPG) + self.assertTrue(securesystemslib.gpg.constants.have_gpg()) if __name__ == "__main__": unittest.main(verbosity=1, buffer=True) diff --git a/tests/check_public_interfaces_gpg.py b/tests/check_public_interfaces_gpg.py index 595640aa..9c2eb664 100644 --- a/tests/check_public_interfaces_gpg.py +++ b/tests/check_public_interfaces_gpg.py @@ -25,7 +25,7 @@ """ import unittest -from securesystemslib.gpg.constants import HAVE_GPG, NO_GPG_MSG +from securesystemslib.gpg.constants import have_gpg, NO_GPG_MSG from securesystemslib.gpg.util import get_version from securesystemslib.gpg.functions import ( create_signature, export_pubkey, export_pubkeys, verify_signature) @@ -36,7 +36,7 @@ class TestPublicInterfacesGPG(unittest.TestCase): @classmethod def setUpClass(cls): - assert not HAVE_GPG, \ + assert not have_gpg(), \ "please remove GnuPG from your environment to run this test case" def test_gpg_functions(self): diff --git a/tests/test_gpg.py b/tests/test_gpg.py index 9a95b5ba..81aed1b1 100644 --- a/tests/test_gpg.py +++ b/tests/test_gpg.py @@ -51,8 +51,8 @@ parse_pubkey_bundle, get_pubkey_bundle, _assign_certified_key_info, _get_verified_subkeys, parse_signature_packet) from securesystemslib.gpg.constants import (SHA1, SHA256, SHA512, - GPG_EXPORT_PUBKEY_COMMAND, PACKET_TYPE_PRIMARY_KEY, PACKET_TYPE_USER_ID, - PACKET_TYPE_USER_ATTR, PACKET_TYPE_SUB_KEY, HAVE_GPG) + gpg_export_pubkey_command, PACKET_TYPE_PRIMARY_KEY, PACKET_TYPE_USER_ID, + PACKET_TYPE_USER_ATTR, PACKET_TYPE_SUB_KEY, have_gpg) from securesystemslib.gpg.exceptions import (PacketParsingError, PacketVersionNotSupportedError, SignatureAlgorithmNotSupportedError, KeyNotFoundError, CommandError, KeyExpirationError) @@ -71,7 +71,7 @@ def ignore_not_found_error(function, path, exc_info): raise error -@unittest.skipIf(not HAVE_GPG, "gpg not found") +@unittest.skipIf(not have_gpg(), "gpg not found") class TestUtil(unittest.TestCase): """Test util functions. """ @@ -82,7 +82,7 @@ def test_version_utils_return_types(self): def test_version_utils_error(self): """Run dummy tests for coverage. """ - with patch('securesystemslib.gpg.constants.HAVE_GPG', False): + with patch('securesystemslib.gpg.constants.have_gpg', return_value=False): with self.assertRaises(exceptions.UnsupportedLibraryError): get_version() @@ -191,7 +191,7 @@ def test_parse_subpacket_header(self): self.assertEqual(result, expected[idx]) -@unittest.skipIf(not HAVE_GPG, "gpg not found") +@unittest.skipIf(not have_gpg(), "gpg not found") class TestCommon(unittest.TestCase): """Test common functions of the securesystemslib.gpg module. """ @classmethod @@ -203,14 +203,14 @@ def setUpClass(self): # Load test raw public key bundle from rsa keyring, used to construct # erroneous gpg data in tests below. keyid = "F557D0FF451DEF45372591429EA70BD13D883381" - cmd = GPG_EXPORT_PUBKEY_COMMAND.format(keyid=keyid, homearg=homearg) + cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg) proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE) self.raw_key_data = proc.stdout self.raw_key_bundle = parse_pubkey_bundle(self.raw_key_data) # Export pubkey bundle with expired key for key expiration tests keyid = "E8AC80C924116DABB51D4B987CB07D6D2C199C7C" - cmd = GPG_EXPORT_PUBKEY_COMMAND.format(keyid=keyid, homearg=homearg) + cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg) proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE) self.raw_expired_key_bundle = parse_pubkey_bundle(proc.stdout) @@ -491,7 +491,7 @@ def test_parse_signature_packet_errors(self): "'{}' not in '{}'".format(expected_error_str, str(ctx.exception))) -@unittest.skipIf(not HAVE_GPG, "gpg not found") +@unittest.skipIf(not have_gpg(), "gpg not found") class TestGPGRSA(unittest.TestCase): """Test signature creation, verification and key export from the gpg module""" @@ -658,7 +658,7 @@ def test_verify_signature_with_expired_key(self): "\ngot: {}".format(expected, ctx.exception)) -@unittest.skipIf(not HAVE_GPG, "gpg not found") +@unittest.skipIf(not have_gpg(), "gpg not found") class TestGPGDSA(unittest.TestCase): """ Test signature creation, verification and key export from the gpg module """ @@ -743,7 +743,7 @@ def test_gpg_sign_and_verify_object(self): -@unittest.skipIf(not HAVE_GPG, "gpg not found") +@unittest.skipIf(not have_gpg(), "gpg not found") class TestGPGEdDSA(unittest.TestCase): """ Test signature creation, verification and key export from the gpg module """ diff --git a/tests/test_signer.py b/tests/test_signer.py index 8d48fbb1..f2018ede 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -12,7 +12,7 @@ import securesystemslib.keys as KEYS from securesystemslib.exceptions import FormatError, UnsupportedAlgorithmError from securesystemslib.signer import GPGSignature, Signature, SSlibSigner, GPGSigner -from securesystemslib.gpg.constants import HAVE_GPG +from securesystemslib.gpg.constants import have_gpg from securesystemslib.gpg.functions import export_pubkey, verify_signature as verify_sig @@ -95,7 +95,7 @@ def test_signature_eq_(self): self.assertNotEqual(sig_obj, sig_obj_2) -@unittest.skipIf(not HAVE_GPG, "gpg not found") +@unittest.skipIf(not have_gpg(), "gpg not found") class TestGPGRSA(unittest.TestCase): """Test RSA gpg signature creation and verification.""" From c879125bbb325f3e0f1ddb91ca13b30fce451500 Mon Sep 17 00:00:00 2001 From: Zachary Newman Date: Thu, 13 Oct 2022 18:04:01 -0400 Subject: [PATCH 4/5] Get rid of GPG constants. After the previous commit, they're not used *inside* securesystemslib. Outside, we can check [sourcegraph]. This will break a few folks, notably [in-toto] (just tests) and [DataDog/integrations-core]. But it's an easy enough fix. [sourcegraph]: https://sourcegraph.com/search?q=context:global+lang:python+securesystemslib+AND+%28HAVE_GPG+OR+GPG_COMMAND+OR+GPG_VERSION_COMMAND+OR+GPG_SIGN_COMMAND+OR+GPG_EXPORT_PUBKEY_COMMAND%29&patternType=standard [in-toto]: https://github.com/in-toto/in-toto/blob/c1a5e6b7468ccc74d7e79bc8bec2caf96ddb31f1/tests/test_verifylib.py#L58 [DataDog/integrations-core]: https://github.com/DataDog/integrations-core/blob/0e20752603f0d43db44090c8777d4ea69ca7111a/datadog_checks_dev/datadog_checks/dev/tooling/signing.py#L13 Signed-off-by: Zachary Newman --- securesystemslib/gpg/_lazy.py | 53 ------------------------------- securesystemslib/gpg/constants.py | 18 ----------- 2 files changed, 71 deletions(-) delete mode 100644 securesystemslib/gpg/_lazy.py diff --git a/securesystemslib/gpg/_lazy.py b/securesystemslib/gpg/_lazy.py deleted file mode 100644 index 8b95d0fb..00000000 --- a/securesystemslib/gpg/_lazy.py +++ /dev/null @@ -1,53 +0,0 @@ -""" - - _lazy.py - - - Zachary Newman - - - Oct 13, 2022 - - - See LICENSE for licensing information. - - - helpers for backwards compatibility when replacing constants with functions. - - AKA crimes against Python -""" -from typing import Callable, TypeVar, Type, Any -import inspect - -T = TypeVar('T') - -def wrap_thunk(thunk: Callable[[], T]): - """Wraps ``thunk`` in an object that acts like the result of calling it.""" - - called = False - value = None - - def lazy(): - nonlocal called, value - if not called: - value = thunk() - called = True - return value - - superclass: Type[Any] = inspect.signature(thunk).return_annotation - if superclass in (bool,): - superclass = object - - class Wrapper(superclass): # type: ignore - """Object passing through to the result of a lazily-called thunk.""" - - def __getattribute__(self, attr): - return getattr(lazy(), attr) - - def __str__(self): - return str(lazy()) - - def __bool__(self): - return bool(lazy()) - - return Wrapper() diff --git a/securesystemslib/gpg/constants.py b/securesystemslib/gpg/constants.py index 3d8b54c9..c5ceacdf 100644 --- a/securesystemslib/gpg/constants.py +++ b/securesystemslib/gpg/constants.py @@ -19,8 +19,6 @@ import logging import os -import securesystemslib.gpg._lazy as lazy - from securesystemslib import process log = logging.getLogger(__name__) @@ -55,23 +53,14 @@ def gpg_command() -> str: return GPG1_COMMAND return "" -GPG_COMMAND = lazy.wrap_thunk(gpg_command) - - def have_gpg() -> bool: """Returns True if a gpg_command is available.""" return bool(gpg_command()) -HAVE_GPG = lazy.wrap_thunk(have_gpg) - - def gpg_version_command() -> str: """Returns the command to get the current GPG version.""" return f"{gpg_command()} --version" -GPG_VERSION_COMMAND = lazy.wrap_thunk(gpg_version_command) - - FULLY_SUPPORTED_MIN_VERSION = "2.1.0" NO_GPG_MSG = ( f"GPG support requires a GPG client. 'gpg2' or 'gpg' with version " @@ -82,17 +71,10 @@ def gpg_sign_command(keyarg: str, homearg: str) -> str: """Returns the command to use GPG to sign STDIN.""" return f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}" -GPG_SIGN_COMMAND = lazy.wrap_thunk(lambda: gpg_sign_command("{keyarg}", "{homearg}")) - - def gpg_export_pubkey_command(homearg: str, keyid: str): """Returns the GPG command to export a public key.""" return f"{gpg_command()} {homearg} --export {keyid}" -GPG_EXPORT_PUBKEY_COMMAND = lazy.wrap_thunk( - lambda: gpg_export_pubkey_command("{homearg}", "{keyid}") -) - # See RFC4880 section 4.3. Packet Tags for a list of all packet types The # relevant packets defined below are described in sections 5.2 (signature), # 5.5.1.1 (primary pubkey) and 5.5.1.2 (pub subkey), 5.12 (user id) and 5.13 From a5c00875832648185c2c3f604629f3074a647fdb Mon Sep 17 00:00:00 2001 From: Zachary Newman Date: Thu, 13 Oct 2022 18:53:25 -0400 Subject: [PATCH 5/5] Catch timeouts when checking for GPG availability. Fixes #428. Okay to do this because we will fail tests if GPG is unexpectedly unavailable (#434). Signed-off-by: Zachary Newman --- securesystemslib/gpg/constants.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/securesystemslib/gpg/constants.py b/securesystemslib/gpg/constants.py index c5ceacdf..c5afcba6 100644 --- a/securesystemslib/gpg/constants.py +++ b/securesystemslib/gpg/constants.py @@ -18,6 +18,7 @@ import functools import logging import os +import subprocess from securesystemslib import process @@ -30,7 +31,7 @@ def is_available_gnupg(gnupg: str) -> bool: try: process.run(gpg_version_cmd, stdout=process.PIPE, stderr=process.PIPE) return True - except OSError: + except (OSError, subprocess.TimeoutExpired): return False