Skip to content

Commit

Permalink
Merge pull request #502 from lukpueh/gpg-subprocess-timeout
Browse files Browse the repository at this point in the history
Improve gpg subprocess timeout and stop using custom process module
  • Loading branch information
lukpueh authored Feb 10, 2023
2 parents 3982498 + ac413ba commit e5f2296
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 23 deletions.
29 changes: 18 additions & 11 deletions securesystemslib/gpg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@
import functools
import logging
import os
import shlex
import subprocess # nosec

from securesystemslib import process
from typing import List

log = logging.getLogger(__name__)

GPG_TIMEOUT = 10


@functools.lru_cache(maxsize=3)
def is_available_gnupg(gnupg: str) -> bool:
def is_available_gnupg(gnupg: str, timeout=GPG_TIMEOUT) -> bool:
"""Returns whether gnupg points to a gpg binary."""
gpg_version_cmd = gnupg + " --version"
gpg_version_cmd = shlex.split(f"{gnupg} --version")
try:
process.run(gpg_version_cmd, stdout=process.PIPE, stderr=process.PIPE)
subprocess.run( # nosec
gpg_version_cmd,
capture_output=True,
timeout=timeout,
check=True,
)
return True
except (OSError, subprocess.TimeoutExpired):
return False
Expand Down Expand Up @@ -61,9 +68,9 @@ def have_gpg() -> bool:
return bool(gpg_command())


def gpg_version_command() -> str:
def gpg_version_command() -> List[str]:
"""Returns the command to get the current GPG version."""
return f"{gpg_command()} --version"
return shlex.split(f"{gpg_command()} --version")


FULLY_SUPPORTED_MIN_VERSION = "2.1.0"
Expand All @@ -73,16 +80,16 @@ def gpg_version_command() -> str:
)


def gpg_sign_command(keyarg: str, homearg: str) -> str:
def gpg_sign_command(keyarg: str, homearg: str) -> List[str]:
"""Returns the command to use GPG to sign STDIN."""
return (
return shlex.split(
f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}"
)


def gpg_export_pubkey_command(homearg: str, keyid: str):
def gpg_export_pubkey_command(homearg: str, keyid: str) -> List[str]:
"""Returns the GPG command to export a public key."""
return f"{gpg_command()} {homearg} --export {keyid}"
return shlex.split(f"{gpg_command()} {homearg} --export {keyid}")


# See RFC4880 section 4.3. Packet Tags for a list of all packet types The
Expand Down
30 changes: 21 additions & 9 deletions securesystemslib/gpg/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
verifying signatures.
"""
import logging
import subprocess # nosec
import time

from securesystemslib import exceptions, formats, process
from securesystemslib import exceptions, formats
from securesystemslib.gpg.common import (
get_pubkey_bundle,
parse_signature_packet,
)
from securesystemslib.gpg.constants import (
FULLY_SUPPORTED_MIN_VERSION,
GPG_TIMEOUT,
NO_GPG_MSG,
SHA256,
gpg_export_pubkey_command,
Expand All @@ -40,7 +42,7 @@
NO_CRYPTO_MSG = "GPG support requires the cryptography library"


def create_signature(content, keyid=None, homedir=None):
def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT):
"""
<Purpose>
Calls the gpg command line utility to sign the passed content with the key
Expand All @@ -66,6 +68,9 @@ def create_signature(content, keyid=None, homedir=None):
homedir: (optional)
Path to the gpg keyring. If not passed the default keyring is used.
timeout (optional):
gpg command timeout in seconds. Default is 10.
<Exceptions>
securesystemslib.exceptions.FormatError:
If the keyid was passed and does not match
Expand Down Expand Up @@ -121,12 +126,12 @@ def create_signature(content, keyid=None, homedir=None):

command = gpg_sign_command(keyarg=keyarg, homearg=homearg)

