Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't shell out to GPG on import #437

Merged
merged 5 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
warn_unused_configs = True
files =
securesystemslib/util.py,
securesystemslib/storage.py
securesystemslib/storage.py,
securesystemslib/gpg/constants.py

# Supress error messages until enough modules
# are type annotated
Expand Down
4 changes: 2 additions & 2 deletions securesystemslib/gpg/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def parse_pubkey_bundle(data):

<Arguments>
data:
Public key data as written to stdout by GPG_EXPORT_PUBKEY_COMMAND.
Public key data as written to stdout by gpg_export_pubkey_command.

<Exceptions>
securesystemslib.gpg.exceptions.PacketParsingError
Expand Down Expand Up @@ -500,7 +500,7 @@ def get_pubkey_bundle(data, keyid):
<Arguments>
data:
Public key data as written to stdout by
securesystemslib.gpg.constants.GPG_EXPORT_PUBKEY_COMMAND.
securesystemslib.gpg.constants.gpg_export_pubkey_command.

keyid:
The keyid of the master key or one of its subkeys expected to be
Expand Down
69 changes: 41 additions & 28 deletions securesystemslib/gpg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,66 @@
aggregates all the constant definitions and lookup structures for signature
handling
"""
import functools
import logging
import os
import subprocess

from securesystemslib import process

log = logging.getLogger(__name__)


def is_available_gnupg(gnupg):
@functools.lru_cache(maxsize=3)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask why maxsize=3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to put a maxsize for py3.7, which is in the tox matrix. Otherwise would use the default.

I picked 3 because we should only call it three times under normal circumstances (gpg, gpg2, $GNUPG). That's perhaps a little bit too clever but I had a hard time justifying other numbers.

def is_available_gnupg(gnupg: str) -> bool:
"""Returns whether gnupg points to a gpg binary."""
gpg_version_cmd = gnupg + " --version"
try:
process.run(gpg_version_cmd, stdout=process.PIPE, stderr=process.PIPE)
return True
except OSError:
except (OSError, subprocess.TimeoutExpired):
return False


GPG_COMMAND = ""
HAVE_GPG = False

GPG_ENV_COMMAND = os.environ.get('GNUPG')
GPG2_COMMAND = "gpg2"
GPG1_COMMAND = "gpg"

# By default, we allow providing GPG client through the environment
# assuming gpg2 as default value and test if exists. Otherwise, we assume gpg
# exists.
if GPG_ENV_COMMAND:
if is_available_gnupg(GPG_ENV_COMMAND):
GPG_COMMAND = GPG_ENV_COMMAND
elif is_available_gnupg(GPG2_COMMAND):
GPG_COMMAND = GPG2_COMMAND
elif is_available_gnupg(GPG1_COMMAND):
GPG_COMMAND = GPG1_COMMAND

if GPG_COMMAND:
# Use bool to skip tests or fail early and gracefully if no gpg is available
HAVE_GPG = True

GPG_VERSION_COMMAND = GPG_COMMAND + " --version"
FULLY_SUPPORTED_MIN_VERSION = "2.1.0"
NO_GPG_MSG = "GPG support requires a GPG client. 'gpg2' or 'gpg' with version {} or newer is" \
" fully supported.".format(FULLY_SUPPORTED_MIN_VERSION)

GPG_SIGN_COMMAND = GPG_COMMAND + \
" --detach-sign --digest-algo SHA256 {keyarg} {homearg}"
GPG_EXPORT_PUBKEY_COMMAND = GPG_COMMAND + " {homearg} --export {keyid}"
def gpg_command() -> str:
"""Returns command to run GPG, or ``""``` if not found)."""
# By default, we allow providing GPG client through the environment
# assuming gpg2 as default value and test if exists. Otherwise, we assume gpg
# exists.
if GPG_ENV_COMMAND:
if is_available_gnupg(GPG_ENV_COMMAND):
return GPG_ENV_COMMAND
elif is_available_gnupg(GPG2_COMMAND):
return GPG2_COMMAND
elif is_available_gnupg(GPG1_COMMAND):
return GPG1_COMMAND
return ""

def have_gpg() -> bool:
"""Returns True if a gpg_command is available."""
return bool(gpg_command())

def gpg_version_command() -> str:
"""Returns the command to get the current GPG version."""
return f"{gpg_command()} --version"

FULLY_SUPPORTED_MIN_VERSION = "2.1.0"
NO_GPG_MSG = (
f"GPG support requires a GPG client. 'gpg2' or 'gpg' with version "
f"{FULLY_SUPPORTED_MIN_VERSION} or newer is fully supported."
)

def gpg_sign_command(keyarg: str, homearg: str) -> str:
"""Returns the command to use GPG to sign STDIN."""
return f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}"

def gpg_export_pubkey_command(homearg: str, keyid: str):
"""Returns the GPG command to export a public key."""
return f"{gpg_command()} {homearg} --export {keyid}"

# See RFC4880 section 4.3. Packet Tags for a list of all packet types The
# relevant packets defined below are described in sections 5.2 (signature),
Expand Down
16 changes: 8 additions & 8 deletions securesystemslib/gpg/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
CommandError, KeyExpirationError)
from securesystemslib.gpg.constants import (
FULLY_SUPPORTED_MIN_VERSION,
GPG_EXPORT_PUBKEY_COMMAND,
GPG_SIGN_COMMAND,
HAVE_GPG,
gpg_export_pubkey_command,
gpg_sign_command,
have_gpg,
NO_GPG_MSG,
SHA256)
from securesystemslib.gpg.handlers import (
Expand All @@ -50,7 +50,7 @@ def create_signature(content, keyid=None, homedir=None):
identified by the passed keyid from the gpg keyring at the passed homedir.

The executed base command is defined in
securesystemslib.gpg.constants.GPG_SIGN_COMMAND.
securesystemslib.gpg.constants.gpg_sign_command.

NOTE: On not fully supported versions of GPG, i.e. versions below
securesystemslib.gpg.constants.FULLY_SUPPORTED_MIN_VERSION the returned
Expand Down Expand Up @@ -99,7 +99,7 @@ def create_signature(content, keyid=None, homedir=None):
securesystemslib.formats.GPG_SIGNATURE_SCHEMA.

"""
if not HAVE_GPG: # pragma: no cover
if not have_gpg(): # pragma: no cover
raise exceptions.UnsupportedLibraryError(NO_GPG_MSG)

if not CRYPTO: # pragma: no cover
Expand All @@ -114,7 +114,7 @@ def create_signature(content, keyid=None, homedir=None):
if homedir:
homearg = "--homedir {}".format(homedir).replace("\\", "/")

command = GPG_SIGN_COMMAND.format(keyarg=keyarg, homearg=homearg)
command = gpg_sign_command(keyarg=keyarg, homearg=homearg)

gpg_process = process.run(command, input=content, check=False,
stdout=process.PIPE, stderr=process.PIPE)
Expand Down Expand Up @@ -258,7 +258,7 @@ def export_pubkey(keyid, homedir=None):
An OpenPGP public key object in GPG_PUBKEY_SCHEMA format.

"""
if not HAVE_GPG: # pragma: no cover
if not have_gpg(): # pragma: no cover
raise exceptions.UnsupportedLibraryError(NO_GPG_MSG)

if not CRYPTO: # pragma: no cover
Expand All @@ -276,7 +276,7 @@ def export_pubkey(keyid, homedir=None):

# TODO: Consider adopting command error handling from `create_signature`
# above, e.g. in a common 'run gpg command' utility function
command = GPG_EXPORT_PUBKEY_COMMAND.format(keyid=keyid, homearg=homearg)
command = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
gpg_process = process.run(command, stdout=process.PIPE, stderr=process.PIPE)

key_packet = gpg_process.stdout
Expand Down
8 changes: 4 additions & 4 deletions securesystemslib/gpg/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,23 +348,23 @@ def get_version() -> Version:
Uses `gpg2 --version` to get the version info of the installed gpg2
and extracts and returns the version number.

The executed base command is defined in constants.GPG_VERSION_COMMAND.
The executed base command is defined in constants.gpg_version_command.

<Exceptions>
securesystemslib.exceptions.UnsupportedLibraryError:
If the gpg command is not available

<Side Effects>
Executes a command: constants.GPG_VERSION_COMMAND.
Executes a command: constants.gpg_version_command.

<Returns>
Version of GPG.

"""
if not constants.HAVE_GPG: # pragma: no cover
if not constants.have_gpg(): # pragma: no cover
raise exceptions.UnsupportedLibraryError(constants.NO_GPG_MSG)

command = constants.GPG_VERSION_COMMAND
command = constants.gpg_version_command()
gpg_process = process.run(command, stdout=process.PIPE,
stderr=process.PIPE, universal_newlines=True)

Expand Down
2 changes: 1 addition & 1 deletion securesystemslib/signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def sign(self, payload: bytes) -> GPGSignature:
homedir.

The executed base command is defined in
securesystemslib.gpg.constants.GPG_SIGN_COMMAND.
securesystemslib.gpg.constants.gpg_sign_command.

Arguments:
payload: The bytes to be signed.
Expand Down
2 changes: 1 addition & 1 deletion tests/check_gpg_available.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TestGpgAvailable(unittest.TestCase):

def test_gpg_available(self):
"""Test that GPG is available."""
self.assertTrue(securesystemslib.gpg.constants.HAVE_GPG)
self.assertTrue(securesystemslib.gpg.constants.have_gpg())

if __name__ == "__main__":
unittest.main(verbosity=1, buffer=True)
4 changes: 2 additions & 2 deletions tests/check_public_interfaces_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"""

import unittest
from securesystemslib.gpg.constants import HAVE_GPG, NO_GPG_MSG
from securesystemslib.gpg.constants import have_gpg, NO_GPG_MSG
from securesystemslib.gpg.util import get_version
from securesystemslib.gpg.functions import (
create_signature, export_pubkey, export_pubkeys, verify_signature)
Expand All @@ -36,7 +36,7 @@
class TestPublicInterfacesGPG(unittest.TestCase):
@classmethod
def setUpClass(cls):
assert not HAVE_GPG, \
assert not have_gpg(), \
"please remove GnuPG from your environment to run this test case"

def test_gpg_functions(self):
Expand Down
24 changes: 12 additions & 12 deletions tests/test_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
parse_pubkey_bundle, get_pubkey_bundle, _assign_certified_key_info,
_get_verified_subkeys, parse_signature_packet)
from securesystemslib.gpg.constants import (SHA1, SHA256, SHA512,
GPG_EXPORT_PUBKEY_COMMAND, PACKET_TYPE_PRIMARY_KEY, PACKET_TYPE_USER_ID,
PACKET_TYPE_USER_ATTR, PACKET_TYPE_SUB_KEY, HAVE_GPG)
gpg_export_pubkey_command, PACKET_TYPE_PRIMARY_KEY, PACKET_TYPE_USER_ID,
PACKET_TYPE_USER_ATTR, PACKET_TYPE_SUB_KEY, have_gpg)
from securesystemslib.gpg.exceptions import (PacketParsingError,
PacketVersionNotSupportedError, SignatureAlgorithmNotSupportedError,
KeyNotFoundError, CommandError, KeyExpirationError)
Expand All @@ -71,7 +71,7 @@ def ignore_not_found_error(function, path, exc_info):
raise error


@unittest.skipIf(not HAVE_GPG, "gpg not found")
@unittest.skipIf(not have_gpg(), "gpg not found")
class TestUtil(unittest.TestCase):
"""Test util functions. """

Expand All @@ -80,11 +80,11 @@ def test_version_utils_return_types(self):
self.assertTrue(isinstance(get_version(), Version))
self.assertTrue(isinstance(is_version_fully_supported(), bool))

@patch('securesystemslib.gpg.constants.GPG_VERSION_COMMAND', 'echo "bad"')
def test_version_utils_error(self):
"""Run dummy tests for coverage. """
with self.assertRaises(exceptions.UnsupportedLibraryError):
get_version()
with patch('securesystemslib.gpg.constants.have_gpg', return_value=False):
with self.assertRaises(exceptions.UnsupportedLibraryError):
get_version()

def test_get_hashing_class(self):
# Assert return expected hashing class
Expand Down Expand Up @@ -191,7 +191,7 @@ def test_parse_subpacket_header(self):
self.assertEqual(result, expected[idx])


@unittest.skipIf(not HAVE_GPG, "gpg not found")
@unittest.skipIf(not have_gpg(), "gpg not found")
class TestCommon(unittest.TestCase):
"""Test common functions of the securesystemslib.gpg module. """
@classmethod
Expand All @@ -203,14 +203,14 @@ def setUpClass(self):
# Load test raw public key bundle from rsa keyring, used to construct
# erroneous gpg data in tests below.
keyid = "F557D0FF451DEF45372591429EA70BD13D883381"
cmd = GPG_EXPORT_PUBKEY_COMMAND.format(keyid=keyid, homearg=homearg)
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE)
self.raw_key_data = proc.stdout
self.raw_key_bundle = parse_pubkey_bundle(self.raw_key_data)

# Export pubkey bundle with expired key for key expiration tests
keyid = "E8AC80C924116DABB51D4B987CB07D6D2C199C7C"
cmd = GPG_EXPORT_PUBKEY_COMMAND.format(keyid=keyid, homearg=homearg)
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE)
self.raw_expired_key_bundle = parse_pubkey_bundle(proc.stdout)

Expand Down Expand Up @@ -491,7 +491,7 @@ def test_parse_signature_packet_errors(self):
"'{}' not in '{}'".format(expected_error_str, str(ctx.exception)))


@unittest.skipIf(not HAVE_GPG, "gpg not found")
@unittest.skipIf(not have_gpg(), "gpg not found")
class TestGPGRSA(unittest.TestCase):
"""Test signature creation, verification and key export from the gpg
module"""
Expand Down Expand Up @@ -658,7 +658,7 @@ def test_verify_signature_with_expired_key(self):
"\ngot: {}".format(expected, ctx.exception))


@unittest.skipIf(not HAVE_GPG, "gpg not found")
@unittest.skipIf(not have_gpg(), "gpg not found")
class TestGPGDSA(unittest.TestCase):
""" Test signature creation, verification and key export from the gpg
module """
Expand Down Expand Up @@ -743,7 +743,7 @@ def test_gpg_sign_and_verify_object(self):



@unittest.skipIf(not HAVE_GPG, "gpg not found")
@unittest.skipIf(not have_gpg(), "gpg not found")
class TestGPGEdDSA(unittest.TestCase):
""" Test signature creation, verification and key export from the gpg
module """
Expand Down
4 changes: 2 additions & 2 deletions tests/test_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import securesystemslib.keys as KEYS
from securesystemslib.exceptions import FormatError, UnsupportedAlgorithmError
from securesystemslib.signer import GPGSignature, Signature, SSlibSigner, GPGSigner
from securesystemslib.gpg.constants import HAVE_GPG
from securesystemslib.gpg.constants import have_gpg
from securesystemslib.gpg.functions import export_pubkey, verify_signature as verify_sig


Expand Down Expand Up @@ -95,7 +95,7 @@ def test_signature_eq_(self):
self.assertNotEqual(sig_obj, sig_obj_2)


@unittest.skipIf(not HAVE_GPG, "gpg not found")
@unittest.skipIf(not have_gpg(), "gpg not found")
class TestGPGRSA(unittest.TestCase):
"""Test RSA gpg signature creation and verification."""

Expand Down
10 changes: 9 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# and then run "tox" from this directory.

[tox]
envlist = mypy, py37, py38, py39, py310, purepy38, py38-no-gpg
envlist = mypy, py37, py38, py39, py310, purepy38, py38-no-gpg, py38-test-gpg-fails
skipsdist = True

[testenv]
Expand Down Expand Up @@ -33,6 +33,14 @@ setenv =
commands =
python -m tests.check_public_interfaces_gpg

# This checks that importing securesystemslib.gpg.constants doesn't shell out on
# import.
[testenv:py38-test-gpg-fails]
setenv =
GNUPG = false
commands =
python -c "import securesystemslib.gpg.constants"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test that imports securesystemslib.constants.gpg with a busted
$GNUPG to make sure that importing the library doesn't try to shell
out. This failed before this change.

This failed before occasionally (e.g. on timeout), right? I don't think we need this in addition to [testenv:py38-no-gpg].

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@znewman01, please remove unless necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It failed every time before, not occasionally.

Note that I set GNUPG = false in this environment, which will give an error. That's different from unset GNUPG, which will find no GPG.

I don't believe that py38-no-gpg tests the same thing. This test checks "if we shell out when importing gpg.constants we fail"; py38-no-gpg tests "if we have no GPG available, tests still pass."

It might be clearer to replace false with a script like touch /tmp/oops_i_ran and check that that file wasn't created after import but that felt a little too clever.

As to the question—is this necessary? I don't know; are any tests necessary? Without it, we're not really checking that the issue is fixed—I could revert the rest of the PR and the existing tests would still pass. Up to the maintainers whether you want a test for the fix or not.


[testenv:mypy]
commands =
mypy