Skip to content

Commit

Permalink
Merge pull request #526 from jku/hsm-identification
Browse files Browse the repository at this point in the history
Hsm identification
  • Loading branch information
jku authored Mar 7, 2023
2 parents 00d221c + 23fa243 commit 1695e45
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 28 deletions.
135 changes: 110 additions & 25 deletions securesystemslib/signer/_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""
import binascii
from contextlib import contextmanager
from typing import Optional, Tuple
from typing import Dict, Iterator, List, Optional, Tuple
from urllib import parse

from securesystemslib import KEY_TYPE_ECDSA
Expand Down Expand Up @@ -83,26 +83,35 @@ class HSMSigner(Signer):
Supports signing schemes "ecdsa-sha2-nistp256" and "ecdsa-sha2-nistp384".
HSMSigner uses the first token it finds, if multiple tokens are available. They can
be instantiated with Signer.from_priv_key_uri(). These private key URI schemes are
supported:
HSMSigners should be instantiated with Signer.from_priv_key_uri() as in the usage
example below.
The private key URI scheme is: "hsm:<KEYID>?<FILTERS>" where both KEYID and
FILTERS are optional. Example URIs:
* "hsm:":
Sign with key on PIV digital signature slot 9c.
Sign with a key with default keyid 2 (PIV digital signature slot 9c) on the
only token/smartcard available.
* "hsm:2?label=YubiKey+PIV+%2315835999":
Sign with key with keyid 2 (PIV slot 9c) on a token with label
"YubiKey+PIV+%2315835999"
Usage::
# sign with PIV slot 9c, verify with existing public key
# Store public key and URI for your HSM device for later use. By default
# slot 9c is selected.
uri, pubkey = HSMSigner.import_()
# later, use the uri and pubkey to sign
def pin_handler(secret: str) -> str:
return getpass(f"Enter {secret}: ")
signer = Signer.from_priv_key_uri("hsm:", public_key, pin_handler)
signer = Signer.from_priv_key_uri(uri, pubkey, pin_handler)
sig = signer.sign(b"DATA")
public_key.verify_signature(sig, b"DATA")
pubkey.verify_signature(sig, b"DATA")
Arguments:
hsm_keyid: Key identifier on the token.
token_filter: Dictionary of token field names and values
public_key: The related public key instance.
pin_handler: A function that returns the HSM user login pin, needed for
signing. It receives the string argument "pin".
Expand All @@ -120,7 +129,11 @@ def pin_handler(secret: str) -> str:
SECRETS_HANDLER_MSG = "pin"

def __init__(
self, hsm_keyid: int, public_key: Key, pin_handler: SecretsHandler
self,
hsm_keyid: int,
token_filter: Dict[str, str],
public_key: Key,
pin_handler: SecretsHandler,
):
if CRYPTO_IMPORT_ERROR:
raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)
Expand All @@ -133,22 +146,54 @@ def __init__(

self._mechanism = _MECHANISM_FOR_SCHEME[public_key.scheme]
self.hsm_keyid = hsm_keyid
self.token_filter = token_filter
self.public_key = public_key
self.pin_handler = pin_handler

@staticmethod
@contextmanager
def _default_session():
"""Context manager to handle default HSM session on reader slot 1."""
def _find_pkcs_slot(filters: Dict[str, str]) -> int:
"""Return the PKCS slot with initialized token that matches filter
Raises ValueError if more or less than 1 PKCS slot is found.
"""
lib = PYKCS11LIB()
slots = lib.getSlotList(tokenPresent=True)
if not slots:
raise ValueError("could not find token")
slots: List[int] = []
for slot in lib.getSlotList(tokenPresent=True):
tokeninfo = lib.getTokenInfo(slot)
if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED:
# useful for tests (softhsm always has an unitialized token)
continue

match = True
# all values in filters must match token fields
for key, value in filters.items():
tokenvalue: str = getattr(tokeninfo, key, "").strip()
if tokenvalue != value:
match = False

if match:
slots.append(slot)

if len(slots) != 1:
raise ValueError(
f"Found {len(slots)} cryptographic tokens matching filter {filters}"
)

session = lib.openSession(slots[0])
return slots[0]

@staticmethod
@contextmanager
def _get_session(filters: Dict[str, str]) -> Iterator["PyKCS11.Session"]:
"""Context manager to handle a HSM session.
The cryptographic token is selected by filtering by token info fields.
ValueError is raised if matching token is not found, or if more
than one are found.
"""
slot = HSMSigner._find_pkcs_slot(filters)
session = PYKCS11LIB().openSession(slot)
try:
yield session

finally:
session.closeSession()

Expand Down Expand Up @@ -192,22 +237,54 @@ def _find_key_values(
return ECDomainParameters.load(bytes(params)), bytes(point)

@classmethod
def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
def _build_token_filter(cls) -> Dict[str, str]:
"""Builds a token filter for the found cryptographic token.
The filter will include 'label' if one is found on token.
raises ValueError if less or more than 1 token is found
"""

lib = PYKCS11LIB()
slot = cls._find_pkcs_slot({})
tokeninfo = lib.getTokenInfo(slot)

filters = {}
# other possible fields include manufacturerID, model and serialNumber
for key in ["label"]:
try:
filters[key] = getattr(tokeninfo, key).strip()
except AttributeError:
pass

return filters

@classmethod
def import_(
cls,
hsm_keyid: Optional[int] = None,
token_filter: Optional[Dict[str, str]] = None,
) -> Tuple[str, SSlibKey]:
"""Import public key and signer details from HSM.
Either only one cryptographic token must be present when importing or a
token_filter that matches a single token must be provided.
Returns a private key URI (for Signer.from_priv_key_uri()) and a public
key. import_() should be called once and the returned URI and public
key should be stored for later use.
Arguments:
hsm_keyid: Key identifier on the token. Default is 2 (meaning PIV key slot 9c).
token_filter: Dictionary of token field names and values used to
filter the correct cryptographic token. If no filter is
provided one is built from the fields found on the token.
Raises:
UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto``
libraries not found.
ValueError: No compatible key for ``hsm_keyid`` found on HSM.
ValueError: A matching HSM device could not be found.
PyKCS11.PyKCS11Error: Various HSM communication errors.
"""
if CRYPTO_IMPORT_ERROR:
raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)
Expand All @@ -221,7 +298,12 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
if hsm_keyid is None:
hsm_keyid = cls.SCHEME_KEYID

with cls._default_session() as session:
if token_filter is None:
token_filter = cls._build_token_filter()

uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}"

