Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hsm identification #526

Merged
merged 4 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 105 additions & 20 deletions securesystemslib/signer/_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""
import binascii
from contextlib import contextmanager
from typing import Optional, Tuple
from typing import Dict, Iterator, List, Optional, Tuple
from urllib import parse

from securesystemslib import KEY_TYPE_ECDSA
Expand Down Expand Up @@ -83,26 +83,34 @@ class HSMSigner(Signer):

Supports signing schemes "ecdsa-sha2-nistp256" and "ecdsa-sha2-nistp384".

HSMSigner uses the first token it finds, if multiple tokens are available. They can
be instantiated with Signer.from_priv_key_uri(). These private key URI schemes are
supported:
HSMSigners should be instantiated with Signer.from_priv_key_uri() as in the usage
example below.

The private key URI scheme is: "hsm:<KEYID>?<FILTERS>" where both KEYID and
FILTERS are optional. Example URIs:
* "hsm:":
Sign with key on PIV digital signature slot 9c.
Sign with a key with default keyid 2 (PIV digital signature slot 9c) on the
only token/smartcard available.
* "hsm:2?label=YubiKey+PIV+%2315835999":
Sign with key with keyid 2 (PIV slot 9c) on a token with label
"YubiKey+PIV+%2315835999"

Usage::
jku marked this conversation as resolved.
Show resolved Hide resolved
# Store public key and URI for your HSM device for later use. By default
# slot 9c is selected.
uri, pubkey = HSMSigner.import_()

# sign with PIV slot 9c, verify with existing public key
# later, use the uri and pubkey to sign
def pin_handler(secret: str) -> str:
return getpass(f"Enter {secret}: ")

signer = Signer.from_priv_key_uri("hsm:", public_key, pin_handler)
signer = Signer.from_priv_key_uri(uri, pubkey, pin_handler)
sig = signer.sign(b"DATA")

public_key.verify_signature(sig, b"DATA")
pubkey.verify_signature(sig, b"DATA")

Arguments:
hsm_keyid: Key identifier on the token.
token_filter: Dictionary of token field names and values
public_key: The related public key instance.
pin_handler: A function that returns the HSM user login pin, needed for
signing. It receives the string argument "pin".
Expand All @@ -120,7 +128,11 @@ def pin_handler(secret: str) -> str:
SECRETS_HANDLER_MSG = "pin"

