Skip to content

Commit

Permalink
- unified public key loader for OSNMA/QZNMA
Browse files Browse the repository at this point in the history
- fixed OSNMA DSM-PKR parser
  • Loading branch information
hirokawa committed Jan 3, 2025
1 parent b740b79 commit 491902c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 47 deletions.
81 changes: 41 additions & 40 deletions src/cssrlib/osnma.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

import numpy as np
import bitstruct.c as bs
from cryptography.hazmat.primitives import hashes, hmac, cmac, serialization
from cryptography.hazmat.primitives import hashes, hmac, cmac
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.exceptions import InvalidSignature
from binascii import unhexlify, hexlify
from enum import IntEnum
import xml.etree.ElementTree as et
from cssrlib.gnss import gpst2time, time2gst, copy_buff
from cssrlib.qznma import raw2der
from cssrlib.qznma import raw2der, load_pubkey


class uOSNMA(IntEnum):
Expand All @@ -35,18 +35,6 @@ class uOSNMA(IntEnum):
PKR_UPDATED = 32


class pubkey():
""" class to store public key """
pkid = -1
pk = None
pkt = None

def __init__(self, pkid, pkt=0, pk=None):
self.pkid = pkid
self.pkt = pkt
self.pk = pk


class taginfo():
""" class to store tag """
gst_sf = bytearray(4)
Expand Down Expand Up @@ -153,14 +141,10 @@ class osnma():
subfrm_p = None
subfrm = None

# Public Key in PEM received from GSC OSNMA server
# note : EC_PARAMETER section should be removed.
# pubk_path = '../data/OSNMA_PublicKey_20210920133026_s.pem'
pubk_path = None
# Merkle tree root (received from GSC OSNMA server)
bdir = '../data/pubkey/osnma/'
mt_path = bdir + 'OSNMA_MerkleTree_20240115100000_newPKID_1.xml'
pk_list = []
root_mt = -1
# Public key list
pk_list = {}

flg_slowmac = False
nsat = 0
Expand Down Expand Up @@ -188,7 +172,7 @@ def pubkey_decompress(self, pkt, pnt):
return pk

def load_mt(self, file):
""" load markov tree from xml file """
""" load markov tree and public keys from xml file """
mt = et.parse(file)
root = mt.getroot()
h = root.find('body').find('MerkleTree')
Expand All @@ -207,7 +191,8 @@ def load_mt(self, file):
pkt_ = 0

pk_ = self.pubkey_decompress(pkt_, pnt_)
self.pk_list[pkid_].pk = pk_
self.pk_list[pkid_] = pk_

for h_tn in h.findall('TreeNode'):
j_ = int(h_tn.find('j').text)
i_ = int(h_tn.find('i').text)
Expand All @@ -216,18 +201,22 @@ def load_mt(self, file):
self.root_mt = x_
return True

def __init__(self):
def __init__(self, mt_file=None):
self.monlevel = 1 # debug monitor level
self.vcnt_min = 1

self.pubk_bdir = '../data/pubkey/osnma/'

# 'OSNMA_MerkleTree_20240115100000_newPKID_1.xml'

self.cnt = np.zeros(self.GALMAX, dtype=int)
for prn in range(self.GALMAX):
self.hk.append(bytearray(15))
self.mack.append(bytearray(60))
self.pk_list = []
for k in range(16):
self.pk_list.append(pubkey(k))
self.load_mt(self.mt_path)
self.pk_list = {}

if mt_file is not None:
self.load_mt(self.pubk_bdir + mt_file)

self.flg_dsm = {}

