Skip to content

Commit

Permalink
KMSSigner: get pubkey dict as input
Browse files Browse the repository at this point in the history
This moves some complexity inside the Signer so caller does not need to handle
it. Unfortunately it brings some keydict format handling into KMSSigner code.
  • Loading branch information
jku committed Oct 26, 2022
1 parent 5b7d2de commit 2f3fc8f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 14 deletions.
46 changes: 35 additions & 11 deletions securesystemslib/kms_signer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""KMS Signer implementations"""

import logging
from typing import Any, Dict

import securesystemslib.hash as sslib_hash
from securesystemslib import exceptions
from securesystemslib import exceptions, formats
from securesystemslib.signer import Signature, Signer

logger = logging.getLogger(__name__)
Expand All @@ -24,23 +25,48 @@ class KMSSigner(Signer): # pylint: disable=W0223
"""

@staticmethod
def from_keyref(keyref: str, keyid: str, hash_algo: str):
def from_keyref(keyref: str, public_key: Dict[str, Any]):
"""Returns a concrete KMSSigner implementation
Args:
keyref: A KMS-specific URI that identifies the signing key
keyid: Used in the Signature objects returned by sign()
hash_algo: the hash algorithm to use
public_key: Public key metadata conforming to PUBLIC_KEY_SCHEMA
"""

formats.PUBLIC_KEY_SCHEMA.check_match(public_key)

if keyref.startswith(GCPSigner.KEYREF_SCHEME):
if not GCP:
raise exceptions.UnsupportedLibraryError(
f"google-cloud-kms library required for KMS key {keyref} for keyid {keyid}",
f"google-cloud-kms library required for KMS key {keyref}",
)
return GCPSigner(keyref, keyid, hash_algo)
return GCPSigner(keyref, public_key)

raise ValueError(f"Unrecognized keyref {keyref}")

@staticmethod
def _hash_algorithm_for_public_key(key: Dict[str, Any]) -> str:
"""Returns the payload hash algorithm for given public key dict"""

if key["keytype"] == "rsa":
# hash algorithm is encoded as last scheme portion
return key["scheme"].split("-")[-1]
if key["keytype"] in [
"ecdsa",
"ecdsa-sha2-nistp256",
"ecdsa-sha2-nistp384",
]:
# nistp256 uses sha-256, nistp384 uses sha-384
bits = key["scheme"].split("-nistp")[-1]
return f"sha{bits}"
if key["keytype"] == "ed25519" and key["scheme"] == "ed25519":
# specification default is sha-512
return "sha512"

raise exceptions.UnsupportedAlgorithmError(
f"Failed to select payload hash algorithm for key {key}"
)


class GCPSigner(KMSSigner):
"""Google Cloud KMS Signer
Expand All @@ -56,12 +82,10 @@ class GCPSigner(KMSSigner):

KEYREF_SCHEME = "gcpkms://"

def __init__(self, keyref: str, keyid, hash_algo: str):
# TODO instead of hash_algo and keyid, could just take the public key dict
# as input and figure out the algorithm here?
def __init__(self, keyref: str, public_key: Dict[str, Any]):
self.gcp_keyid = keyref[len(self.KEYREF_SCHEME) :]
self.hash_algo = hash_algo
self.keyid = keyid
self.hash_algo = self._hash_algorithm_for_public_key(public_key)
self.keyid = public_key["keyid"]
self.client = kms.KeyManagementServiceClient()

def sign(self, payload: bytes) -> Signature:
Expand Down
25 changes: 22 additions & 3 deletions test-signer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
from securesystemslib import keys
from securesystemslib.kms_signer import KMSSigner

data = "data".encode("utf-8")

pubkey = {
"keyid": "abcd",
"keytype": "ecdsa",
"scheme": "ecdsa-sha2-nistp256",
"keyval": {
"public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEDJchWswdXOBpMqXkekAzwuWjL+Hx\ncw2ZonDbixh/wTf1FkpxmT8Aq6/WN6NNXOW4Rw9Lua2aKLZo2ZeNrk2VLA==\n-----END PUBLIC KEY-----\n"
},
}

keyref = "gcpkms://projects/openssf/locations/global/keyRings/securesystemslib-test-keyring/cryptoKeys/securesystemslib-test-key/cryptoKeyVersions/1"

signer = KMSSigner.from_keyref(keyref, "abcd", "sha256")
sig = signer.sign("data".encode("utf-8"))
print(sig.to_dict())

signer = KMSSigner.from_keyref(keyref, pubkey)
sig = signer.sign(data)

if not keys.verify_signature(pubkey, sig.to_dict(), data):
raise RuntimeError(
f"Failed to verify signature by {pubkey.keyid} ({keyref}): sig was {sig.to_dict()}"
)
print("OK")

0 comments on commit 2f3fc8f

Please sign in to comment.