with cls._get_session(token_filter) as session:
params, point = cls._find_key_values(session, hsm_keyid)

if params.chosen.native not in _CURVE_NAMES:
Expand All @@ -247,7 +329,7 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
keyid = _get_keyid(KEY_TYPE_ECDSA, scheme, keyval)
key = SSlibKey(keyid, KEY_TYPE_ECDSA, scheme, keyval)

return "hsm:", key
return uri, key

@classmethod
def from_priv_key_uri(
Expand All @@ -264,10 +346,13 @@ def from_priv_key_uri(
if uri.scheme != cls.SCHEME:
raise ValueError(f"HSMSigner does not support {priv_key_uri}")

keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID
token_filter = dict(parse.parse_qsl(uri.query))

if secrets_handler is None:
raise ValueError("HSMSigner requires a secrets handler")

return cls(cls.SCHEME_KEYID, public_key, secrets_handler)
return cls(keyid, token_filter, public_key, secrets_handler)

def sign(self, payload: bytes) -> Signature:
"""Signs payload with Hardware Security Module (HSM).
Expand All @@ -283,7 +368,7 @@ def sign(self, payload: bytes) -> Signature:
Signature.
"""
pin = self.pin_handler(self.SECRETS_HANDLER_MSG)
with self._default_session() as session:
with self._get_session(self.token_filter) as session:
session.login(pin)
key = self._find_key(
session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY
Expand Down
22 changes: 19 additions & 3 deletions tests/test_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ def setUpClass(cls):
slot = lib.getSlotList(tokenPresent=True)[0]
lib.initToken(slot, so_pin, token_label)

tokeninfo = lib.getTokenInfo(slot)
cls.token_filter = {"label": getattr(tokeninfo, "label")}

session = PYKCS11LIB().openSession(slot, PyKCS11.CKF_RW_SESSION)
session.login(so_pin, PyKCS11.CKU_SO)
session.initPin(cls.hsm_user_pin)
Expand All @@ -138,8 +141,10 @@ def test_hsm(self):
"""Test HSM key export and signing."""

for hsm_keyid in [self.hsm_keyid, self.hsm_keyid_default]:
_, key = HSMSigner.import_(hsm_keyid)
signer = HSMSigner(hsm_keyid, key, lambda sec: self.hsm_user_pin)
_, key = HSMSigner.import_(hsm_keyid, self.token_filter)
signer = HSMSigner(
hsm_keyid, self.token_filter, key, lambda sec: self.hsm_user_pin
)
sig = signer.sign(b"DATA")
key.verify_signature(sig, b"DATA")

Expand All @@ -149,7 +154,18 @@ def test_hsm(self):
def test_hsm_uri(self):
"""Test HSM default key export and signing from URI."""

uri, key = HSMSigner.import_(self.hsm_keyid_default)
# default import
uri, key = HSMSigner.import_()
signer = Signer.from_priv_key_uri(
uri, key, lambda sec: self.hsm_user_pin
)
sig = signer.sign(b"DATA")
key.verify_signature(sig, b"DATA")
with self.assertRaises(UnverifiedSignatureError):
key.verify_signature(sig, b"NOT DATA")

# Import with specified values
uri, key = HSMSigner.import_(self.hsm_keyid_default, self.token_filter)
signer = Signer.from_priv_key_uri(
uri, key, lambda sec: self.hsm_user_pin
)
Expand Down

0 comments on commit 1695e45

Please sign in to comment.