Skip to content

Commit

Permalink
Use stdlib subprocess.run for gpg processes
Browse files Browse the repository at this point in the history
This commit replaces calls to the custom process.run function, in
favor of the stdlib subprocess.run.

The former is a very thin wrapper around the latter, which was more
useful when we still needed Py2/3 case-handling. Which is not the
case anymore.

NOTE: This commit replicates the prior behavior, most notably the
following subprocess.run arguments:
- *popenargs: we still use shlex.split to turn the full command
              string into a list
- check

Signed-off-by: Lukas Puehringer <[email protected]>
  • Loading branch information
lukpueh committed Feb 9, 2023
1 parent fd3c71f commit 6199f46
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 28 deletions.
25 changes: 13 additions & 12 deletions securesystemslib/gpg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import functools
import logging
import os
import shlex
import subprocess # nosec

from securesystemslib import process
from typing import List

log = logging.getLogger(__name__)

Expand All @@ -30,13 +30,14 @@
@functools.lru_cache(maxsize=3)
def is_available_gnupg(gnupg: str, timeout=GPG_TIMEOUT) -> bool:
"""Returns whether gnupg points to a gpg binary."""
gpg_version_cmd = gnupg + " --version"
gpg_version_cmd = shlex.split(f"{gnupg} --version")
try:
process.run(
subprocess.run(
gpg_version_cmd,
stdout=process.PIPE,
stderr=process.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
check=True,
)
return True
except (OSError, subprocess.TimeoutExpired):
Expand Down Expand Up @@ -68,9 +69,9 @@ def have_gpg() -> bool:
return bool(gpg_command())


def gpg_version_command() -> str:
def gpg_version_command() -> List[str]:
"""Returns the command to get the current GPG version."""
return f"{gpg_command()} --version"
return shlex.split(f"{gpg_command()} --version")


FULLY_SUPPORTED_MIN_VERSION = "2.1.0"
Expand All @@ -80,16 +81,16 @@ def gpg_version_command() -> str:
)


def gpg_sign_command(keyarg: str, homearg: str) -> str:
def gpg_sign_command(keyarg: str, homearg: str) -> List[str]:
"""Returns the command to use GPG to sign STDIN."""
return (
return shlex.split(
f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}"
)


def gpg_export_pubkey_command(homearg: str, keyid: str):
def gpg_export_pubkey_command(homearg: str, keyid: str) -> List[str]:
"""Returns the GPG command to export a public key."""
return f"{gpg_command()} {homearg} --export {keyid}"
return shlex.split(f"{gpg_command()} {homearg} --export {keyid}")


# See RFC4880 section 4.3. Packet Tags for a list of all packet types The
Expand Down
18 changes: 11 additions & 7 deletions securesystemslib/gpg/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
verifying signatures.
"""
import logging
import subprocess # nosec
import time

from securesystemslib import exceptions, formats, process
from securesystemslib import exceptions, formats
from securesystemslib.gpg.common import (
get_pubkey_bundle,
parse_signature_packet,
Expand Down Expand Up @@ -125,12 +126,12 @@ def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT):

command = gpg_sign_command(keyarg=keyarg, homearg=homearg)

gpg_process = process.run(
gpg_process = subprocess.run(
command,
input=content,
check=False,
stdout=process.PIPE,
stderr=process.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
)

Expand Down Expand Up @@ -313,9 +314,12 @@ def export_pubkey(keyid, homedir=None, timeout=GPG_TIMEOUT):
# TODO: Consider adopting command error handling from `create_signature`
# above, e.g. in a common 'run gpg command' utility function
command = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)

gpg_process = process.run(
command, stdout=process.PIPE, stderr=process.PIPE, timeout=timeout
gpg_process = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
check=True,
)

key_packet = gpg_process.stdout
Expand Down
10 changes: 6 additions & 4 deletions securesystemslib/gpg/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import re
import struct
import subprocess # nosec

CRYPTO = True
NO_CRYPTO_MSG = "gpg.utils requires the cryptography library"
Expand All @@ -29,7 +30,7 @@
CRYPTO = False

# pylint: disable=wrong-import-position
from securesystemslib import exceptions, process
from securesystemslib import exceptions
from securesystemslib.gpg import constants
from securesystemslib.gpg.exceptions import PacketParsingError

Expand Down Expand Up @@ -375,12 +376,13 @@ def get_version() -> Version:
raise exceptions.UnsupportedLibraryError(constants.NO_GPG_MSG)

command = constants.gpg_version_command()
gpg_process = process.run(
gpg_process = subprocess.run(
command,
stdout=process.PIPE,
stderr=process.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=constants.GPG_TIMEOUT,
check=True,
)

full_version_info = gpg_process.stdout
Expand Down
19 changes: 14 additions & 5 deletions tests/test_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import os
import shutil
import subprocess # nosec
import tempfile
import unittest

Expand All @@ -33,7 +34,7 @@
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization

from securesystemslib import exceptions, process
from securesystemslib import exceptions
from securesystemslib.formats import ANY_PUBKEY_DICT_SCHEMA, GPG_PUBKEY_SCHEMA
from securesystemslib.gpg.common import (
_assign_certified_key_info,
Expand Down Expand Up @@ -235,17 +236,25 @@ def setUpClass(self): # pylint: disable=bad-classmethod-argument
# erroneous gpg data in tests below.
keyid = "F557D0FF451DEF45372591429EA70BD13D883381"
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(
cmd, stdout=process.PIPE, stderr=process.PIPE, timeout=GPG_TIMEOUT
proc = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=GPG_TIMEOUT,
check=True,
)
self.raw_key_data = proc.stdout
self.raw_key_bundle = parse_pubkey_bundle(self.raw_key_data)

# Export pubkey bundle with expired key for key expiration tests
keyid = "E8AC80C924116DABB51D4B987CB07D6D2C199C7C"
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(
cmd, stdout=process.PIPE, stderr=process.PIPE, timeout=GPG_TIMEOUT
proc = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=GPG_TIMEOUT,
check=True,
)
self.raw_expired_key_bundle = parse_pubkey_bundle(proc.stdout)

Expand Down

0 comments on commit 6199f46

Please sign in to comment.