Skip to content

Commit

Permalink
runlib: add signer kwarg to run and record methods
Browse files Browse the repository at this point in the history
Adds optional signer (Signer) arg to runlib's run/record functions, as
alternative way of signing resulting link metadata, instead of using
signing_key, gpg_keyid, or use_default_gpg.

Closes in-toto#532

**Addtional notes:**

In in_toto_record_stop, the public_key (Key) field of the signer is used
to verify the preliminary link metadata file. The field is not yet part
of the interface, although all signer implementations in secureystemslib
have it: secure-systems-lab/securesystemslib#605

The patch aims to be minimally invasive, and thus barely refactors any
of the existing signing argument handling in the relevant functions.
Although, it was tempting to simplify the code, it turned out harder
than thought, and therefor not worth the effort, given that these
arguments are bound to
deprecation.

This patch is part of a series of patches to prepare for the planned
removal of securesystemslib legacy interfaces, see
secure-systems-lab/securesystemslib#604 for details.

**TODO (follow-up)**
- add deprecation warning for legacy key args and legacy gpg formats
  (only show on API use, not on CLI use!)

- add Metadata.verify(public_key: Key), if necessary (maybe we can just
  do this in runlib/verifylib), and use in in_toto_record_stop instead
  of passing `signer.public_key.to_dict()` to `Metadata.verify_signature(...)`

- prepare for ssslib legacy interface removal in other parts of in_toto:
  - in-toto-run, in-toto-record (in-toto#533)
  - verifylib / in-toto-verify
  - in-toto-sign
  - Metadata API / docs
  - ...

Signed-off-by: Lukas Puehringer <[email protected]>
  • Loading branch information
lukpueh committed Oct 25, 2023
1 parent f27f0aa commit 1c23378
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 30 deletions.
119 changes: 89 additions & 30 deletions in_toto/runlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import securesystemslib.formats
import securesystemslib.gpg
import securesystemslib.hash
from securesystemslib.signer import Signature, SSlibSigner
from securesystemslib.signer import Key, Signature, Signer, SSlibSigner

import in_toto.exceptions
import in_toto.settings
Expand Down Expand Up @@ -401,6 +401,26 @@ def _check_match_signing_key(signing_key):
)


def _check_signer(signer):
if not isinstance(signer, Signer):
raise ValueError("signer must be a Signer instance")

if not (
hasattr(signer, "public_key") and isinstance(signer.public_key, Key)
):
# TODO: add `public_key` to `Signer` interface upstream
# see secure-systems-lab/securesystemslib#605
raise ValueError("only Signer instances with public key supported")


def _require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default):
if not any([signer, signing_key, gpg_keyid, gpg_use_default]):
raise ValueError(
"Pass either a signer, a signing key, a gpg keyid or set"
" gpg_use_default to True!"
)