Expand Down Expand Up @@ -258,6 +247,14 @@ def set_gst_sf(self, gst_wn, gst_tow):
gst_sf = bs.pack('u12u20', gst_wn, gst_tow//30*30)
return gst_sf

def load_pubkey_pkid(self, pubk_path, pkid):
""" load public key from file in dem/crt/pem format """
pk = load_pubkey(self.pubk_bdir+pubk_path)
if pk is None:
return False
self.pk_list[self.pkid] = pk
return True

def verify_root_key(self):
""" verify root key """
did = self.did0
Expand All @@ -269,12 +266,10 @@ def verify_root_key(self):
result = False
hash_func = self.hash_table[self.hf]
ds_der = raw2der(self.ds)
if self.pubk_path is None:
pk = self.pk_list[self.pkid].pk
else:
with open(self.pubk_path) as f:
pubk = f.read()
pk = serialization.load_pem_public_key(pubk.encode())
if self.pkid not in self.pk_list.keys():
return False
pk = self.pk_list[self.pkid]

try:
pk.verify(ds_der, bytes(msg), ec.ECDSA(hash_func()))
result = True
Expand Down Expand Up @@ -369,31 +364,34 @@ def decode_dsm_kroot(self, did):
return True

def decode_dsm_pkr(self, did):
""" decode DSM-PKR """
""" decode and verify DSM-PKR """
nb, mid = bs.unpack_from('u4u4', self.dsm[did], 0)
if nb < 7 or nb > 10:
return False
itn = self.dsm[did][1:1+128] # 32*4
npkt, npkid = bs.unpack_from('u4u4', self.dsm[did], 1024+8)
# new public key type 1:ECDSA P-256, 3: ECDSA P-521, 4: OAM
if npkt > 4 or npkt == 0:
if npkt > 4 or npkt == 0 or npkt == 2:
return False
l_dp = (nb+6)*104
if npkt == 4:
l_npk = 104*(nb+6)-1040
else:
l_npk = self.npk_len_t[npkt]
i0 = 130+l_npk//8
npk = self.dsm[130:i0]
npk = self.dsm[did][130:i0]
l_pdp = l_dp - 1040 - l_npk
if l_pdp < 0:
return False
p_dp = self.dsm[did][i0:i0+l_pdp//8]

m0 = bytearray([self.dsm[did][129]])+npk # NPKT||NPKID||NPK
if not self.verify_pdp(m0, p_dp): # verify P_DP

# A7.3 Verification of the PDP
if not self.verify_pdp(m0, p_dp):
return False

# A7.2 DSM-PKR Verification
h = self.process_hash(m0)
for k in range(4):
itn_b = itn[k*32:(k+1)*32]
Expand All @@ -405,10 +403,13 @@ def decode_dsm_pkr(self, did):
mid >>= 1

result = (h == self.root_mt)
if not result:
return False

self.npkid = npkid
if result:
pk_ = self.pubkey_decompress(npkt, npk)
self.pk_list[npkid].pk = pk_
self.pk_list[npkid] = pk_
self.status |= uOSNMA.PKR_UPDATED # PKR updated
return result

Expand Down
41 changes: 34 additions & 7 deletions src/cssrlib/qznma.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from cssrlib.gnss import uGNSS, prn2sat, sat2prn, copy_buff
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.x509 import load_pem_x509_certificate
from cryptography.exceptions import InvalidSignature
from enum import IntEnum
import copy
Expand All @@ -40,6 +41,12 @@ class uNavId(IntEnum):
GAL_INAV = 5


class uCert(IntEnum):
X509_CRT = 1
PEM = 2
DER = 3


class NavMsg():
""" class to store the navigation message """
sys = 0
Expand Down Expand Up @@ -85,12 +92,29 @@ def __init__(self, keyid, ds, salt, mt=0, nid=0, rtow=0, svid=0,
self.salt = salt


def load_pubkey(keyid, bdir='../data/pubkey/qznma'):
""" load public key information in der format """
pubk_path = f"{bdir}/{keyid:03d}.der"
with open(pubk_path, 'rb') as f:
def load_pubkey(pubk_path):
""" load public key information in crt/pem/der format """
ext = pubk_path.split('.')[-1]
if ext == 'crt':
pk_fmt = uCert.X509_CRT
mode = 'rt'
elif ext == 'pem':
pk_fmt = uCert.PEM
mode = 'rt'
elif ext == 'der':
pk_fmt = uCert.DER
mode = 'rb'
else:
return None

with open(pubk_path, mode) as f:
pubk = f.read()
pk = serialization.load_der_public_key(pubk)
if pk_fmt == uCert.X509_CRT:
pk = load_pem_x509_certificate(pubk.encode()).public_key()
elif pk_fmt == uCert.PEM:
pk = serialization.load_pem_public_key(pubk.encode())
elif pk_fmt == uCert.DER:
pk = serialization.load_der_public_key(pubk)

return pk

Expand Down Expand Up @@ -131,6 +155,7 @@ def __init__(self):

self.cnav_mt_t = {10: 1, 11: 2, 30: 3, 31: 3, 32: 3, 33: 3,
35: 3, 36: 3, 37: 3, 61: 3}
self.pubk_bdir = '../data/pubkey/qznma'

def load_navmsg_lnav(self, navfile):
""" load GPS/QZSS LNAV navigation messages """
Expand Down Expand Up @@ -545,7 +570,8 @@ def verify_gnss_nav(self, npr, mnav):
sys, prn = sat2prn(sat)

if self.pk is None:
self.pk = load_pubkey(npr.keyid)
pubk_path = self.pubk_bdir + f"/{npr.keyid:03d}.der"
self.pk = load_pubkey(pubk_path)

ds_der = raw2der(npr.ds)
status = False
Expand Down Expand Up @@ -734,7 +760,8 @@ def verify_qzss_nav(self, sat, npr, msg, mode):
bs.pack_into('u16', rand_, 8+mlen, npr.salt)

if self.pk is None:
self.pk = load_pubkey(npr.keyid)
pubk_path = self.pubk_bdir + f"/{npr.keyid:03d}.der"
self.pk = load_pubkey(pubk_path)

status = False
try:
Expand Down

0 comments on commit 491902c

Please sign in to comment.