def __init__(
self, hsm_keyid: int, public_key: Key, pin_handler: SecretsHandler
self,
hsm_keyid: int,
token_filter: Dict[str, str],
public_key: Key,
pin_handler: SecretsHandler,
):
if CRYPTO_IMPORT_ERROR:
raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)
Expand All @@ -133,17 +145,41 @@ def __init__(

self._mechanism = _MECHANISM_FOR_SCHEME[public_key.scheme]
self.hsm_keyid = hsm_keyid
self.token_filter = token_filter
self.public_key = public_key
self.pin_handler = pin_handler

@staticmethod
@contextmanager
def _default_session():
"""Context manager to handle default HSM session on reader slot 1."""
def _get_session(filters: Dict[str, str]) -> Iterator["PyKCS11.Session"]:
"""Context manager to handle a HSM session.

The slot/token is selected by filtering by token info fields.
ValueError is raised if not matching slot/token is not found, or if
jku marked this conversation as resolved.
Show resolved Hide resolved
more than one are found.
"""
lib = PYKCS11LIB()
slots = lib.getSlotList(tokenPresent=True)
if not slots:
raise ValueError("could not find token")
slots: List[int] = []
for slot in lib.getSlotList(tokenPresent=True):
tokeninfo = lib.getTokenInfo(slot)
if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED:
# useful for tests (softhsm always has an unitialized token)
continue

match = True
# all values in filters must match token fields
for key, value in filters.items():
tokenvalue: str = getattr(tokeninfo, key, "").strip()
if tokenvalue != value:
match = False

if match:
slots.append(slot)

if len(slots) != 1:
raise ValueError(
f"Found {len(slots)} slots/tokens matching filter {filters}"
)

session = lib.openSession(slots[0])
try:
Expand Down Expand Up @@ -192,15 +228,56 @@ def _find_key_values(
return ECDomainParameters.load(bytes(params)), bytes(point)

@classmethod
def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
def _build_token_filter(cls) -> Dict[str, str]:
"""Builds a token filter for the found slot/token.

The filter will include 'label' if one is found on token.

raises ValueError if less or more than 1 token/slot is found
jku marked this conversation as resolved.
Show resolved Hide resolved
"""

lib = PYKCS11LIB()
slots: List[int] = []
for slot in lib.getSlotList(tokenPresent=True):
tokeninfo = lib.getTokenInfo(slot)
if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED:
# useful for tests (softhsm always has an unitialized token)
continue
slots.append(slot)
jku marked this conversation as resolved.
Show resolved Hide resolved

if len(slots) != 1:
raise ValueError(f"Expected 1 token/slot, found {len(slots)}")
filters = {}
tokeninfo = lib.getTokenInfo(slots[0])
# other possible fields include manufacturerID, model and serialNumber
for key in ["label"]:
try:
filters[key] = getattr(tokeninfo, key).strip()
except AttributeError:
pass

return filters

@classmethod
def import_(
cls,
hsm_keyid: Optional[int] = None,
token_filter: Optional[Dict[str, str]] = None,
) -> Tuple[str, SSlibKey]:
"""Import public key and signer details from HSM.

Either only one token/slot must be present when importing or a
token_filter must be provided.
jku marked this conversation as resolved.
Show resolved Hide resolved

Returns a private key URI (for Signer.from_priv_key_uri()) and a public
key. import_() should be called once and the returned URI and public
key should be stored for later use.

Arguments:
hsm_keyid: Key identifier on the token. Default is 2 (meaning PIV key slot 9c).
token_filter: Dictionary of token field names and values used to
filter the correct slot/token. If no filter is provided one is built
from the fields found on the token.
jku marked this conversation as resolved.
Show resolved Hide resolved
jku marked this conversation as resolved.
Show resolved Hide resolved

Raises:
UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto``
Expand All @@ -221,7 +298,12 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
if hsm_keyid is None:
hsm_keyid = cls.SCHEME_KEYID

with cls._default_session() as session:
if token_filter is None:
token_filter = cls._build_token_filter()

uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}"

with cls._get_session(token_filter) as session:
params, point = cls._find_key_values(session, hsm_keyid)

if params.chosen.native not in _CURVE_NAMES:
Expand All @@ -247,7 +329,7 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
keyid = _get_keyid(KEY_TYPE_ECDSA, scheme, keyval)
key = SSlibKey(keyid, KEY_TYPE_ECDSA, scheme, keyval)

return "hsm:", key
return uri, key

@classmethod
def from_priv_key_uri(
Expand All @@ -264,10 +346,13 @@ def from_priv_key_uri(
if uri.scheme != cls.SCHEME:
raise ValueError(f"HSMSigner does not support {priv_key_uri}")

keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID
token_filter = dict(parse.parse_qsl(uri.query))

if secrets_handler is None:
raise ValueError("HSMSigner requires a secrets handler")

return cls(cls.SCHEME_KEYID, public_key, secrets_handler)
return cls(keyid, token_filter, public_key, secrets_handler)

def sign(self, payload: bytes) -> Signature:
"""Signs payload with Hardware Security Module (HSM).
Expand All @@ -283,7 +368,7 @@ def sign(self, payload: bytes) -> Signature:
Signature.
"""
pin = self.pin_handler(self.SECRETS_HANDLER_MSG)
with self._default_session() as session:
with self._get_session(self.token_filter) as session:
session.login(pin)
key = self._find_key(
session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY
Expand Down
22 changes: 19 additions & 3 deletions tests/test_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ def setUpClass(cls):
slot = lib.getSlotList(tokenPresent=True)[0]
lib.initToken(slot, so_pin, token_label)

tokeninfo = lib.getTokenInfo(slot)
cls.token_filter = {"label": getattr(tokeninfo, "label")}

session = PYKCS11LIB().openSession(slot, PyKCS11.CKF_RW_SESSION)
session.login(so_pin, PyKCS11.CKU_SO)
session.initPin(cls.hsm_user_pin)
Expand All @@ -138,8 +141,10 @@ def test_hsm(self):
"""Test HSM key export and signing."""

for hsm_keyid in [self.hsm_keyid, self.hsm_keyid_default]:
_, key = HSMSigner.import_(hsm_keyid)
signer = HSMSigner(hsm_keyid, key, lambda sec: self.hsm_user_pin)
_, key = HSMSigner.import_(hsm_keyid, self.token_filter)
signer = HSMSigner(
hsm_keyid, self.token_filter, key, lambda sec: self.hsm_user_pin
)
sig = signer.sign(b"DATA")
key.verify_signature(sig, b"DATA")

Expand All @@ -149,7 +154,18 @@ def test_hsm(self):
def test_hsm_uri(self):
"""Test HSM default key export and signing from URI."""

uri, key = HSMSigner.import_(self.hsm_keyid_default)
# default import
uri, key = HSMSigner.import_()
signer = Signer.from_priv_key_uri(
uri, key, lambda sec: self.hsm_user_pin
)
sig = signer.sign(b"DATA")
key.verify_signature(sig, b"DATA")
with self.assertRaises(UnverifiedSignatureError):
key.verify_signature(sig, b"NOT DATA")

# Import with specified values
uri, key = HSMSigner.import_(self.hsm_keyid_default, self.token_filter)
signer = Signer.from_priv_key_uri(
uri, key, lambda sec: self.hsm_user_pin
)
Expand Down