From 491902cfb60c5449840a7f3feae3849450184ff1 Mon Sep 17 00:00:00 2001 From: Rui Hirokawa Date: Fri, 3 Jan 2025 11:35:05 +0900 Subject: [PATCH] - unified public key loader for OSNMA/QZNMA - fixed OSNMA DSM-PKR parser --- src/cssrlib/osnma.py | 81 ++++++++++++++++++++++---------------------- src/cssrlib/qznma.py | 41 ++++++++++++++++++---- 2 files changed, 75 insertions(+), 47 deletions(-) diff --git a/src/cssrlib/osnma.py b/src/cssrlib/osnma.py index 0b2b707..9d98d41 100644 --- a/src/cssrlib/osnma.py +++ b/src/cssrlib/osnma.py @@ -14,15 +14,15 @@ import numpy as np import bitstruct.c as bs -from cryptography.hazmat.primitives import hashes, hmac, cmac, serialization +from cryptography.hazmat.primitives import hashes, hmac, cmac from cryptography.hazmat.primitives.ciphers import algorithms -from cryptography.hazmat.primitives.asymmetric import ec, utils +from cryptography.hazmat.primitives.asymmetric import ec from cryptography.exceptions import InvalidSignature from binascii import unhexlify, hexlify from enum import IntEnum import xml.etree.ElementTree as et from cssrlib.gnss import gpst2time, time2gst, copy_buff -from cssrlib.qznma import raw2der +from cssrlib.qznma import raw2der, load_pubkey class uOSNMA(IntEnum): @@ -35,18 +35,6 @@ class uOSNMA(IntEnum): PKR_UPDATED = 32 -class pubkey(): - """ class to store public key """ - pkid = -1 - pk = None - pkt = None - - def __init__(self, pkid, pkt=0, pk=None): - self.pkid = pkid - self.pkt = pkt - self.pk = pk - - class taginfo(): """ class to store tag """ gst_sf = bytearray(4) @@ -153,14 +141,10 @@ class osnma(): subfrm_p = None subfrm = None - # Public Key in PEM received from GSC OSNMA server - # note : EC_PARAMETER section should be removed. - # pubk_path = '../data/OSNMA_PublicKey_20210920133026_s.pem' - pubk_path = None # Merkle tree root (received from GSC OSNMA server) - bdir = '../data/pubkey/osnma/' - mt_path = bdir + 'OSNMA_MerkleTree_20240115100000_newPKID_1.xml' - pk_list = [] + root_mt = -1 + # Public key list + pk_list = {} flg_slowmac = False nsat = 0 @@ -188,7 +172,7 @@ def pubkey_decompress(self, pkt, pnt): return pk def load_mt(self, file): - """ load markov tree from xml file """ + """ load markov tree and public keys from xml file """ mt = et.parse(file) root = mt.getroot() h = root.find('body').find('MerkleTree') @@ -207,7 +191,8 @@ def load_mt(self, file): pkt_ = 0 pk_ = self.pubkey_decompress(pkt_, pnt_) - self.pk_list[pkid_].pk = pk_ + self.pk_list[pkid_] = pk_ + for h_tn in h.findall('TreeNode'): j_ = int(h_tn.find('j').text) i_ = int(h_tn.find('i').text) @@ -216,18 +201,22 @@ def load_mt(self, file): self.root_mt = x_ return True - def __init__(self): + def __init__(self, mt_file=None): self.monlevel = 1 # debug monitor level self.vcnt_min = 1 + self.pubk_bdir = '../data/pubkey/osnma/' + + # 'OSNMA_MerkleTree_20240115100000_newPKID_1.xml' + self.cnt = np.zeros(self.GALMAX, dtype=int) for prn in range(self.GALMAX): self.hk.append(bytearray(15)) self.mack.append(bytearray(60)) - self.pk_list = [] - for k in range(16): - self.pk_list.append(pubkey(k)) - self.load_mt(self.mt_path) + self.pk_list = {} + + if mt_file is not None: + self.load_mt(self.pubk_bdir + mt_file) self.flg_dsm = {} @@ -258,6 +247,14 @@ def set_gst_sf(self, gst_wn, gst_tow): gst_sf = bs.pack('u12u20', gst_wn, gst_tow//30*30) return gst_sf + def load_pubkey_pkid(self, pubk_path, pkid): + """ load public key from file in dem/crt/pem format """ + pk = load_pubkey(self.pubk_bdir+pubk_path) + if pk is None: + return False + self.pk_list[self.pkid] = pk + return True + def verify_root_key(self): """ verify root key """ did = self.did0 @@ -269,12 +266,10 @@ def verify_root_key(self): result = False hash_func = self.hash_table[self.hf] ds_der = raw2der(self.ds) - if self.pubk_path is None: - pk = self.pk_list[self.pkid].pk - else: - with open(self.pubk_path) as f: - pubk = f.read() - pk = serialization.load_pem_public_key(pubk.encode()) + if self.pkid not in self.pk_list.keys(): + return False + pk = self.pk_list[self.pkid] + try: pk.verify(ds_der, bytes(msg), ec.ECDSA(hash_func())) result = True @@ -369,14 +364,14 @@ def decode_dsm_kroot(self, did): return True def decode_dsm_pkr(self, did): - """ decode DSM-PKR """ + """ decode and verify DSM-PKR """ nb, mid = bs.unpack_from('u4u4', self.dsm[did], 0) if nb < 7 or nb > 10: return False itn = self.dsm[did][1:1+128] # 32*4 npkt, npkid = bs.unpack_from('u4u4', self.dsm[did], 1024+8) # new public key type 1:ECDSA P-256, 3: ECDSA P-521, 4: OAM - if npkt > 4 or npkt == 0: + if npkt > 4 or npkt == 0 or npkt == 2: return False l_dp = (nb+6)*104 if npkt == 4: @@ -384,16 +379,19 @@ def decode_dsm_pkr(self, did): else: l_npk = self.npk_len_t[npkt] i0 = 130+l_npk//8 - npk = self.dsm[130:i0] + npk = self.dsm[did][130:i0] l_pdp = l_dp - 1040 - l_npk if l_pdp < 0: return False p_dp = self.dsm[did][i0:i0+l_pdp//8] m0 = bytearray([self.dsm[did][129]])+npk # NPKT||NPKID||NPK - if not self.verify_pdp(m0, p_dp): # verify P_DP + + # A7.3 Verification of the PDP + if not self.verify_pdp(m0, p_dp): return False + # A7.2 DSM-PKR Verification h = self.process_hash(m0) for k in range(4): itn_b = itn[k*32:(k+1)*32] @@ -405,10 +403,13 @@ def decode_dsm_pkr(self, did): mid >>= 1 result = (h == self.root_mt) + if not result: + return False + self.npkid = npkid if result: pk_ = self.pubkey_decompress(npkt, npk) - self.pk_list[npkid].pk = pk_ + self.pk_list[npkid] = pk_ self.status |= uOSNMA.PKR_UPDATED # PKR updated return result diff --git a/src/cssrlib/qznma.py b/src/cssrlib/qznma.py index d7b4eea..a6dc49e 100644 --- a/src/cssrlib/qznma.py +++ b/src/cssrlib/qznma.py @@ -18,6 +18,7 @@ from cssrlib.gnss import uGNSS, prn2sat, sat2prn, copy_buff from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, utils +from cryptography.x509 import load_pem_x509_certificate from cryptography.exceptions import InvalidSignature from enum import IntEnum import copy @@ -40,6 +41,12 @@ class uNavId(IntEnum): GAL_INAV = 5 +class uCert(IntEnum): + X509_CRT = 1 + PEM = 2 + DER = 3 + + class NavMsg(): """ class to store the navigation message """ sys = 0 @@ -85,12 +92,29 @@ def __init__(self, keyid, ds, salt, mt=0, nid=0, rtow=0, svid=0, self.salt = salt -def load_pubkey(keyid, bdir='../data/pubkey/qznma'): - """ load public key information in der format """ - pubk_path = f"{bdir}/{keyid:03d}.der" - with open(pubk_path, 'rb') as f: +def load_pubkey(pubk_path): + """ load public key information in crt/pem/der format """ + ext = pubk_path.split('.')[-1] + if ext == 'crt': + pk_fmt = uCert.X509_CRT + mode = 'rt' + elif ext == 'pem': + pk_fmt = uCert.PEM + mode = 'rt' + elif ext == 'der': + pk_fmt = uCert.DER + mode = 'rb' + else: + return None + + with open(pubk_path, mode) as f: pubk = f.read() - pk = serialization.load_der_public_key(pubk) + if pk_fmt == uCert.X509_CRT: + pk = load_pem_x509_certificate(pubk.encode()).public_key() + elif pk_fmt == uCert.PEM: + pk = serialization.load_pem_public_key(pubk.encode()) + elif pk_fmt == uCert.DER: + pk = serialization.load_der_public_key(pubk) return pk @@ -131,6 +155,7 @@ def __init__(self): self.cnav_mt_t = {10: 1, 11: 2, 30: 3, 31: 3, 32: 3, 33: 3, 35: 3, 36: 3, 37: 3, 61: 3} + self.pubk_bdir = '../data/pubkey/qznma' def load_navmsg_lnav(self, navfile): """ load GPS/QZSS LNAV navigation messages """ @@ -545,7 +570,8 @@ def verify_gnss_nav(self, npr, mnav): sys, prn = sat2prn(sat) if self.pk is None: - self.pk = load_pubkey(npr.keyid) + pubk_path = self.pubk_bdir + f"/{npr.keyid:03d}.der" + self.pk = load_pubkey(pubk_path) ds_der = raw2der(npr.ds) status = False @@ -734,7 +760,8 @@ def verify_qzss_nav(self, sat, npr, msg, mode): bs.pack_into('u16', rand_, 8+mlen, npr.salt) if self.pk is None: - self.pk = load_pubkey(npr.keyid) + pubk_path = self.pubk_bdir + f"/{npr.keyid:03d}.der" + self.pk = load_pubkey(pubk_path) status = False try: