diff --git a/in_toto/runlib.py b/in_toto/runlib.py index e547aa72d..6f69e4247 100644 --- a/in_toto/runlib.py +++ b/in_toto/runlib.py @@ -41,7 +41,7 @@ import securesystemslib.formats import securesystemslib.gpg import securesystemslib.hash -from securesystemslib.signer import Signature, SSlibSigner +from securesystemslib.signer import Key, Signature, Signer, SSlibSigner import in_toto.exceptions import in_toto.settings @@ -401,6 +401,26 @@ def _check_match_signing_key(signing_key): ) +def _check_signer(signer): + if not isinstance(signer, Signer): + raise ValueError("signer must be a Signer instance") + + if not ( + hasattr(signer, "public_key") and isinstance(signer.public_key, Key) + ): + # TODO: add `public_key` to `Signer` interface upstream + # see secure-systems-lab/securesystemslib#605 + raise ValueError("only Signer instances with public key supported") + + +def _require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default): + if not any([signer, signing_key, gpg_keyid, gpg_use_default]): + raise ValueError( + "Pass either a signer, a signing key, a gpg keyid or set" + " gpg_use_default to True!" + ) + + def in_toto_run( name, material_list, @@ -420,14 +440,15 @@ def in_toto_run( metadata_directory=None, use_dsse=False, timeout=in_toto.settings.LINK_CMD_EXEC_TIMEOUT, + signer=None, ): """Performs a supply chain step or inspection generating link metadata. Executes link_cmd_args, recording paths and hashes of files before and after command execution (aka. artifacts) in a link metadata file. The metadata is - signed with the passed signing_key, a gpg key identified by its ID, or the - default gpg key. If multiple key arguments are passed, only one key is used - in above order of precedence. The resulting link file is written to + signed with the passed signer, signing_key, a gpg key identified by its ID, or + the default gpg key. If multiple key arguments are passed, only one key is + used in above order of precedence. The resulting link file is written to ``STEP-NAME.KEYID-PREFIX.link``. If no key argument is passed the link metadata is neither signed nor written to disk. @@ -484,9 +505,12 @@ def in_toto_run( use_dsse (optional): A boolean indicating if DSSE should be used to generate metadata. - timeout (optional): An integer indicating the max timeout in seconds + timeout (optional): An integer indicating the max timeout in seconds for this command. Default is 10 seconds. + signer (optional): A securesystemslib Signer instance used to + sign the resulting link metadata. + Raises: securesystemslib.exceptions.FormatError: Passed arguments are malformed. @@ -524,8 +548,12 @@ def in_toto_run( LOG.info("Running '%s'...", name) # Check key formats to fail early + if signer: + _check_signer(signer) + if signing_key: _check_match_signing_key(signing_key) + if gpg_keyid: securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid) @@ -593,8 +621,10 @@ def in_toto_run( LOG.info("Generating link metadata using Metablock...") link_metadata = Metablock(signed=link, compact_json=compact_json) - signer = None - if signing_key: + if signer: + LOG.info("Signing link metadata using passed signer...") + + elif signing_key: LOG.info("Signing link metadata using passed key...") signer = SSlibSigner(signing_key) @@ -635,12 +665,13 @@ def in_toto_record_start( normalize_line_endings=False, lstrip_paths=None, use_dsse=False, + signer=None, ): """Generates preliminary link metadata. - Records paths and hashes of materials in a preliminary link metadata file. - The metadata is signed with the passed signing_key, a gpg key identified by - its ID, or the default gpg key. If multiple key arguments are passed, only + Records paths and hashes of materials in a preliminary link metadata file. The + metadata is signed with the passed signer, signing_key, a gpg key identified + by its ID, or the default gpg key. If multiple key arguments are passed, only one key is used in above order of precedence. At least one key argument must be passed. The resulting link file is written to ``.STEP-NAME.KEYID-PREFIX.link-unfinished``. @@ -686,6 +717,9 @@ def in_toto_record_start( use_dsse (optional): A boolean indicating if DSSE should be used to generate metadata. + signer (optional): A securesystemslib Signer instance used to + sign the resulting link metadata. + Raises: securesystemslib.exceptions.FormatError: Passed arguments are malformed. @@ -715,20 +749,20 @@ def in_toto_record_start( Writes preliminary link metadata file to disk. """ - # pylint: disable=too-many-locals + # pylint: disable=too-many-locals,too-many-branches LOG.info("Start recording '%s'...", step_name) # Fail if there is no signing key arg at all - if not signing_key and not gpg_keyid and not gpg_use_default: - raise ValueError( - "Pass either a signing key, a gpg keyid or set" - " gpg_use_default to True!" - ) + _require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default) # Check key formats to fail early + if signer: + _check_signer(signer) + if signing_key: _check_match_signing_key(signing_key) + if gpg_keyid: securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid) @@ -771,7 +805,10 @@ def in_toto_record_start( LOG.info("Generating link metadata using Metablock...") link_metadata = Metablock(signed=link) - if signing_key: + if signer: + LOG.info("Signing link metadata using passed signer...") + + elif signing_key: LOG.info("Signing link metadata using passed key...") signer = SSlibSigner(signing_key) @@ -810,16 +847,17 @@ def in_toto_record_stop( command=None, byproducts=None, environment=None, + signer=None, ): """Finalizes preliminary link metadata generated with in_toto_record_start. Loads preliminary link metadata file, verifies its signature, and records paths and hashes as products, thus finalizing the link metadata. The metadata - is signed with the passed signing_key, a gpg key identified by its ID, or the - default gpg key. If multiple key arguments are passed, only one key is used - in above order of precedence. At least one key argument must be passed and it - must be the same as the one used to sign the preliminary link metadata file. - The resulting link file is written to ``STEP-NAME.KEYID-PREFIX.link``. + is signed with the passed signer, signing_key, a gpg key identified by its ID, + or the default gpg key. If multiple key arguments are passed, only one key is + used in above order of precedence. At least one key argument must be passed + and it must be the same as the one used to sign the preliminary link metadata + file. The resulting link file is written to ``STEP-NAME.KEYID-PREFIX.link``. Use this function together with in_toto_record_start as an alternative to in_toto_run, in order to provide evidence for supply chain steps that cannot @@ -876,6 +914,9 @@ def in_toto_record_stop( "workdir": "" } + signer (optional): A securesystemslib Signer instance used to + sign the resulting link metadata. + Raises: securesystemslib.exceptions.FormatError: Passed arguments are malformed. @@ -913,14 +954,14 @@ def in_toto_record_stop( LOG.info("Stop recording '%s'...", step_name) # Check that we have something to sign and if the formats are right - if not signing_key and not gpg_keyid and not gpg_use_default: - raise ValueError( - "Pass either a signing key, a gpg keyid or set" - " gpg_use_default to True" - ) + _require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default) + + if signer: + _check_signer(signer) if signing_key: _check_match_signing_key(signing_key) + if gpg_keyid: securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid) @@ -935,7 +976,12 @@ def in_toto_record_stop( # Load preliminary link file # If we have a signing key we can use the keyid to construct the name - if signing_key: + if signer: + unfinished_fn = UNFINISHED_FILENAME_FORMAT.format( + step_name=step_name, keyid=signer.public_key.keyid + ) + + elif signing_key: unfinished_fn = UNFINISHED_FILENAME_FORMAT.format( step_name=step_name, keyid=signing_key["keyid"] ) @@ -972,7 +1018,13 @@ def in_toto_record_stop( # The file must have been signed by the same key # If we have a signing_key we use it for verification as well - if signing_key: + if signer: + LOG.info("Verifying preliminary link signature using passed signer...") + keyid = signer.public_key.keyid + verification_key = signer.public_key.to_dict() + verification_key["keyid"] = keyid + + elif signing_key: LOG.info( "Verifying preliminary link signature using passed signing key..." ) @@ -1042,7 +1094,14 @@ def in_toto_record_stop( LOG.info("Generating link metadata using DSSE...") link_metadata = Envelope.from_signable(link) - if signing_key: + if signer: + LOG.info( + "Updating signature with signer '{:.8}...'...".format( + signer.public_key.keyid + ) + ) + + elif signing_key: LOG.info("Updating signature with key '{:.8}...'...".format(keyid)) signer = SSlibSigner(signing_key) diff --git a/tests/test_runlib.py b/tests/test_runlib.py index 4499988e1..ceb81dc23 100755 --- a/tests/test_runlib.py +++ b/tests/test_runlib.py @@ -36,9 +36,12 @@ import securesystemslib.formats from securesystemslib.interface import ( generate_and_write_unencrypted_rsa_keypair, + import_ed25519_privatekey_from_file, + import_ed25519_publickey_from_file, import_rsa_privatekey_from_file, import_rsa_publickey_from_file, ) +from securesystemslib.signer import CryptoSigner, Signer import in_toto.exceptions import in_toto.settings @@ -1398,5 +1401,92 @@ def test_check(self): ) +class TestSigner(unittest.TestCase, TmpDirMixin): + """Test signer argument in runlib API functions (run, record).""" + + @classmethod + def setUpClass(cls): + cls.set_up_test_dir() # teardown is called implicitly + keys = Path(__file__).parent / "demo_files" + + rsa = "alice" + rsa_priv = import_rsa_privatekey_from_file(str(keys / rsa)) + cls.rsa_pub = import_rsa_publickey_from_file(str(keys / f"{rsa}.pub")) + cls.rsa_signer = CryptoSigner.from_securesystemslib_key(rsa_priv) + + ed = "danny" + ed_priv = import_ed25519_privatekey_from_file(str(keys / ed)) + cls.ed_pub = import_ed25519_publickey_from_file(str(keys / f"{ed}.pub")) + cls.ed_signer = CryptoSigner.from_securesystemslib_key(ed_priv) + + def test_run(self): + # Successfully create, sign and verify link + link = in_toto_run("foo", [], [], [], signer=self.rsa_signer) + self.assertIsNone(link.verify_signature(self.rsa_pub)) + + # Fail with wrong verification key + with self.assertRaises(SignatureVerificationError): + link.verify_signature(self.ed_pub) + + # Fail with bad signer arg + class NoSigner: + pass + + with self.assertRaises(ValueError): + link = in_toto_run("foo", [], [], [], signer=NoSigner()) + + # Fail with incompatible signers + class BadSigner(Signer): + """Signer implementation w/o public_key attribute. + secure-systems-lab/securesystemslib#605 + """ + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri, + public_key, + secrets_handler=None, + ): + pass + + def sign(self, payload): + pass + + # Fail with missing public_key attribute. + bad_signer = BadSigner() + with self.assertRaises(ValueError): + link = in_toto_run("foo", [], [], [], signer=bad_signer) + + class NoKey: + pass + + # Fail with wrong tpe on public_key attribute + bad_signer.public_key = ( # pylint: disable=attribute-defined-outside-init + NoKey() + ) + with self.assertRaises(ValueError): + link = in_toto_run("foo", [], [], [], signer=bad_signer) + + def test_record(self): + # Successfully create, sign and verify link + in_toto_record_start("bar", [], signer=self.ed_signer) + in_toto_record_stop("bar", [], signer=self.ed_signer) + link_name = FILENAME_FORMAT.format( + step_name="bar", keyid=self.ed_signer.public_key.keyid + ) + link = Metablock.load(link_name) + self.assertIsNone(link.verify_signature(self.ed_pub)) + + # Fail with wrong verification key + with self.assertRaises(SignatureVerificationError): + link.verify_signature(self.rsa_pub) + + # Fail with different signer in start and stop + in_toto_record_start("baz", [], signer=self.ed_signer) + with self.assertRaises(OSError): + in_toto_record_stop("baz", [], signer=self.rsa_signer) + + if __name__ == "__main__": unittest.main()