gpg_process = process.run(
gpg_process = subprocess.run( # nosec
command,
input=content,
check=False,
stdout=process.PIPE,
stderr=process.PIPE,
capture_output=True,
timeout=timeout,
)

# TODO: It's suggested to take a look at `--status-fd` for proper error
Expand Down Expand Up @@ -261,13 +266,14 @@ def verify_signature(signature_object, pubkey_info, content):
)


def export_pubkey(keyid, homedir=None):
def export_pubkey(keyid, homedir=None, timeout=GPG_TIMEOUT):
"""Exports a public key from a GnuPG keyring.
Arguments:
keyid: An OpenPGP keyid in KEYID_SCHEMA format.
homedir (optional): A path to the GnuPG home directory. If not set the
default GnuPG home directory is used.
timeout (optional): gpg command timeout in seconds. Default is 10.
Raises:
ValueError: Keyid is not a string.
Expand Down Expand Up @@ -307,21 +313,27 @@ def export_pubkey(keyid, homedir=None):
# TODO: Consider adopting command error handling from `create_signature`
# above, e.g. in a common 'run gpg command' utility function
command = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
gpg_process = process.run(command, stdout=process.PIPE, stderr=process.PIPE)
gpg_process = subprocess.run( # nosec
command,
capture_output=True,
timeout=timeout,
check=True,
)

key_packet = gpg_process.stdout
key_bundle = get_pubkey_bundle(key_packet, keyid)

return key_bundle


def export_pubkeys(keyids, homedir=None):
def export_pubkeys(keyids, homedir=None, timeout=GPG_TIMEOUT):
"""Exports multiple public keys from a GnuPG keyring.
Arguments:
keyids: A list of OpenPGP keyids in KEYID_SCHEMA format.
homedir (optional): A path to the GnuPG home directory. If not set the
default GnuPG home directory is used.
timeout (optional): gpg command timeout in seconds. Default is 10.
Raises:
TypeError: Keyids is not iterable.
Expand All @@ -341,7 +353,7 @@ def export_pubkeys(keyids, homedir=None):
"""
public_key_dict = {}
for gpg_keyid in keyids:
public_key = export_pubkey(gpg_keyid, homedir=homedir)
public_key = export_pubkey(gpg_keyid, homedir=homedir, timeout=timeout)
keyid = public_key["keyid"]
public_key_dict[keyid] = public_key

Expand Down
17 changes: 14 additions & 3 deletions tests/test_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import os
import shutil
import subprocess # nosec
import tempfile
import unittest

Expand All @@ -33,7 +34,6 @@
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization

from securesystemslib import process
from securesystemslib.formats import ANY_PUBKEY_DICT_SCHEMA, GPG_PUBKEY_SCHEMA
from securesystemslib.gpg.common import (
_assign_certified_key_info,
Expand All @@ -44,6 +44,7 @@
parse_signature_packet,
)
from securesystemslib.gpg.constants import (
GPG_TIMEOUT,
PACKET_TYPE_PRIMARY_KEY,
PACKET_TYPE_SUB_KEY,
PACKET_TYPE_USER_ATTR,
Expand Down Expand Up @@ -218,14 +219,24 @@ def setUpClass(self): # pylint: disable=bad-classmethod-argument
# erroneous gpg data in tests below.
keyid = "F557D0FF451DEF45372591429EA70BD13D883381"
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE)
proc = subprocess.run(
cmd,
capture_output=True,
timeout=GPG_TIMEOUT,
check=True,
)
self.raw_key_data = proc.stdout
self.raw_key_bundle = parse_pubkey_bundle(self.raw_key_data)

# Export pubkey bundle with expired key for key expiration tests
keyid = "E8AC80C924116DABB51D4B987CB07D6D2C199C7C"
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE)
proc = subprocess.run(
cmd,
capture_output=True,
timeout=GPG_TIMEOUT,
check=True,
)
self.raw_expired_key_bundle = parse_pubkey_bundle(proc.stdout)

def test_parse_pubkey_payload_errors(self):
Expand Down

0 comments on commit e5f2296

Please sign in to comment.