Skip to content

Commit

Permalink
Initial version of xmldsig-client
Browse files Browse the repository at this point in the history
TODO: Add unit tests for XMLVerifier
  • Loading branch information
RJPercival committed Jun 3, 2021
1 parent 8503a77 commit c244ecb
Show file tree
Hide file tree
Showing 7 changed files with 575 additions and 2 deletions.
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,18 @@
# xmldsig-client
A command-line tool for sending XML messages with digital signatures.
# XML Digital Signature client

This is a bare-bones command-line tool for signing XML according to the
[XML Digital Signature standard](https://www.w3.org/TR/xmldsig-core1/) and sending it, via HTTP POST request, to a
server.

## Usage

Execute `xmldsig-client` and provide the XML to sign via Standard Input, e.g.

```shell
$ xmldsig-client --schema my_schema.xsd --cert cert.pem --key private.pem --url http://localhost/test < my_data.xml
```

The `xmldsig-client` tool requires `--cert` and `--key` arguments, which should be the certificate and private key in
PEM format. It also accepts:
- `--schema`: Validate the provided XML against this schema
- `--url`: POST the signed XML to this URL
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
cryptography
pyOpenSSL < 20
requests
signxml
xmlschema
33 changes: 33 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import setuptools

setuptools.setup(
name="xmldsig-client",
description="A command-line tool for sending XML messages with digital signatures.",
author="Kraken Technologies Limited",
author_email="[email protected]",
url="https://github.com/octoenergy/xmldsig-client",
packages=setuptools.find_packages("src"),
package_dir={"": "src"},
install_requires=[
"cryptography",
"pyOpenSSL",
"requests",
"signxml",
"xmlschema",
],
entry_points={
"console_scripts": [
"xmldsig-client = xmldsig.__main__:main",
],
},
version_config=True,
setup_requires=["setuptools-git-versioning"],
classifiers=[
"Environment :: Console",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Topic :: Software Development :: Testing :: Traffic Generation",
"Topic :: Text Processing :: Markup :: XML",
],
)
Empty file added src/xmldsig/__init__.py
Empty file.
92 changes: 92 additions & 0 deletions src/xmldsig/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import argparse
import sys
from typing import BinaryIO

import requests
import xmlschema
from cryptography.hazmat import backends as crypto_backends
from cryptography.hazmat.primitives import serialization
from OpenSSL import crypto

from . import signatures

arg_parser = argparse.ArgumentParser(prog="xmldsig-client", description="Send a signed XML message")
arg_parser.add_argument(
"--key",
type=argparse.FileType("rb"),
required=True,
help="Private key that should be used to sign the message (in PEM format)",
)
arg_parser.add_argument(
"--cert",
type=argparse.FileType("rb"),
required=True,
help="Certificate that can be used to verify the signature on the message (in PEM format)",
)
arg_parser.add_argument(
"--schema",
type=argparse.FileType("r"),
required=False,
help="XML schema (XSD) to use to verify the message format before sending it",
)
arg_parser.add_argument(
"--url",
type=str,
required=False,
help="URL to which the message will be sent. If omitted, the message will be printed to stdout.",
)


def load_private_key(file: BinaryIO):
return serialization.load_pem_private_key(
data=file.read(), password=None, backend=crypto_backends.default_backend()
)


def load_cert(file: BinaryIO):
return crypto.load_certificate(type=crypto.FILETYPE_PEM, buffer=file.read())


def main():
# Command-line arguments
args = arg_parser.parse_args()

# Read XML from stdin and validate
xml = sys.stdin.read()
if not xml:
sys.exit("Please write the XML message to send")

if args.schema:
print("Validating input against schema...")
schema = xmlschema.XMLSchema(args.schema)
try:
schema.validate(xml)
except xmlschema.XMLSchemaValidationError as e:
sys.exit(str(e))

print("Generating XML signature...")
signer = signatures.XMLSigner(
key=load_private_key(args.key), cert=load_cert(args.cert)
)
signed_xml = signer.sign(xml.encode("utf-8"))

if args.url:
print(f"Sending to {args.url}...")

try:
response = requests.post(
url=args.url,
data=signed_xml,
allow_redirects=True,
)
response.raise_for_status()
except requests.exceptions.RequestException as e:
sys.exit(str(e))

print(f"Request sent to {args.url} successfully")
else:
print(signed_xml.decode("utf-8"))


if __name__ == "__main__":
main()
171 changes: 171 additions & 0 deletions src/xmldsig/signatures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import base64

import signxml
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from lxml import etree
from OpenSSL import crypto

XML_NAMESPACES = {
"ds": "http://www.w3.org/2000/09/xmldsig#",
"dsig11": "http://www.w3.org/2009/xmldsig11#",
}
"""
XML namespaces used by signatures.
"""


class XMLError(ValueError):
"""
Some signed XML is malformed or invalid in some way.
"""

pass


class SignatureError(ValueError):
"""
An XML signature is invalid or malformed.
"""

pass


class CertificateNotFoundError(ValueError):
def __init__(self, issuer: x509.Name, serial_number: int):
super().__init__("Certificate not found")
self.issuer = issuer
self.serial_number = serial_number


class XMLSigner:
"""
Signs XML messages using a private key, according to https://www.w3.org/TR/xmldsig-core1/.
"""

SIGNATURE_ALGORITHM_ID = "ecdsa-sha256"
"""
Algorithm used to generate request signatures.
"""

DIGEST_ALGORITHM_ID = "sha256"
"""
Algorithm used to hash request XML before signing.
"""

CANONICALIZATION_ALGORITHM_ID = "http://www.w3.org/2001/10/xml-exc-c14n#"
"""
Algorithm used to transform request XML into a canonical form before hashing.
"""

include_issuer_serial: bool = True
include_x509_digest: bool = True

def __init__(self, key: ec.EllipticCurvePrivateKey, cert: crypto.X509):
"""
:param key: The private key to use for generating signatures.
:param cert: The certificate that contains the corresponding public key, used for verification.
"""
self.signer = signxml.XMLSigner(
method=signxml.methods.enveloped,
signature_algorithm=self.SIGNATURE_ALGORITHM_ID,
digest_algorithm=self.DIGEST_ALGORITHM_ID,
c14n_algorithm=self.CANONICALIZATION_ALGORITHM_ID,
)
if not key:
raise ValueError("Must provide private key for generating signatures")
self.key = key
if not cert:
raise ValueError(
"Must provide certificate used to verify generated signatures"
)
self.cert = cert

def sign(self, xml: bytes) -> bytes:
"""
Adds a Signature element to some XML.
:param xml: The XML to sign.
:return: The XML with an added Signature element.
:raise XMLError: if XML is invalid
:raise SignatureError: if signature is invalid
"""
try:
parsed_xml = etree.fromstring(xml)
except ValueError as e:
raise XMLError() from e

signed_xml: etree.ElementBase = self.signer.sign(
data=parsed_xml, key=self.key, cert=[self.cert]
)
# TODO(RJPercival): Add support for X509IssuerSerial to signxml library.
if self.include_issuer_serial:
try:
self._add_issuer_serial(signed_xml=signed_xml)
except XMLError as e:
raise SignatureError(
"Generated signature did not have expected format"
) from e

if self.include_x509_digest:
try:
self._add_digest(signed_xml)
except XMLError as e:
raise SignatureError(
"Generated signature did not have expected format"
) from e

return etree.tostring(signed_xml, encoding="utf-8")

def _add_issuer_serial(self, signed_xml: etree.ElementBase) -> None:
"""
Add X509IssuerSerial element to request signature.
Note that this element is deprecated; prefer X509Digest when possible.
"""
x509_data = _get_element(signed_xml, "./ds:Signature/ds:KeyInfo/ds:X509Data")

if x509_data.xpath("./ds:X509IssuerSerial", namespaces=XML_NAMESPACES):
# Already has an X509IssuerSerial element - nothing to do.
return

issuer_serial: etree.ElementBase = etree.SubElement(
x509_data, "{%s}X509IssuerSerial" % XML_NAMESPACES["ds"]
)

issuer_name: etree.ElementBase = etree.SubElement(
issuer_serial, "{%s}X509IssuerName" % XML_NAMESPACES["ds"]
)
issuer_name.text = b",".join(
b"%s=%s" % (key, value)
for key, value in reversed(self.cert.get_issuer().get_components())
).decode("utf-8")

serial_number: etree.ElementBase = etree.SubElement(
issuer_serial, "{%s}X509SerialNumber" % XML_NAMESPACES["ds"]
)
serial_number.text = str(self.cert.get_serial_number())

def _add_digest(self, signed_xml: etree.ElementBase) -> None:
"""
Add X509Digest element to the request signature.
"""
x509_data = _get_element(signed_xml, "./ds:Signature/ds:KeyInfo/ds:X509Data")
if x509_data.xpath("./dsig11:X509Digest", namespaces=XML_NAMESPACES):
# Already has an X509Digest element - nothing to do.
return

digest: etree.ElementBase = etree.SubElement(
x509_data, "{%s}X509Digest" % XML_NAMESPACES["dsig11"]
)

digest.set("Algorithm", "http://www.w3.org/2001/04/xmlenc#sha256")
digest.text = base64.b64encode(
self.cert.to_cryptography().fingerprint(hashes.SHA256())
)


def _get_element(root_element: etree.ElementTree, xpath: str) -> etree.Element:
elements = root_element.xpath(xpath, namespaces=XML_NAMESPACES)
if (count := len(elements)) != 1:
raise XMLError(f"Expected 1 element matching '{xpath}', found {count}")
return elements[0]
Loading

0 comments on commit c244ecb

Please sign in to comment.