def in_toto_run(
name,
material_list,
Expand All @@ -420,14 +440,15 @@ def in_toto_run(
metadata_directory=None,
use_dsse=False,
timeout=in_toto.settings.LINK_CMD_EXEC_TIMEOUT,
signer=None,
):
"""Performs a supply chain step or inspection generating link metadata.
Executes link_cmd_args, recording paths and hashes of files before and after
command execution (aka. artifacts) in a link metadata file. The metadata is
signed with the passed signing_key, a gpg key identified by its ID, or the
default gpg key. If multiple key arguments are passed, only one key is used
in above order of precedence. The resulting link file is written to
signed with the passed signer, signing_key, a gpg key identified by its ID, or
the default gpg key. If multiple key arguments are passed, only one key is
used in above order of precedence. The resulting link file is written to
``STEP-NAME.KEYID-PREFIX.link``. If no key argument is passed the link
metadata is neither signed nor written to disk.
Expand Down Expand Up @@ -484,9 +505,12 @@ def in_toto_run(
use_dsse (optional): A boolean indicating if DSSE should be used to
generate metadata.
timeout (optional): An integer indicating the max timeout in seconds
timeout (optional): An integer indicating the max timeout in seconds
for this command. Default is 10 seconds.
signer (optional): A securesystemslib Signer instance used to
sign the resulting link metadata.
Raises:
securesystemslib.exceptions.FormatError: Passed arguments are malformed.
Expand Down Expand Up @@ -524,8 +548,12 @@ def in_toto_run(
LOG.info("Running '%s'...", name)

# Check key formats to fail early
if signer:
_check_signer(signer)

if signing_key:
_check_match_signing_key(signing_key)

if gpg_keyid:
securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid)

Expand Down Expand Up @@ -593,8 +621,10 @@ def in_toto_run(
LOG.info("Generating link metadata using Metablock...")
link_metadata = Metablock(signed=link, compact_json=compact_json)

signer = None
if signing_key:
if signer:
LOG.info("Signing link metadata using passed signer...")

elif signing_key:
LOG.info("Signing link metadata using passed key...")
signer = SSlibSigner(signing_key)

Expand Down Expand Up @@ -635,12 +665,13 @@ def in_toto_record_start(
normalize_line_endings=False,
lstrip_paths=None,
use_dsse=False,
signer=None,
):
"""Generates preliminary link metadata.
Records paths and hashes of materials in a preliminary link metadata file.
The metadata is signed with the passed signing_key, a gpg key identified by
its ID, or the default gpg key. If multiple key arguments are passed, only
Records paths and hashes of materials in a preliminary link metadata file. The
metadata is signed with the passed signer, signing_key, a gpg key identified
by its ID, or the default gpg key. If multiple key arguments are passed, only
one key is used in above order of precedence. At least one key argument must
be passed. The resulting link file is written to
``.STEP-NAME.KEYID-PREFIX.link-unfinished``.
Expand Down Expand Up @@ -686,6 +717,9 @@ def in_toto_record_start(
use_dsse (optional): A boolean indicating if DSSE should be used to
generate metadata.
signer (optional): A securesystemslib Signer instance used to
sign the resulting link metadata.
Raises:
securesystemslib.exceptions.FormatError: Passed arguments are malformed.
Expand Down Expand Up @@ -715,20 +749,20 @@ def in_toto_record_start(
Writes preliminary link metadata file to disk.
"""
# pylint: disable=too-many-locals
# pylint: disable=too-many-locals,too-many-branches

LOG.info("Start recording '%s'...", step_name)

# Fail if there is no signing key arg at all
if not signing_key and not gpg_keyid and not gpg_use_default:
raise ValueError(
"Pass either a signing key, a gpg keyid or set"
" gpg_use_default to True!"
)
_require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default)

# Check key formats to fail early
if signer:
_check_signer(signer)

if signing_key:
_check_match_signing_key(signing_key)

if gpg_keyid:
securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid)

Expand Down Expand Up @@ -771,7 +805,10 @@ def in_toto_record_start(
LOG.info("Generating link metadata using Metablock...")
link_metadata = Metablock(signed=link)

if signing_key:
if signer:
LOG.info("Signing link metadata using passed signer...")

elif signing_key:
LOG.info("Signing link metadata using passed key...")
signer = SSlibSigner(signing_key)

Expand Down Expand Up @@ -810,16 +847,17 @@ def in_toto_record_stop(
command=None,
byproducts=None,
environment=None,
signer=None,
):
"""Finalizes preliminary link metadata generated with in_toto_record_start.
Loads preliminary link metadata file, verifies its signature, and records
paths and hashes as products, thus finalizing the link metadata. The metadata
is signed with the passed signing_key, a gpg key identified by its ID, or the
default gpg key. If multiple key arguments are passed, only one key is used
in above order of precedence. At least one key argument must be passed and it
must be the same as the one used to sign the preliminary link metadata file.
The resulting link file is written to ``STEP-NAME.KEYID-PREFIX.link``.
is signed with the passed signer, signing_key, a gpg key identified by its ID,
or the default gpg key. If multiple key arguments are passed, only one key is
used in above order of precedence. At least one key argument must be passed
and it must be the same as the one used to sign the preliminary link metadata
file. The resulting link file is written to ``STEP-NAME.KEYID-PREFIX.link``.
Use this function together with in_toto_record_start as an alternative to
in_toto_run, in order to provide evidence for supply chain steps that cannot
Expand Down Expand Up @@ -876,6 +914,9 @@ def in_toto_record_stop(
"workdir": "<CWD when executing link command>"
}
signer (optional): A securesystemslib Signer instance used to
sign the resulting link metadata.
Raises:
securesystemslib.exceptions.FormatError: Passed arguments are malformed.
Expand Down Expand Up @@ -913,14 +954,14 @@ def in_toto_record_stop(
LOG.info("Stop recording '%s'...", step_name)

# Check that we have something to sign and if the formats are right
if not signing_key and not gpg_keyid and not gpg_use_default:
raise ValueError(
"Pass either a signing key, a gpg keyid or set"
" gpg_use_default to True"
)
_require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default)

if signer:
_check_signer(signer)

if signing_key:
_check_match_signing_key(signing_key)

if gpg_keyid:
securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid)

Expand All @@ -935,7 +976,12 @@ def in_toto_record_stop(

# Load preliminary link file
# If we have a signing key we can use the keyid to construct the name
if signing_key:
if signer:
unfinished_fn = UNFINISHED_FILENAME_FORMAT.format(
step_name=step_name, keyid=signer.public_key.keyid
)

elif signing_key:
unfinished_fn = UNFINISHED_FILENAME_FORMAT.format(
step_name=step_name, keyid=signing_key["keyid"]
)
Expand Down Expand Up @@ -972,7 +1018,13 @@ def in_toto_record_stop(

# The file must have been signed by the same key
# If we have a signing_key we use it for verification as well
if signing_key:
if signer:
LOG.info("Verifying preliminary link signature using passed signer...")
keyid = signer.public_key.keyid
verification_key = signer.public_key.to_dict()
verification_key["keyid"] = keyid

elif signing_key:
LOG.info(
"Verifying preliminary link signature using passed signing key..."
)
Expand Down Expand Up @@ -1042,7 +1094,14 @@ def in_toto_record_stop(
LOG.info("Generating link metadata using DSSE...")
link_metadata = Envelope.from_signable(link)

if signing_key:
if signer:
LOG.info(
"Updating signature with signer '{:.8}...'...".format(
signer.public_key.keyid
)
)

elif signing_key:
LOG.info("Updating signature with key '{:.8}...'...".format(keyid))
signer = SSlibSigner(signing_key)

Expand Down
90 changes: 90 additions & 0 deletions tests/test_runlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
import securesystemslib.formats
from securesystemslib.interface import (
generate_and_write_unencrypted_rsa_keypair,
import_ed25519_privatekey_from_file,
import_ed25519_publickey_from_file,
import_rsa_privatekey_from_file,
import_rsa_publickey_from_file,
)
from securesystemslib.signer import CryptoSigner, Signer

import in_toto.exceptions
import in_toto.settings
Expand Down Expand Up @@ -1398,5 +1401,92 @@ def test_check(self):
)


class TestSigner(unittest.TestCase, TmpDirMixin):
"""Test signer argument in runlib API functions (run, record)."""

@classmethod
def setUpClass(cls):
cls.set_up_test_dir() # teardown is called implicitly
keys = Path(__file__).parent / "demo_files"

rsa = "alice"
rsa_priv = import_rsa_privatekey_from_file(str(keys / rsa))
cls.rsa_pub = import_rsa_publickey_from_file(str(keys / f"{rsa}.pub"))
cls.rsa_signer = CryptoSigner.from_securesystemslib_key(rsa_priv)

ed = "danny"
ed_priv = import_ed25519_privatekey_from_file(str(keys / ed))
cls.ed_pub = import_ed25519_publickey_from_file(str(keys / f"{ed}.pub"))
cls.ed_signer = CryptoSigner.from_securesystemslib_key(ed_priv)

def test_run(self):
# Successfully create, sign and verify link
link = in_toto_run("foo", [], [], [], signer=self.rsa_signer)
self.assertIsNone(link.verify_signature(self.rsa_pub))

# Fail with wrong verification key
with self.assertRaises(SignatureVerificationError):
link.verify_signature(self.ed_pub)

# Fail with bad signer arg
class NoSigner:
pass

with self.assertRaises(ValueError):
link = in_toto_run("foo", [], [], [], signer=NoSigner())

# Fail with incompatible signers
class BadSigner(Signer):
"""Signer implementation w/o public_key attribute.
secure-systems-lab/securesystemslib#605
"""

@classmethod
def from_priv_key_uri(
cls,
priv_key_uri,
public_key,
secrets_handler=None,
):
pass

def sign(self, payload):
pass

# Fail with missing public_key attribute.
bad_signer = BadSigner()
with self.assertRaises(ValueError):
link = in_toto_run("foo", [], [], [], signer=bad_signer)

class NoKey:
pass

# Fail with wrong tpe on public_key attribute
bad_signer.public_key = ( # pylint: disable=attribute-defined-outside-init
NoKey()
)
with self.assertRaises(ValueError):
link = in_toto_run("foo", [], [], [], signer=bad_signer)

def test_record(self):
# Successfully create, sign and verify link
in_toto_record_start("bar", [], signer=self.ed_signer)
in_toto_record_stop("bar", [], signer=self.ed_signer)
link_name = FILENAME_FORMAT.format(
step_name="bar", keyid=self.ed_signer.public_key.keyid
)
link = Metablock.load(link_name)
self.assertIsNone(link.verify_signature(self.ed_pub))

# Fail with wrong verification key
with self.assertRaises(SignatureVerificationError):
link.verify_signature(self.rsa_pub)

# Fail with different signer in start and stop
in_toto_record_start("baz", [], signer=self.ed_signer)
with self.assertRaises(OSError):
in_toto_record_stop("baz", [], signer=self.rsa_signer)


if __name__ == "__main__":
unittest.main()

0 comments on commit 1c23378

Please sign in to comment.