diff --git a/src/cssrlib/ewss.py b/src/cssrlib/ewss.py index 6737e26..44bd917 100644 --- a/src/cssrlib/ewss.py +++ b/src/cssrlib/ewss.py @@ -1276,8 +1276,8 @@ def decode(self, msg, i): self.lat = -90.0+180.0/65535*lati self.lon = -180.0+360.0/131071*loni - self.LM = self.r_t[smai] - self.Lm = self.r_t[smii] + self.LM = self.r_t[smai] # semi-major axis [km] + self.Lm = self.r_t[smii] # semi-minor axis [km] self.az = -90.0+180.0/64*azi # 3.7 Main Subject for Specific Settings diff --git a/src/cssrlib/osnma.py b/src/cssrlib/osnma.py index 283e55f..2e5e58f 100644 --- a/src/cssrlib/osnma.py +++ b/src/cssrlib/osnma.py @@ -18,9 +18,10 @@ from cryptography.hazmat.primitives.ciphers import algorithms from cryptography.hazmat.primitives.asymmetric import ec, utils from cryptography.exceptions import InvalidSignature -from binascii import unhexlify +from binascii import unhexlify, hexlify from enum import IntEnum import xml.etree.ElementTree as et +from cssrlib.gnss import gpst2time, time2gst class uOSNMA(IntEnum): @@ -55,12 +56,13 @@ class taginfo(): navmsg = None iodnav = -1 - def __init__(self, gst_sf, prn, adkd, tag, cnt, navmsg=None): + def __init__(self, gst_sf, prn, adkd, cop, tag, cnt, navmsg=None): if navmsg is False: return None self.gst_sf = gst_sf self.prn = prn self.adkd = adkd + self.cop = cop self.tag = tag self.cnt = cnt if navmsg is not None: @@ -76,22 +78,44 @@ class osnma(): npk_len_t = [0, 264, 0, 536, 0] tag_len_t = [0, 0, 0, 0, 0, 20, 24, 28, 32, 40, 0, 0, 0, 0, 0, 0] hash_table = {0: hashes.SHA256, 2: hashes.SHA3_256} - self_t = {27: [1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 0], - 28: [1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 1, 0, 0], + mode_t = {27: [1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 0], + 28: [1, 0, 0, 0, 1, 0, 0, 1, 0, 0, + 1, 0, 0, 1, 0, 0, 1, 1, 0, 0], 31: [1, 0, 0, 1, 0, 1, 0, 0, 1, 1], - 33: [1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0]} # 0:self,1:cross + 33: [1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 0], + 34: [1, 2, 1, 2, 1, 0, 1, 2, 0, 1, 0, 0], + 35: [1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2], + 36: [1, 2, 1, 2, 1, 1, 2, 0, 1, 0], + 37: [1, 0, 1, 0, 1, 1, 0, 0, 1, 0], + 38: [1, 2, 1, 2, 1, 1, 2, 2, 1, 2], + 39: [1, 2, 1, 2, 1, 2, 0, 1], + 40: [1, 0, 1, 1, 1, 0, 0, 0], + 41: [1, 2, 1, 2, 1, 2, 2, 1], + } # 0:cross-auth, 1:self-auth, 2:flex + adkd_t = {27: [0, 0, 0, 0, 12, 0, 0, 0, 0, 4, 12, 0], 28: [0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 4, 12, 0, 0], 31: [0, 0, 0, 12, 0, 0, 0, 0, 12, 4], - 33: [0, 0, 4, 0, 12, 0, 0, 0, 0, 12, 0, 12]} + 33: [0, 0, 4, 0, 12, 0, 0, 0, 0, 12, 0, 12], + 34: [0, 0, 4, 0, 12, 0, 0, 0, 0, 12, 0, 12], + 35: [0, 0, 4, 0, 12, 0, 0, 0, 0, 12, 0, 0], + 36: [0, 0, 4, 0, 12, 0, 0, 0, 12, 12], + 37: [0, 0, 4, 0, 12, 0, 0, 0, 12, 0], + 38: [0, 0, 4, 0, 12, 0, 0, 0, 12, 0], + 39: [0, 0, 4, 0, 0, 0, 0, 12], + 40: [0, 0, 4, 12, 0, 0, 0, 12], + 41: [0, 0, 4, 0, 0, 0, 0, 12], + } # 0: NAV or unknown, 4: UTC, 12: NAV(SLOW-MAC) status = 0 cid0 = -1 - dsm = bytearray(512) - flg_dsm = 0 - nb = 0 + dsm = {} + flg_dsm = {} + nb = {} did0 = -1 + + # OSNMA parameters pkid = -1 cidkr = -1 hf = -1 @@ -99,18 +123,23 @@ class osnma(): ks = -1 ts = -1 maclt = -1 - wn = -1 - towh = -1 - tofst = -60 - alp = bytearray(6) - ds = bytearray(64) - pdk = bytearray(16) - kroot = bytearray(16) - key = bytearray(16) - key_p = bytearray(16) - key_c = bytearray(16) + wn = -1 # reference week number of root key + towh = -1 # reference hour of week of root key + tofst = -60 # time offset for time authentication + alp = bytearray(6) # seed + ds = bytearray(64) # digital signature + kroot = bytearray(16) # root key + key = bytearray(16) # key used for authentication + key_p = bytearray(16) # previous key + key_c = bytearray(16) # current key + gst_sf_c = bytearray(4) + gst_tow = -1 # current subframe time + gst_tow_p = -1 # previous subframe time + gst_sf_p = bytearray(4) # gst for previous sub-frame + + nt = 0 # number of tags in MACK hk = [] mack = [] tag = bytearray(42) @@ -124,12 +153,13 @@ class osnma(): vcnt = np.zeros(GALMAX+1, dtype=int) vstatus = np.zeros(GALMAX+1, dtype=bool) - # Public Key received from GSC OSNMA server + # Public Key in PEM received from GSC OSNMA server # note : EC_PARAMETER section should be removed. # pubk_path = '../data/OSNMA_PublicKey_20210920133026_s.pem' pubk_path = None # Merkle tree root (received from GSC OSNMA server) - mt_path = '../data/OSNMA_MerkleTree_20210920133026.xml' + # mt_path = '../data/pubkey/osnma/OSNMA_MerkleTree_20210920133026.xml' + mt_path = '../data/pubkey/osnma/OSNMA_MerkleTree_20240115100000_newPKID_1.xml' pk_list = [] def raw2der(self, ds): @@ -213,15 +243,16 @@ def process_hash(self, msg): def set_gst_sf(self, gst_wn, gst_tow): """ set Galileo sub-frame time """ - self.gst_sf = bs.pack('u12u20', gst_wn, gst_tow//30*30) - return True + gst_sf = bs.pack('u12u20', gst_wn, gst_tow//30*30) + return gst_sf def verify_root_key(self): """ verify root key """ + did = self.did0 if not self.status & uOSNMA.ROOTKEY_LOADED: # root key loaded return False lk_b = self.klen_t[self.ks]//8 - msg = bytearray([self.nma_header]) + self.dsm[1:13+lk_b] + msg = bytearray([self.nma_header]) + self.dsm[did][1:13+lk_b] result = False hash_func = self.hash_table[self.hf] @@ -242,10 +273,10 @@ def verify_root_key(self): self.status |= uOSNMA.ROOTKEY_VERIFIED # root key verified return result - def verify_pdk(self, p_dk): + def verify_pdk(self, p_dk, did): """ verify P_DK """ lk_b = self.klen_t[self.ks]//8 - msg = bytearray([self.nma_header]) + self.dsm[1:13+lk_b] + self.ds + msg = bytearray([self.nma_header]) + self.dsm[did][1:13+lk_b] + self.ds h = self.process_hash(msg) l_pdk = len(p_dk) return h[0:l_pdk] == p_dk @@ -285,10 +316,10 @@ def verify_key_chain(self, key, gst_wn, gst_tow0, ki=1): self.status |= uOSNMA.KEYCHAIN_VERIFIED # key-chain verified return result - def decode_dsm_kroot(self): + def decode_dsm_kroot(self, did): """ decode DSM-KROOT """ - v = bs.unpack_from('u4u4u2u2u2u2u4u4u8u4u12u8', self.dsm, 0) - self.nb = v[0]+6 # number of blocks + v = bs.unpack_from('u4u4u2u2u2u2u4u4u8u4u12u8', self.dsm[did], 0) + nb = v[0]+6 # number of blocks self.pkid = v[1] # Public Key ID self.cidkr = v[2] # KROOT Chain ID self.hf = v[4] # hash function 0:SHA-256,2:SHA3-256 @@ -299,8 +330,8 @@ def decode_dsm_kroot(self): self.maclt = v[8] # MAC lookup table self.wn = v[10] # KROOT week number, tow[h] self.towh = v[11] - self.alp = self.dsm[7:13] # random pattern alpha - l_dk = self.nb*104 + self.alp = self.dsm[did][7:13] # random pattern alpha + l_dk = nb*104 l_ds = 512 # P-256/SHA-256 l_k = self.klen_t[self.ks] l_pdk = l_dk-104-l_k-l_ds @@ -308,23 +339,24 @@ def decode_dsm_kroot(self): return False i = 13+l_k//8 - self.kroot = self.dsm[13:i] - self.ds = self.dsm[i:i+l_ds//8] + self.kroot = self.dsm[did][13:i] + self.ds = self.dsm[did][i:i+l_ds//8] i += l_ds//8 - p_dk = self.dsm[i:i+(l_pdk+7)//8] - if not self.verify_pdk(p_dk): + p_dk = self.dsm[did][i:i+(l_pdk+7)//8] + if not self.verify_pdk(p_dk, did): print("p_dk verification error.") return False self.status |= uOSNMA.ROOTKEY_LOADED # KROOT loaded + self.did0 = did return True - def decode_dsm_pkr(self): + def decode_dsm_pkr(self, did): """ decode DSM-PKR """ - nb, mid = bs.unpack_from('u4u4', self.dsm, 0) + nb, mid = bs.unpack_from('u4u4', self.dsm[did], 0) if nb < 7 or nb > 10: return False - itn = self.dsm[1:1+128] # 32*4 - npkt, npkid = bs.unpack_from('u4u4', self.dsm, 1024+8) + itn = self.dsm[did][1:1+128] # 32*4 + npkt, npkid = bs.unpack_from('u4u4', self.dsm[did], 1024+8) # new public key type 1:ECDSA P-256, 3: ECDSA P-521, 4: OAM if npkt > 4 or npkt == 0: return False @@ -338,9 +370,9 @@ def decode_dsm_pkr(self): l_pdp = l_dp - 1040 - l_npk if l_pdp < 0: return False - p_dp = self.dsm[i0:i0+l_pdp//8] + p_dp = self.dsm[did][i0:i0+l_pdp//8] - m0 = bytearray([self.dsm[129]])+npk # NPKT||NPKID||NPK + m0 = bytearray([self.dsm[did][129]])+npk # NPKT||NPKID||NPK if not self.verify_pdp(m0, p_dp): # verify P_DP return False @@ -371,37 +403,36 @@ def decode_hk(self, hk): return False if cpks != 1: # nominal only return False + + if did not in self.flg_dsm.keys(): + self.flg_dsm[did] = 0 + self.dsm[did] = bytearray(250) + self.nb[did] = 0 + if self.cid0 < 0: self.cid0 = cid - self.flg_dsm = 0 - self.nb = -1 + if cid != self.cid0: self.cid0 = cid - self.did0 = -1 - self.flg_dsm = 0 - - if self.did0 < 0: - self.did0 = did - if did != self.did0: - self.did0 = did - self.flg_dsm = 0 + self.flg_dsm[did] = 0 - self.dsm[bid*13:bid*13+13] = hk[2:] - self.flg_dsm |= 1 << bid + self.dsm[did][bid*13:bid*13+13] = hk[2:] + self.flg_dsm[did] |= 1 << bid if bid == 0: nb_ = (hk[2] >> 4) & 0xf - self.nb = nb_ + 6 # number of blocks + self.nb[did] = nb_ + 6 # number of blocks result = False - if self.nb > 0 and self.flg_dsm == (1 << self.nb)-1: + if did in self.nb.keys() and \ + self.flg_dsm[did] == (1 << self.nb[did])-1: if did <= 11: # DSM-KROOT - result = self.decode_dsm_kroot() + result = self.decode_dsm_kroot(did) else: # DSM-PKR - result = self.decode_dsm_pkr() + result = self.decode_dsm_pkr(did) if result: - self.flg_dsm = 0 + self.flg_dsm[did] = 0 return result def decode_tags_info(self, k): @@ -416,8 +447,9 @@ def decode_tags_info(self, k): if k == 0: tag0 = tag_k[0:lt_b] macseq = tag_k[lt_b:lt_b+2] # MACSEQ(12)+res(4) + cop = macseq[1] & 0xf macseq[1] &= 0xf0 - return tag0, macseq + return tag0, macseq, cop else: tag = tag_k[0:lt_b] tag_info = tag_k[lt_b:lt_b+2] @@ -428,23 +460,26 @@ def decode_tags_info(self, k): # 4:I/NAV timing # 12: slow MAC (5min) adkd = (tag_info[1] >> 4) & 0xf - return tag, prn_d, adkd + cop = tag_info[1] & 0xf # Data cut-off point + return tag, prn_d, adkd, cop return False def verify_maclt(self): - """ verify Tag sequence """ - if self.maclt not in self.self_t: + """ check consistency of Tag sequence """ + if self.maclt not in self.mode_t: return False - self_t = self.self_t[self.maclt] + mode_t = self.mode_t[self.maclt] adkd_t = self.adkd_t[self.maclt] if self.nt*2 != len(adkd_t): return False - gst_wn, gst_tow = bs.unpack_from('u12u20', self.gst_sf, 0) - ofst = 0 if gst_tow % 60 == 0 else 6 + gst_wn, gst_tow_p = bs.unpack_from('u12u20', self.gst_sf_p, 0) + ofst = 0 if gst_tow_p % 60 == 0 else 6 for k in range(self.nt-1): - tag, prn_d, adkd = self.decode_tags_info(k+1) + tag, prn_d, adkd, cop = self.decode_tags_info(k+1) i = k + ofst + 1 - if adkd != adkd_t[i] or (self_t[i] == 1 and prn_d != self.prn_a): + if mode_t[i] == 1 and prn_d != self.prn_a: # self-auth + return False + if mode_t[i] != 2 and adkd != adkd_t[i]: # non-flex return False return True @@ -461,11 +496,25 @@ def process_mac(self, msg): def verify_macseq(self): """ verify MACSEQ """ - msg = bytearray([self.prn_a])+self.gst_sf + msg = bytearray([self.prn_a])+self.gst_sf_p # Eq.22 + + if self.maclt not in self.mode_t: + return False + mode_t = self.mode_t[self.maclt] + gst_wn, gst_tow_p = bs.unpack_from('u12u20', self.gst_sf_p, 0) + ofst = 0 if gst_tow_p % 60 == 0 else 6 + + ltag_b = self.tag_len_t[self.ts]//8+2 + for k in range(self.nt): # add FLEX taginfo + if mode_t[k+ofst] != 2: + continue + i0 = k*ltag_b + msg += self.tag[i0+ltag_b-2:i0+ltag_b] + macseq_c_ = self.process_mac(msg) - tag0, macseq_ = self.decode_tags_info(0) - macseq_c = bs.unpack_from('u12', macseq_c_, 0) - macseq = bs.unpack_from('u12', macseq_, 0) + tag0, macseq_, cop = self.decode_tags_info(0) + macseq_c = bs.unpack_from('u12', macseq_c_, 0)[0] + macseq = bs.unpack_from('u12', macseq_, 0)[0] return macseq == macseq_c def save_mack(self, mack, prn): @@ -510,25 +559,54 @@ def decode_nav(self, df): j += 8 return True + def load_inav(self, msg): + """ load I/NAV navigation message """ + + # even page (120bit) + odd page (120bit) + # even page even/odd(1b)+page type(1b)+data1(112b)+tail(6) + # odd page even/odd(1b)+page type(1b)+data2(16b)+osnma(40b) + # sar(22b)+spare(2b)+crc(24b)+ssp(8b)+tail(6b) + nav = bytearray(16) + nma_b = bytearray(5) + + even, pt1 = bs.unpack_from('u1u1', msg, 0) + odd, pt2 = bs.unpack_from('u1u1', msg, 120) + + if even != 0 or odd != 1 or pt1 != 0 or pt2 != 0: + print("I/NAV page format error.") + + i = 2 + for k in range(14): + nav[k] = bs.unpack_from('u8', msg, i)[0] + i += 8 + i += 8 + for k in range(2): + nav[14+k] = bs.unpack_from('u8', msg, i)[0] + i += 8 + + for k in range(5): + nma_b[k] = bs.unpack_from('u8', msg, i)[0] + i += 8 + + return nav, nma_b + def load_nav(self, nav, prn): """ load navigation message into subframe buffer """ mt = bs.unpack_from('u6', nav, 0)[0] if mt > 0 and mt <= 10: - j = (mt-1)*16*8+160*8*(prn-1) - for k in range(16): - bs.pack_into('u8', self.subfrm, j, nav[k]) - j += 8 + j = (prn-1)*160+(mt-1)*16 + self.subfrm[j:j+16] = nav return True def gen_navmsg(self, prn): """ generate nav message for nma """ if prn < 1 or prn > self.GALMAX: - return False + return None j0 = 160*8*(prn-1) for k in range(5): mt = bs.unpack_from('u6', self.subfrm, j0+k*16*8)[0] if mt != k+1: - return False + return None iodnav1 = bs.unpack_from('u10', self.subfrm, j0+6+0*16*8)[0] for k in range(1, 4): @@ -537,13 +615,13 @@ def gen_navmsg(self, prn): if self.monlevel > 0: print("error: iodnav mismatch mt=%d %d %d" % (k+1, iodnav1, iodnav_)) - return False + return None if self.monlevel > 0: svid = bs.unpack_from('u6', self.subfrm, j0+16+3*16*8)[0] wn, tow = bs.unpack_from('u12u20', self.subfrm, j0+73+4*16*8) - print(" svid=%2d iodnav=%2d wn=%4d tow=%6d" % - (svid, iodnav1, wn, tow)) + print(f" svid={svid:2d} iodnav={ + iodnav1:2d} gst_wn={wn:4d} gst_tow={tow:6d}") msg = bytearray(69) # 549b MT1 120b, MT2 120b, MT3 122b, MT4 120b, MT5 67b @@ -575,20 +653,20 @@ def gen_utcmsg(self): t1 = bs.unpack_from('u6', mt6, 0)[0] # MT6 t2 = bs.unpack_from('u6', mt10, 0)[0] # MT10 if t1 != 6 or t2 != 10: - return False + return None if self.monlevel > 0: tow = bs.unpack_from('u20', mt6, 105)[0] - bs.pack_into('u20', mt6, 105, tow+self.tofst) # <- tow-=60 - print(" utc tow=%6d" % (tow)) + # bs.pack_into('u20', mt6, 105, tow+self.tofst) # <- tow-=60 + print(" utc gst_tow=%6d" % (tow)) - # 161b MT6 119b, MT10 42b - msg = bytearray(21) + # 141b MT6 99b, MT10 42b + msg = bytearray(18) j = 0 - for k in range(15): + for k in range(13): b = bs.unpack_from('u8', mt6, 6+k*8)[0] bs.pack_into('u8', msg, j, b) j += 8 - j = 119 + j = 99 for k in range(5): b = bs.unpack_from('u8', mt10, k*8+86)[0] bs.pack_into('u8', msg, j, b) @@ -600,23 +678,9 @@ def gen_utcmsg(self): return msg - def verify_navmsg(self, tag_): - """ verify nav/utc message """ - prn_d = tag_.prn - adkd = tag_.adkd - - if prn_d == -1: - return False - - if adkd == 0 or adkd == 12: - msg = tag_.navmsg - if not msg: - return False - if adkd == 4: - msg = self.gen_utcmsg() - if not msg: - return False - mlen = 161 if adkd == 4 else 549 + def gen_msg(self, adkd, prn_d, gst_sf, ctr, msg): + """ generate message for verification of NMA """ + mlen = 141 if adkd == 4 else 549 j = 0 if adkd == 0 and self.prn_a == prn_d: mlen += 8+42 @@ -632,21 +696,32 @@ def verify_navmsg(self, tag_): prn_d = self.prn_a bs.pack_into('u8u8', m, j, prn_d, self.prn_a) j += 16 - bs.pack_into('r32u8u2', m, j, tag_.gst_sf, - tag_.cnt, self.nma_header >> 6) + bs.pack_into('r32u8u2', m, j, gst_sf, ctr, self.nma_header >> 6) j += 42 for k in range(len(msg)): b = msg[k] if k == len(msg)-1: if adkd == 4: - bs.pack_into('u1', m, j, b >> 7) - j += 1 + bs.pack_into('u3', m, j, b >> 5) + j += 3 else: bs.pack_into('u5', m, j, b >> 3) j += 5 else: bs.pack_into('u8', m, j, b) j += 8 + return m + + def verify_navmsg(self, tag_): + """ verify nav/utc message """ + prn_d = tag_.prn + adkd = tag_.adkd + msg = tag_.navmsg + + if prn_d == -1 or not msg: + return False + + m = self.gen_msg(adkd, prn_d, tag_.gst_sf, tag_.cnt, msg) tag_c = self.process_mac(m) lt_b = self.tag_len_t[self.ts]//8 @@ -677,16 +752,31 @@ def chk_nav(self, result, tag_): return self.vstatus[i] def decode(self, nma_b, wn, tow, prn): - """ decode OSNMA message """ + """ decode OSNMA message + Parameters + ---------- + nma_b: bytearray(5) + NMA binary message (40bits) in I/NAV + wn: int + GPS-Week number + tow: int + GPS time-of-week + prn: int + PRN number + """ status = False k = (tow % 30)//2 if k == 0: # reset counter self.cnt[prn-1] = 0 self.hk[prn-1] = bytearray(15) self.mack[prn-1][0] = 0 + elif k < 0: + return - self.gst = tow - self.gst_tow = (tow//30)*30 + # convert GPS time to Galileo standard time (GST) + gst_wn, gst_tow = time2gst(gpst2time(wn, tow)) + + self.gst_tow = (gst_tow//30)*30 # current subframe-time # store sub-frame for NMA self.hk[prn-1][k] = nma_b[0] # HK-ROOT message self.mack[prn-1][k*4:k*4+4] = nma_b[1:5] # MACK message @@ -713,17 +803,22 @@ def decode(self, nma_b, wn, tow, prn): print("root-key not verified %s" % (s)) if self.decode_mack(prn): - self.set_gst_sf(wn, self.gst_tow) + self.gst_sf = self.set_gst_sf(gst_wn, self.gst_tow) # key-chain verification # skip if key-chain is already verified at t=gst_sf - if self.gst_sf != self.gst_sf_c: - self.status ^= uOSNMA.KEYCHAIN_VERIFIED + # if self.gst_sf != self.gst_sf_c and \ + # self.status & uOSNMA.KEYCHAIN_VERIFIED: + # self.status ^= uOSNMA.KEYCHAIN_VERIFIED if self.status & uOSNMA.ROOTKEY_VERIFIED and \ self.gst_sf != self.gst_sf_c: # root-key verified - ki = ((wn-self.wn)*86400*7 + - (self.gst_tow-self.towh*3600))//30+1 + + if self.status & uOSNMA.KEYCHAIN_VERIFIED: + ki = 1 + else: + ki = ((gst_wn-self.wn)*86400*7 + + (self.gst_tow-self.towh*3600))//30+1 result = self.verify_key_chain( - self.key_c, wn, self.gst_tow, ki) + self.key_c, gst_wn, self.gst_tow, ki) if result: self.gst_sf_c = self.gst_sf self.key = self.key_c @@ -731,26 +826,26 @@ def decode(self, nma_b, wn, tow, prn): s = "wn=%4d tow=%6d ki=%d prn=%d gst_tow=%d" % \ (wn, tow, ki, prn, self.gst_tow) if result: - print("key chain verified %s" % (s)) + print("Key chain verified %s" % (s)) else: - print("key chain not verified %s" % (s)) + print("Key chain not verified %s" % (s)) if self.status & uOSNMA.KEYCHAIN_VERIFIED: - # A1.6.2 Tag Sequence Verification - tow0 = self.gst_tow-30 - self.set_gst_sf(wn, tow0) + # 6.5 MAC Look-up Table Verification + self.gst_tow_p = self.gst_tow-30 + self.gst_sf_p = self.set_gst_sf(gst_wn, self.gst_tow_p) result = self.verify_maclt() if self.monlevel > 0: s = "on %4d/%6d gst_tow=%d prn=%d" % \ - (wn, tow, self.gst_tow-30, prn) + (wn, tow, self.gst_tow_p, prn) if result: - print("Tag Sequence verified %s" % (s)) + print("MAC Look-up Table verified %s" % (s)) else: - print("Tag Sequence not verified %s" % (s)) + print("MAC Look-up Table not verified %s" % (s)) if not result: return False - # A1.6.4 MACSEQ verification + # 6.6 MACSEQ Verification # prn_a|gst_sf result = self.verify_macseq() if result and self.monlevel > 0: @@ -761,47 +856,49 @@ def decode(self, nma_b, wn, tow, prn): (wn, tow, prn)) return False - # A1.6.5 Tags verification + # 6.7 Tag Verification for k in range(self.nt): - cnt = k+1 + ctr = k+1 if k == 0: # self-tag prn_d = self.prn_a adkd = 0 - tag, macseq = self.decode_tags_info(0) + tag, macseq, cop = self.decode_tags_info(0) else: - tag, prn_d, adkd = self.decode_tags_info(k) + tag, prn_d, adkd, cop = self.decode_tags_info(k) if adkd == 12: # delayed tag loading navmsg = self.gen_navmsg(prn_d) - tag_ = taginfo( - self.gst_sf, prn_d, adkd, tag, cnt, navmsg) + tag_ = taginfo(self.gst_sf, prn_d, adkd, cop, + tag, ctr, navmsg) self.tag_list.append(tag_) if adkd == 12: continue - if adkd == 0: + elif adkd == 0: navmsg = self.gen_navmsg(prn_d) + elif adkd == 4: + navmsg = self.gen_utcmsg() else: navmsg = None - tag_ = taginfo(self.gst_sf, prn_d, - adkd, tag, cnt, navmsg) + tag_ = taginfo(self.gst_sf_p, prn_d, + adkd, cop, tag, ctr, navmsg) result = self.verify_navmsg(tag_) status = self.chk_nav(result, tag_) if status and adkd == 4: self.status |= uOSNMA.UTC_VERIFIED if self.monlevel > 0: if result: - print("# %d prn_d=%2d adkd=%2d tag verified" - % (cnt, prn_d, adkd)) + print(f"# {ctr} prn_d={prn_d} adkd={ + adkd} tag verified") else: - print("%d prn_d=%2d adkd=%2d tag not verified" - % (cnt, prn_d, adkd)) + print(f"{ctr} prn_d={prn_d} adkd={ + adkd} tag not verified") # slow MAC for tag_ in self.tag_list: dt = self.difftime(self.gst_sf, tag_.gst_sf) if dt == 300: result = self.verify_navmsg(tag_) if self.monlevel > 0 and result: - print("# %d prn_d=%2d adkd=%2d tag verified" + print("SLOW-MAC# %d prn_d=%2d adkd=%2d tag verified" % (tag_.cnt, tag_.prn, tag_.adkd)) elif dt > 300: tag_ = None diff --git a/src/cssrlib/qznma.py b/src/cssrlib/qznma.py index 4716006..bb15deb 100644 --- a/src/cssrlib/qznma.py +++ b/src/cssrlib/qznma.py @@ -5,8 +5,8 @@ Satellite Positioning, Navigation and Timing Service (IS-QZSS-PNT-006), July, 2024 -Note: - to use the package for QZSNMA, the user needs to +Note: + to use the package for QZSNMA, the user needs to install the public keys provided by QSS. @author: Rui Hirokawa @@ -22,9 +22,8 @@ from enum import IntEnum import copy -pk = None -rds = bytearray(89) -flag_e = 0 +dtype_ = [('wn', 'int'), ('tow', 'float'), ('prn', 'int'), + ('type', 'int'), ('len', 'int'), ('nav', 'S512')] class uNavId(IntEnum): @@ -99,273 +98,14 @@ def copy_buff(src, dst, ofst_s=0, ofst_d=0, blen=0): bs.pack_into(fmt, dst, b*32+ofst_d, d) -def load_navmsg_lnav(navfile, navmsg): - """ load GPS/QZSS LNAV navigation messages """ - dtype = [('wn', 'int'), ('tow', 'float'), ('prn', 'int'), - ('type', 'int'), ('len', 'int'), ('nav', 'S512')] - v = np.genfromtxt(navfile, dtype=dtype) - - prn_ = np.unique(v['prn']) - - for prn in prn_: - sat = prn2sat(uGNSS.GPS, prn) - if sat not in navmsg.keys(): - navmsg[sat] = {} - - buff = bytearray(120) - vi = v[v['prn'] == prn] - for msg_ in vi['nav']: - msg = unhexlify(msg_) - - sid = bs.unpack_from('u3', msg, 53)[0] - - if sid > 3: - continue - - buff[(sid-1)*40:(sid-1)*40+40] = msg[0:40] - # buff = bytes(buff) - - id1 = bs.unpack_from('u3', buff, 53)[0] - id2 = bs.unpack_from('u3', buff, 320+53)[0] - id3 = bs.unpack_from('u3', buff, 320*2+53)[0] - - if id1 != 1 or id2 != 2 or id3 != 3: - continue - - # SF1 - iodc_ = bs.unpack_from('u2', buff, 32*2+2+22)[0] - iodc = bs.unpack_from('u8', buff, 32*7+2)[0] - iodc |= (iodc_ << 8) - - # SF2 - iode1 = bs.unpack_from('u8', buff, 320+32*2+2)[0] - # SF3 - iode2 = bs.unpack_from('u8', buff, 320*2+32*9+2)[0] - - if iode1 != iode2 or iode1 != (iodc & 0xff): - continue - - if iode1 not in navmsg[sat].keys(): - navmsg[sat][iode1] = {} - - if uNavId.GPS_LNAV in navmsg[sat][iode1].keys(): - continue - - tow = bs.unpack_from('u17', buff, 320*0+32+2)[0] - toc = bs.unpack_from('u16', buff, 320*0+32*7+8+2)[0] - toe = bs.unpack_from('u16', buff, 320*0+32*9+2)[0] - - navmsg[sat][iode1][uNavId.GPS_LNAV] = NavMsg( - uGNSS.GPS, uNavId.GPS_LNAV) - navmsg[sat][iode1][uNavId.GPS_LNAV].tow = tow - navmsg[sat][iode1][uNavId.GPS_LNAV].toe = toe - navmsg[sat][iode1][uNavId.GPS_LNAV].toc = toc - navmsg[sat][iode1][uNavId.GPS_LNAV].iodn = iode1 - navmsg[sat][iode1][uNavId.GPS_LNAV].iodc = iodc - navmsg[sat][iode1][uNavId.GPS_LNAV].msg = copy.copy(buff) - - -def load_navmsg_cnav(navfile, navmsg): - """ load GPS/QZSS CNAV navigation messages """ - dtype = [('wn', 'int'), ('tow', 'float'), ('prn', 'int'), - ('type', 'int'), ('len', 'int'), ('nav', 'S512')] - v = np.genfromtxt(navfile, dtype=dtype) - - prn_ = np.unique(v['prn']) - - mt_t = {10: 1, 11: 2, 30: 3, 31: 3, 32: 3, 33: 3, - 35: 3, 36: 3, 37: 3, 61: 3} - - for prn in prn_: - sat = prn2sat(uGNSS.GPS, prn) - if sat not in navmsg.keys(): - navmsg[sat] = {} - - buff = bytearray(114) - vi = v[v['prn'] == prn] - for msg_ in vi['nav']: - msg = unhexlify(msg_) - - prn_, mt = bs.unpack_from('u6u6', msg, 8) - if mt not in mt_t.keys(): - continue - - sid = mt_t[mt] - - if sid > 3: - continue - - buff[(sid-1)*38:(sid-1)*38+38] = msg[0:38] - - id1 = bs.unpack_from('u6', buff, 14)[0] - id2 = bs.unpack_from('u6', buff, 304+14)[0] - id3 = bs.unpack_from('u6', buff, 304*2+14)[0] - - if id1 != 10 or id2 != 11 or id3 not in mt_t.keys(): - continue - - tow1 = bs.unpack_from('u17', buff, 20)[0] - toe1 = bs.unpack_from('u11', buff, 70)[0] - - # type 11 - toe2 = bs.unpack_from('u11', buff, 304+38)[0] - tow2 = bs.unpack_from('u17', buff, 304+20)[0] - - # MT 3x or 61 - toc = bs.unpack_from('u11', buff, 304*2+60)[0] - tow3 = bs.unpack_from('u17', buff, 304*2+20)[0] - - if toe1 != toe2 or toe1 != toc: - continue - - if toe1 not in navmsg[sat].keys(): - navmsg[sat][toe1] = {} - - if uNavId.GPS_CNAV in navmsg[sat][toe1].keys(): - continue - - navmsg[sat][toe1][uNavId.GPS_CNAV] = NavMsg( - uGNSS.GPS, uNavId.GPS_CNAV) - navmsg[sat][toe1][uNavId.GPS_CNAV].iodn = toe1 - navmsg[sat][toe1][uNavId.GPS_CNAV].iodc = toc - - navmsg[sat][toe1][uNavId.GPS_CNAV].tow = [tow1, tow2, tow3] - navmsg[sat][toe1][uNavId.GPS_CNAV].toe = toe1 - navmsg[sat][toe1][uNavId.GPS_CNAV].toc = toc - - navmsg[sat][toe1][uNavId.GPS_CNAV].msg = copy.copy(buff) - - -def load_navmsg_fnav(navfile, navmsg): - """ load Galileo F/NAV navigation messages """ - dtype = [('wn', 'int'), ('tow', 'float'), ('prn', 'int'), - ('type', 'int'), ('len', 'int'), ('nav', 'S512')] - v = np.genfromtxt(navfile, dtype=dtype) - - prn_ = np.unique(v['prn']) - - for prn in prn_: - sat = prn2sat(uGNSS.GAL, prn) - if sat not in navmsg.keys(): - navmsg[sat] = {} - - buff = bytearray(124) - vi = v[v['prn'] == prn] - for msg_ in vi['nav']: - msg = unhexlify(msg_) - - sid = bs.unpack_from('u6', msg, 0)[0] - if sid > 4: - continue - - for k in range(31): # copy 244bits - buff[(sid-1)*31+k] = msg[k] - - sid1, svid1, iodnav1, toc = bs.unpack_from('u6u6u10u14', buff, 0) - sid2, iodnav2 = bs.unpack_from('u6u10', buff, 248*1) - sid3, iodnav3 = bs.unpack_from('u6u10', buff, 248*2) - sid4, iodnav4 = bs.unpack_from('u6u10', buff, 248*3) - - if sid1 != 1 or sid2 != 2 or sid3 != 3 or sid4 != 4: - continue - if iodnav1 != iodnav2 or iodnav1 != iodnav3 or iodnav1 != iodnav4: - continue - - if iodnav1 not in navmsg[sat].keys(): - navmsg[sat][iodnav1] = {} - - if uNavId.GAL_FNAV in navmsg[sat][iodnav1].keys(): - continue - - toe = bs.unpack_from('u14', buff, 248*2+160)[0] - - tow1 = bs.unpack_from('u20', buff, 248*0+167)[0] - tow2 = bs.unpack_from('u20', buff, 248*1+194)[0] - tow3 = bs.unpack_from('u20', buff, 248*2+186)[0] - tow4 = bs.unpack_from('u20', buff, 248*3+189)[0] - - navmsg[sat][iodnav1][uNavId.GAL_FNAV] = NavMsg( - uGNSS.GAL, uNavId.GAL_FNAV) - - navmsg[sat][iodnav1][uNavId.GAL_FNAV].tow = [ - tow1, tow2, tow3, tow4] - navmsg[sat][iodnav1][uNavId.GAL_FNAV].toe = toe - navmsg[sat][iodnav1][uNavId.GAL_FNAV].toc = toc - navmsg[sat][iodnav1][uNavId.GAL_FNAV].iodn = iodnav1 - navmsg[sat][iodnav1][uNavId.GAL_FNAV].iodc = 0 - navmsg[sat][iodnav1][uNavId.GAL_FNAV].msg = copy.copy(buff) - - -def load_navmsg_inav(navfile, navmsg): - """ load Galileo I/NAV navigation messages """ - dtype = [('wn', 'int'), ('tow', 'float'), ('prn', 'int'), - ('type', 'int'), ('len', 'int'), ('nav', 'S512')] - v = np.genfromtxt(navfile, dtype=dtype) - - prn_ = np.unique(v['prn']) - - for prn in prn_: - sat = prn2sat(uGNSS.GAL, prn) - if sat not in navmsg.keys(): - navmsg[sat] = {} - - buff = bytearray(80) - vi = v[(v['prn'] == prn) & (v['type'] == 0)] # E1 only - tow_t = [0, 0, 0, 0, 0] - for j, msg_ in enumerate(vi['nav']): - tow_ = int(vi['tow'][j]) - - msg = unhexlify(msg_) - - sid, iodnav = bs.unpack_from('u6u10', msg, 2) - - if sid == 0 or sid > 5: - continue - - i = 2 - for k in range(14): - buff[(sid-1)*16+k] = bs.unpack_from('u8', msg, i)[0] - i += 8 - i += 8 - for k in range(2): - buff[(sid-1)*16+14+k] = bs.unpack_from('u8', msg, i)[0] - i += 8 +def load_pubkey(keyid, bdir='../data/qznma/pubkey'): + """ load public key information in der format """ + pubk_path = f"{bdir}/{keyid:03d}.der" + with open(pubk_path, 'rb') as f: + pubk = f.read() + pk = serialization.load_der_public_key(pubk) - sid1, iodnav1 = bs.unpack_from('u6u10', buff, 0) - sid2, iodnav2 = bs.unpack_from('u6u10', buff, 128*1) - sid3, iodnav3 = bs.unpack_from('u6u10', buff, 128*2) - sid4, iodnav4 = bs.unpack_from('u6u10', buff, 128*3) - sid5 = bs.unpack_from('u6', buff, 128*4)[0] - tow = bs.unpack_from('u20', buff, 128*4+85)[0] - toe = bs.unpack_from('u14', buff, 16)[0] - toc = bs.unpack_from('u14', buff, 128*3+54)[0] - - tow_t[sid-1] = tow_ - - if sid != 5 or sid1 != 1 or sid2 != 2 or sid3 != 3 or sid4 != 4 \ - or sid5 != 5: - continue - if iodnav1 != iodnav2 or iodnav1 != iodnav3 or iodnav1 != iodnav4: - continue - - if iodnav1 not in navmsg[sat].keys(): - navmsg[sat][iodnav1] = {} - - if uNavId.GAL_INAV in navmsg[sat][iodnav1].keys(): - continue - - if sat == 35: # debug - None - - navmsg[sat][iodnav1][uNavId.GAL_INAV] = NavMsg( - uGNSS.GAL, uNavId.GAL_INAV) - navmsg[sat][iodnav1][uNavId.GAL_INAV].iodn = iodnav1 - navmsg[sat][iodnav1][uNavId.GAL_INAV].iodc = 0 - navmsg[sat][iodnav1][uNavId.GAL_INAV].tow = tow - navmsg[sat][iodnav1][uNavId.GAL_INAV].toe = toe - navmsg[sat][iodnav1][uNavId.GAL_INAV].toc = toc - navmsg[sat][iodnav1][uNavId.GAL_INAV].msg = copy.copy(buff) - buff = bytearray(80) + return pk def raw2der(ds): @@ -379,341 +119,578 @@ def raw2der(ds): return bytes(der) -def lnav_to_mnav(msg, sys=uGNSS.QZS): - """ genarate MNAV(900b) from LNAV SF1,2,3 """ - mnav = bytearray(113) - - # RAND - if sys == uGNSS.QZS: - mask_t = [[0xfffffc, 0xffff80, 0xffffff, 0xffffff, - 0xffffff, 0xffffff, 0xffffff, 0xffffff, - 0xffffff, 0xfffffc], - [0xfffffc, 0xffff80, 0xffffff, 0xffffff, - 0xffffff, 0xffffff, 0xffffff, 0xffffff, - 0xffffff, 0xfffffc], - [0xfffffc, 0xffff80, 0xffffff, 0xffffff, - 0xffffff, 0xffffff, 0xffffff, 0xffffff, - 0xffffff, 0xfffffc]] - else: # GPS - mask_t = [[0xff0003, 0xfffffc, 0xffffff, 0xffffff, - 0xffffff, 0xffffff, 0xffffff, 0xffffff, - 0xffffff, 0xfffffc], - [0xff0003, 0xfffffc, 0xffffff, 0xffffff, - 0xffffff, 0xffffff, 0xffffff, 0xffffff, - 0xffffff, 0xffff80], - [0xff0003, 0xfffffc, 0xffffff, 0xffffff, - 0xffffff, 0xffffff, 0xffffff, 0xffffff, - 0xffffff, 0xfffffc]] - - for sid in range(3): - for k in range(10): - d = bs.unpack_from('u30', msg, sid*320+32*k+2)[0] & \ - (mask_t[sid][k] << 6) - bs.pack_into('u30', mnav, sid*300+30*k, d) - - return mnav - - -def cnav2_to_mnav(toi, msg, sys=uGNSS.QZS): - """ genarate MNAV(600b) from CNAV2 SF2 """ - mnav = bytearray(77) - - mask_t = [0xffffffff, 0x7fffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xfffffbff] - - bs.pack_into('u9', mnav, 0, toi) - - i = 52 - for k in range(18): - d = bs.unpack_from('u32', msg, i)[0] & mask_t[k] - i += 32 - bs.pack_into('u32', mnav, 9+32*k, d) - - return mnav - - -def cnav_to_mnav(msg, sys=uGNSS.QZS): - """ genarate MNAV(900b) from CNAV SF1,2,3 """ - mnav = bytearray(113) - - # RAND - if sys == uGNSS.QZS: - mask_t = [ - [0xff03ffff, 0xfbffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xfffef000], - [0xff03ffff, 0xfbffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xfffff000], - [0xff03ffff, 0xfbffffff, 0xffffffff, 0xfffffffe, - 0x00000000, 0x00000000, 0x00000000, 0x00000000, - 0x00000000]] - - else: # GPS - mask_t = [ - [0xfffc0fff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xfffff000], - [0xfffc0fff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xfffff000], - [0xfffc0fff, 0xffffffff, 0xffffffff, 0xfffffffe, - 0x00000000, 0x00000000, 0x00000000, 0x00000000, - 0x00000000]] - - for sid in range(3): - for k in range(9): - d = bs.unpack_from('u32', msg, sid*304+32*k)[0] & mask_t[sid][k] - bs.pack_into('u32', mnav, sid*300+32*k, d) - - return mnav - - -def fnav_to_mnav(msg): - """ genarate MNAV(976b) from F/NAV message 1,2,3,4 """ - mnav = bytearray(122) - - # RAND - mask_t = [0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, - 0xffffffff, 0xffffffff, 0xfffffc00, 0x0003f] - - for sid in range(4): - for k in range(8): - fmt = 'u32' if k < 7 else 'u20' - d = bs.unpack_from(fmt, msg, sid*248+32*k)[0] & mask_t[k] - bs.pack_into(fmt, mnav, sid*244+32*k, d) - - return mnav - - -def inav_to_mnav(msg): - """ genarate MNAV(640b) from I/NAV message 1,2,3,4,5 """ - return copy.copy(msg) - - -def svid2sat(svid): - if svid <= 63: - sat = prn2sat(uGNSS.GPS, svid) - elif svid <= 127: - sat = prn2sat(uGNSS.GAL, svid-64) - elif svid <= 191: - sat = prn2sat(uGNSS.SBS, svid) - elif svid <= 202: - sat = prn2sat(uGNSS.QZS, svid) - - return sat - - -def decode_gnss_rds(tow, msg, navmsg, i): - """ generate RDS for GNSS """ - global pk - # nid = 0: QZNMA - # rtow: - # GPS LNAV/CNAV: tow(17), CNAV2: toi(9),itow(8) - # svid: GPS:1-63, Galileo:65-127, SBAS:129-191, QZS:193-202 - # mt 1: GPS LNAV, 2: GPS CNAV, 3: GPS CNAV2 - # 4: Galileo F/NAV, 5: Galileo I/NAV - nid, rtow, svid, mt = bs.unpack_from('u4u20u8u4', msg, i) - i += 36 - # iode1 GPS LNAV: iode of SF2, GPS CNAV/CNAV2 toe of MT10/SF2 - # Galileo F/NAV,I/NAV IODnav - # iode2 GPS LNAV: iode of SF3, GPS CNAV toe of MT11, other: zero - # iodc GPS LNAV iodc, CNAV toc of MT30-37, other: zero - iode1, iode2, iodc, keyid = bs.unpack_from('u11u11u11u8', msg, i) - i += 41 - ds = bytearray(64) - - for k in range(16): - d = bs.unpack_from('u32', msg, i)[0] - i += 32 - bs.pack_into('u32', ds, k*32, d) - - salt = bs.unpack_from('u16', msg, i)[0] - i += 16 - - sat = svid2sat(svid) - sys, prn = sat2prn(sat) - - if svid == 0 or sat not in navmsg.keys() or \ - iode1 not in navmsg[sat].keys() or \ - mt not in navmsg[sat][iode1].keys(): - return None, None - - npr = NavParam(keyid, ds, salt, mt, nid, rtow, svid, iode1, iode2, iodc) - tow_ = navmsg[sat][iode1][mt].tow - - print(f"tow={tow} rtow={rtow} tow_i={tow_} svid={svid:3d} mt={mt} " + - f"iode={iode1:4d} iode2={iode2:4d} iodc={iodc:4d} key={keyid}") - - if mt == uNavId.GPS_LNAV: - - mnav = lnav_to_mnav(navmsg[sat][iode1][mt].msg, sys=uGNSS.GPS) - - bs.pack_into('u17', mnav, 300*0+30, rtow) - bs.pack_into('u17', mnav, 300*1+30, rtow+1) - bs.pack_into('u17', mnav, 300*2+30, rtow+2) - - elif mt == uNavId.GPS_CNAV: - - mnav = cnav_to_mnav(navmsg[sat][iode1][mt].msg, sys=uGNSS.GPS) - - bs.pack_into('u17', mnav, 300*0+20, rtow) - bs.pack_into('u17', mnav, 300*1+20, rtow+1) - bs.pack_into('u17', mnav, 300*2+20, rtow+2) - - elif mt == uNavId.GPS_CNAV2: - - mnav = cnav2_to_mnav(navmsg[sat][iode1][mt].msg, sys=uGNSS.GPS) +class qznma(): + """ class for QZNMA processing """ - elif mt == uNavId.GAL_FNAV: + def __init__(self): + self.navmsg = {} + self.pk = None + self.rds = bytearray(89) + self.flag_e = 0 + self.mask = 0 + self.tow_p = -1 + self.buff = bytearray(120) - mnav = fnav_to_mnav(navmsg[sat][iode1][mt].msg) + def load_navmsg_lnav(self, navfile): + """ load GPS/QZSS LNAV navigation messages """ + v = np.genfromtxt(navfile, dtype=dtype_) + prn_ = np.unique(v['prn']) - bs.pack_into('u20', mnav, 244*0+167, rtow) - bs.pack_into('u20', mnav, 244*1+194, rtow+10) - bs.pack_into('u20', mnav, 244*2+186, rtow+20) - bs.pack_into('u20', mnav, 244*3+189, rtow+30) + for prn in prn_: + sat = prn2sat(uGNSS.GPS, prn) + if sat not in self.navmsg.keys(): + self.navmsg[sat] = {} - elif mt == uNavId.GAL_INAV: + buff = bytearray(120) + vi = v[v['prn'] == prn] + for msg_ in vi['nav']: + msg = unhexlify(msg_) - mnav = inav_to_mnav(navmsg[sat][iode1][mt].msg) - bs.pack_into('u20', mnav, 128*4+85, rtow) + sid = bs.unpack_from('u3', msg, 53)[0] + if sid > 3: + continue - return npr, mnav + buff[(sid-1)*40:(sid-1)*40+40] = msg[0:40] + # buff = bytes(buff) + id1 = bs.unpack_from('u3', buff, 53)[0] + id2 = bs.unpack_from('u3', buff, 320+53)[0] + id3 = bs.unpack_from('u3', buff, 320*2+53)[0] + + if id1 != 1 or id2 != 2 or id3 != 3: + continue -def verify_gnss_nav(npr, mnav): - """ verifify the navigation message in MNAV using DS """ - global pk + # SF1 + iodc_ = bs.unpack_from('u2', buff, 32*2+2+22)[0] + iodc = bs.unpack_from('u8', buff, 32*7+2)[0] + iodc |= (iodc_ << 8) + + # SF2 + iode1 = bs.unpack_from('u8', buff, 320+32*2+2)[0] + # SF3 + iode2 = bs.unpack_from('u8', buff, 320*2+32*9+2)[0] + + if iode1 != iode2 or iode1 != (iodc & 0xff): + continue + + if iode1 not in self.navmsg[sat].keys(): + self.navmsg[sat][iode1] = {} + + if uNavId.GPS_LNAV in self.navmsg[sat][iode1].keys(): + continue + + tow = bs.unpack_from('u17', buff, 320*0+32+2)[0] + toc = bs.unpack_from('u16', buff, 320*0+32*7+8+2)[0] + toe = bs.unpack_from('u16', buff, 320*0+32*9+2)[0] + + self.navmsg[sat][iode1][uNavId.GPS_LNAV] = NavMsg( + uGNSS.GPS, uNavId.GPS_LNAV) + self.navmsg[sat][iode1][uNavId.GPS_LNAV].tow = tow + self.navmsg[sat][iode1][uNavId.GPS_LNAV].toe = toe + self.navmsg[sat][iode1][uNavId.GPS_LNAV].toc = toc + self.navmsg[sat][iode1][uNavId.GPS_LNAV].iodn = iode1 + self.navmsg[sat][iode1][uNavId.GPS_LNAV].iodc = iodc + self.navmsg[sat][iode1][uNavId.GPS_LNAV].msg = copy.copy(buff) + + def load_navmsg_cnav(self, navfile): + """ load GPS/QZSS CNAV navigation messages """ + v = np.genfromtxt(navfile, dtype=dtype_) + prn_ = np.unique(v['prn']) + + mt_t = {10: 1, 11: 2, 30: 3, 31: 3, 32: 3, 33: 3, + 35: 3, 36: 3, 37: 3, 61: 3} - if npr.mt == uNavId.GPS_LNAV: - blen, mlen = 125, 900 - elif npr.mt == uNavId.GPS_CNAV: - blen, mlen = 125, 900 - elif npr.mt == uNavId.GPS_CNAV2: - blen, mlen = 89, 609 - elif npr.mt == uNavId.GAL_FNAV: - blen, mlen = 134, 976 - elif npr.mt == uNavId.GAL_INAV: - blen, mlen = 92, 640 - - rand_ = bytearray(blen) - - bs.pack_into('u4u20u8u4', rand_, 0, npr.nid, npr.rtow, npr.svid, npr.mt) - bs.pack_into('u11u11u11u8', rand_, 36, npr.iode1, - npr.iode2, npr.iodc, npr.keyid) - copy_buff(mnav, rand_, 0, 77, mlen) - bs.pack_into('u16', rand_, 77+mlen, npr.salt) - - sat = svid2sat(npr.svid) - sys, prn = sat2prn(sat) - - if pk is None: - pk = load_pubkey(npr.keyid) - - ds_der = raw2der(npr.ds) - status = False - try: - pk.verify(ds_der, bytes(rand_), ec.ECDSA(hashes.SHA256())) - print(f'mt={npr.mt} sys={sys} prn={prn} signature OK.') - status = True - except InvalidSignature: - print(f'mt={npr.mt} sys={sys} prn={prn} signature NG.') - - return status - - -def gen_rds(mode, msg, mask): - """ prepare RDS from navigation message and parameters """ - global rds - - if mode == uNavId.GPS_LNAV: - sid, d = bs.unpack_from('u2u14', msg, 32*2+2+8) - if sid > 0: - i0 = (sid-1)*180 - bs.pack_into('u14', rds, i0, d) - - for k in range(7): - d = bs.unpack_from('u24', msg, 32*(k+3)+2)[0] - bs.pack_into('u24', rds, i0+14+24*k, d) - - mask |= (1 << (sid-1)) - - elif mode == uNavId.GPS_CNAV: - sid = bs.unpack_from('u2', msg, 38)[0] - if sid > 0: - copy_buff(msg, rds, 40, (sid-1)*236, 236) - mask |= (1 << (sid-1)) - - elif mode == uNavId.GPS_CNAV2: - sid = bs.unpack_from('u2', msg, 1266)[0] - if sid > 0: - copy_buff(msg, rds, 1268, (sid-1)*234, 234) - mask |= (1 << (sid-1)) - - keyid = rds[0] - ds = rds[1:65] - salt = (rds[65] << 8) | rds[66] - - npr = NavParam(keyid, ds, salt) - - return npr, mask - - -def msg2nav(i0, msg, buff, mode): - """ prepare navigation message (LNAV/CNAV) from raw nav message """ - global flag_e - - if mode == uNavId.GPS_LNAV: - blen = 40 - else: - blen = 38 - - buff[(i0-1)*blen:(i0-1)*blen+blen] = msg[0:blen] - flag_e |= (1 << (i0-1)) - return buff + for prn in prn_: + sat = prn2sat(uGNSS.GPS, prn) + if sat not in self.navmsg.keys(): + self.navmsg[sat] = {} + + buff = bytearray(114) + vi = v[v['prn'] == prn] + for msg_ in vi['nav']: + msg = unhexlify(msg_) + + prn_, mt = bs.unpack_from('u6u6', msg, 8) + if mt not in mt_t.keys(): + continue + + sid = mt_t[mt] + + if sid > 3: + continue + + buff[(sid-1)*38:(sid-1)*38+38] = msg[0:38] + + id1 = bs.unpack_from('u6', buff, 14)[0] + id2 = bs.unpack_from('u6', buff, 304+14)[0] + id3 = bs.unpack_from('u6', buff, 304*2+14)[0] + + if id1 != 10 or id2 != 11 or id3 not in mt_t.keys(): + continue + + tow1 = bs.unpack_from('u17', buff, 20)[0] + toe1 = bs.unpack_from('u11', buff, 70)[0] + + # type 11 + toe2 = bs.unpack_from('u11', buff, 304+38)[0] + tow2 = bs.unpack_from('u17', buff, 304+20)[0] + + # MT 3x or 61 + toc = bs.unpack_from('u11', buff, 304*2+60)[0] + tow3 = bs.unpack_from('u17', buff, 304*2+20)[0] + + if toe1 != toe2 or toe1 != toc: + continue + + if toe1 not in self.navmsg[sat].keys(): + self.navmsg[sat][toe1] = {} + + if uNavId.GPS_CNAV in self.navmsg[sat][toe1].keys(): + continue + + self.navmsg[sat][toe1][uNavId.GPS_CNAV] = NavMsg( + uGNSS.GPS, uNavId.GPS_CNAV) + self.navmsg[sat][toe1][uNavId.GPS_CNAV].iodn = toe1 + self.navmsg[sat][toe1][uNavId.GPS_CNAV].iodc = toc + self.navmsg[sat][toe1][uNavId.GPS_CNAV].tow = [ + tow1, tow2, tow3] + self.navmsg[sat][toe1][uNavId.GPS_CNAV].toe = toe1 + self.navmsg[sat][toe1][uNavId.GPS_CNAV].toc = toc + self.navmsg[sat][toe1][uNavId.GPS_CNAV].msg = copy.copy(buff) + + def load_navmsg_fnav(self, navfile): + """ load Galileo F/NAV navigation messages """ + v = np.genfromtxt(navfile, dtype=dtype_) + prn_ = np.unique(v['prn']) + + for prn in prn_: + sat = prn2sat(uGNSS.GAL, prn) + if sat not in self.navmsg.keys(): + self.navmsg[sat] = {} + + buff = bytearray(124) + vi = v[v['prn'] == prn] + for msg_ in vi['nav']: + msg = unhexlify(msg_) + + sid = bs.unpack_from('u6', msg, 0)[0] + if sid > 4: + continue + + for k in range(31): # copy 244bits + buff[(sid-1)*31+k] = msg[k] + + sid1, svid1, iodnav1, toc = bs.unpack_from( + 'u6u6u10u14', buff, 0) + sid2, iodnav2 = bs.unpack_from('u6u10', buff, 248*1) + sid3, iodnav3 = bs.unpack_from('u6u10', buff, 248*2) + sid4, iodnav4 = bs.unpack_from('u6u10', buff, 248*3) + + if sid1 != 1 or sid2 != 2 or sid3 != 3 or sid4 != 4: + continue + if iodnav1 != iodnav2 or iodnav1 != iodnav3 or \ + iodnav1 != iodnav4: + continue + + if iodnav1 not in self.navmsg[sat].keys(): + self.navmsg[sat][iodnav1] = {} + + if uNavId.GAL_FNAV in self.navmsg[sat][iodnav1].keys(): + continue + + toe = bs.unpack_from('u14', buff, 248*2+160)[0] + + tow1 = bs.unpack_from('u20', buff, 248*0+167)[0] + tow2 = bs.unpack_from('u20', buff, 248*1+194)[0] + tow3 = bs.unpack_from('u20', buff, 248*2+186)[0] + tow4 = bs.unpack_from('u20', buff, 248*3+189)[0] + + self.navmsg[sat][iodnav1][uNavId.GAL_FNAV] = \ + NavMsg(uGNSS.GAL, uNavId.GAL_FNAV) + self.navmsg[sat][iodnav1][uNavId.GAL_FNAV].tow = \ + [tow1, tow2, tow3, tow4] + self.navmsg[sat][iodnav1][uNavId.GAL_FNAV].toe = toe + self.navmsg[sat][iodnav1][uNavId.GAL_FNAV].toc = toc + self.navmsg[sat][iodnav1][uNavId.GAL_FNAV].iodn = iodnav1 + self.navmsg[sat][iodnav1][uNavId.GAL_FNAV].iodc = 0 + self.navmsg[sat][iodnav1][uNavId.GAL_FNAV].msg = \ + copy.copy(buff) + + def load_navmsg_inav(self, navfile): + """ load Galileo I/NAV navigation messages """ + v = np.genfromtxt(navfile, dtype=dtype_) + prn_ = np.unique(v['prn']) + + for prn in prn_: + sat = prn2sat(uGNSS.GAL, prn) + if sat not in self.navmsg.keys(): + self.navmsg[sat] = {} + buff = bytearray(80) + vi = v[(v['prn'] == prn) & (v['type'] == 0)] # E1 only + tow_t = [0, 0, 0, 0, 0] + for j, msg_ in enumerate(vi['nav']): + tow_ = int(vi['tow'][j]) -def load_pubkey(keyid, bdir='../data/pubkey'): - """ load public key information in der format """ - pubk_path = f"{bdir}/{keyid:03d}.der" - with open(pubk_path, 'rb') as f: - pubk = f.read() - pk = serialization.load_der_public_key(pubk) + msg = unhexlify(msg_) - return pk + sid, iodnav = bs.unpack_from('u6u10', msg, 2) + if sid == 0 or sid > 5: + continue -def verify_qzss_nav(npr, msg, mode): - """ verify the navigation messages for QZSS LNAV/CNAV/CNAV2 """ - global pk - - if mode == uNavId.GPS_LNAV: - blen, mlen = 116, 900 - elif mode == uNavId.GPS_CNAV: - blen, mlen = 137, 900 - elif mode == uNavId.GPS_CNAV2: - blen, mlen = 100, 609 - - k1 = (mlen+7)//8 - - rand_ = bytearray(blen) - rand_[0] = npr.keyid - rand_[1:k1+1] = msg - bs.pack_into('u16', rand_, 8+mlen, npr.salt) - - if pk is None: - pk = load_pubkey(npr.keyid) - - status = False - try: - pk.verify(raw2der(npr.ds), bytes(rand_), - ec.ECDSA(hashes.SHA256())) - status = True - print(f'mode={mode} keyid={npr.keyid} signature OK.') - except InvalidSignature: - print(f'mode={mode} keyid={npr.keyid} signature NG.') - return status + i = 2 + for k in range(14): + buff[(sid-1)*16+k] = bs.unpack_from('u8', msg, i)[0] + i += 8 + i += 8 + for k in range(2): + buff[(sid-1)*16+14+k] = bs.unpack_from('u8', msg, i)[0] + i += 8 + + sid1, iodnav1 = bs.unpack_from('u6u10', buff, 0) + sid2, iodnav2 = bs.unpack_from('u6u10', buff, 128*1) + sid3, iodnav3 = bs.unpack_from('u6u10', buff, 128*2) + sid4, iodnav4 = bs.unpack_from('u6u10', buff, 128*3) + sid5 = bs.unpack_from('u6', buff, 128*4)[0] + tow = bs.unpack_from('u20', buff, 128*4+85)[0] + toe = bs.unpack_from('u14', buff, 16)[0] + toc = bs.unpack_from('u14', buff, 128*3+54)[0] + + tow_t[sid-1] = tow_ + + if sid != 5 or sid1 != 1 or sid2 != 2 or sid3 != 3 or \ + sid4 != 4 or sid5 != 5: + continue + if iodnav1 != iodnav2 or iodnav1 != iodnav3 or \ + iodnav1 != iodnav4: + continue + + if iodnav1 not in self.navmsg[sat].keys(): + self.navmsg[sat][iodnav1] = {} + + if uNavId.GAL_INAV in self.navmsg[sat][iodnav1].keys(): + continue + + self.navmsg[sat][iodnav1][uNavId.GAL_INAV] = \ + NavMsg(uGNSS.GAL, uNavId.GAL_INAV) + self.navmsg[sat][iodnav1][uNavId.GAL_INAV].iodn = iodnav1 + self.navmsg[sat][iodnav1][uNavId.GAL_INAV].iodc = 0 + self.navmsg[sat][iodnav1][uNavId.GAL_INAV].tow = tow + self.navmsg[sat][iodnav1][uNavId.GAL_INAV].toe = toe + self.navmsg[sat][iodnav1][uNavId.GAL_INAV].toc = toc + self.navmsg[sat][iodnav1][uNavId.GAL_INAV].msg = \ + copy.copy(buff) + + def lnav_to_mnav(self, msg, sys=uGNSS.QZS): + """ genarate MNAV(900b) from LNAV SF1,2,3 """ + mnav = bytearray(113) + + # RAND + if sys == uGNSS.QZS: + mask_t = [[0xfffffc, 0xffff80, 0xffffff, 0xffffff, + 0xffffff, 0xffffff, 0xffffff, 0xffffff, + 0xffffff, 0xfffffc], + [0xfffffc, 0xffff80, 0xffffff, 0xffffff, + 0xffffff, 0xffffff, 0xffffff, 0xffffff, + 0xffffff, 0xfffffc], + [0xfffffc, 0xffff80, 0xffffff, 0xffffff, + 0xffffff, 0xffffff, 0xffffff, 0xffffff, + 0xffffff, 0xfffffc]] + else: # GPS + mask_t = [[0xff0003, 0xfffffc, 0xffffff, 0xffffff, + 0xffffff, 0xffffff, 0xffffff, 0xffffff, + 0xffffff, 0xfffffc], + [0xff0003, 0xfffffc, 0xffffff, 0xffffff, + 0xffffff, 0xffffff, 0xffffff, 0xffffff, + 0xffffff, 0xffff80], + [0xff0003, 0xfffffc, 0xffffff, 0xffffff, + 0xffffff, 0xffffff, 0xffffff, 0xffffff, + 0xffffff, 0xfffffc]] + + for sid in range(3): + for k in range(10): + d = bs.unpack_from('u30', msg, sid*320+32*k+2)[0] & \ + (mask_t[sid][k] << 6) + bs.pack_into('u30', mnav, sid*300+30*k, d) + + return mnav + + def cnav2_to_mnav(self, toi, msg, sys=uGNSS.QZS): + """ genarate MNAV(600b) from CNAV2 SF2 """ + mnav = bytearray(77) + + mask_t = [0xffffffff, 0x7fffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xfffffbff] + + bs.pack_into('u9', mnav, 0, toi) + + i = 52 + for k in range(18): + d = bs.unpack_from('u32', msg, i)[0] & mask_t[k] + i += 32 + bs.pack_into('u32', mnav, 9+32*k, d) + + return mnav + + def cnav_to_mnav(self, msg, sys=uGNSS.QZS): + """ genarate MNAV(900b) from CNAV SF1,2,3 """ + mnav = bytearray(113) + + # RAND + if sys == uGNSS.QZS: + mask_t = [ + [0xff03ffff, 0xfbffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xfffef000], + [0xff03ffff, 0xfbffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xfffff000], + [0xff03ffff, 0xfbffffff, 0xffffffff, 0xfffffffe, + 0x00000000, 0x00000000, 0x00000000, 0x00000000, + 0x00000000]] + + else: # GPS + mask_t = [ + [0xfffc0fff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xfffff000], + [0xfffc0fff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xfffff000], + [0xfffc0fff, 0xffffffff, 0xffffffff, 0xfffffffe, + 0x00000000, 0x00000000, 0x00000000, 0x00000000, + 0x00000000]] + + for sid in range(3): + for k in range(9): + d = bs.unpack_from('u32', msg, sid*304 + + 32*k)[0] & mask_t[sid][k] + bs.pack_into('u32', mnav, sid*300+32*k, d) + + return mnav + + def fnav_to_mnav(self, msg): + """ genarate MNAV(976b) from Galileo F/NAV message 1,2,3,4 """ + mnav = bytearray(122) + + # RAND + mask_t = [0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff, + 0xffffffff, 0xffffffff, 0xfffffc00, 0x0003f] + + for sid in range(4): + for k in range(8): + fmt = 'u32' if k < 7 else 'u20' + d = bs.unpack_from(fmt, msg, sid*248+32*k)[0] & mask_t[k] + bs.pack_into(fmt, mnav, sid*244+32*k, d) + + return mnav + + def inav_to_mnav(self, msg): + """ genarate MNAV(640b) from Galileo I/NAV message 1,2,3,4,5 """ + return copy.copy(msg) + + def svid2sat(self, svid): + """ convert svid in QZNMA to sat """ + if svid <= 63: + sat = prn2sat(uGNSS.GPS, svid) + elif svid <= 127: + sat = prn2sat(uGNSS.GAL, svid-64) + elif svid <= 191: + sat = prn2sat(uGNSS.SBS, svid) + elif svid <= 202: + sat = prn2sat(uGNSS.QZS, svid) + + return sat + + def decode_gnss_rds(self, tow, msg, i): + """ generate RDS for GNSS """ + # nid = 0: QZNMA + # rtow: + # GPS LNAV/CNAV: tow(17), CNAV2: toi(9),itow(8) + # svid: GPS:1-63, Galileo:65-127, SBAS:129-191, QZS:193-202 + # mt 1: GPS LNAV, 2: GPS CNAV, 3: GPS CNAV2 + # 4: Galileo F/NAV, 5: Galileo I/NAV + nid, rtow, svid, mt = bs.unpack_from('u4u20u8u4', msg, i) + i += 36 + # iode1 GPS LNAV: iode of SF2, GPS CNAV/CNAV2 toe of MT10/SF2 + # Galileo F/NAV,I/NAV IODnav + # iode2 GPS LNAV: iode of SF3, GPS CNAV toe of MT11, other: zero + # iodc GPS LNAV iodc, CNAV toc of MT30-37, other: zero + iode1, iode2, iodc, keyid = bs.unpack_from('u11u11u11u8', msg, i) + i += 41 + ds = bytearray(64) + + for k in range(16): + d = bs.unpack_from('u32', msg, i)[0] + i += 32 + bs.pack_into('u32', ds, k*32, d) + + salt = bs.unpack_from('u16', msg, i)[0] + i += 16 + + sat = self.svid2sat(svid) + sys, prn = sat2prn(sat) + + if svid == 0 or sat not in self.navmsg.keys() or \ + iode1 not in self.navmsg[sat].keys() or \ + mt not in self.navmsg[sat][iode1].keys(): + return None, None + + npr = NavParam(keyid, ds, salt, mt, nid, + rtow, svid, iode1, iode2, iodc) + tow_ = self.navmsg[sat][iode1][mt].tow + + print(f"tow={tow} rtow={rtow} tow_i={tow_} svid={svid:3d} mt={mt} " + + f"iode={iode1:4d} iode2={iode2:4d} iodc={iodc:4d} key={keyid}") + + if mt == uNavId.GPS_LNAV: + + mnav = self.lnav_to_mnav( + self.navmsg[sat][iode1][mt].msg, sys=uGNSS.GPS) + + bs.pack_into('u17', mnav, 300*0+30, rtow) + bs.pack_into('u17', mnav, 300*1+30, rtow+1) + bs.pack_into('u17', mnav, 300*2+30, rtow+2) + + elif mt == uNavId.GPS_CNAV: + + mnav = self.cnav_to_mnav( + self.navmsg[sat][iode1][mt].msg, sys=uGNSS.GPS) + + bs.pack_into('u17', mnav, 300*0+20, rtow) + bs.pack_into('u17', mnav, 300*1+20, rtow+1) + bs.pack_into('u17', mnav, 300*2+20, rtow+2) + + elif mt == uNavId.GPS_CNAV2: + + mnav = self.cnav2_to_mnav( + self.navmsg[sat][iode1][mt].msg, sys=uGNSS.GPS) + + elif mt == uNavId.GAL_FNAV: + + mnav = self.fnav_to_mnav(self.navmsg[sat][iode1][mt].msg) + + bs.pack_into('u20', mnav, 244*0+167, rtow) + bs.pack_into('u20', mnav, 244*1+194, rtow+10) + bs.pack_into('u20', mnav, 244*2+186, rtow+20) + bs.pack_into('u20', mnav, 244*3+189, rtow+30) + + elif mt == uNavId.GAL_INAV: + + mnav = self.inav_to_mnav(self.navmsg[sat][iode1][mt].msg) + bs.pack_into('u20', mnav, 128*4+85, rtow) + + return npr, mnav + + def verify_gnss_nav(self, npr, mnav): + """ verifify the navigation message in MNAV using DS """ + if npr.mt == uNavId.GPS_LNAV: + blen, mlen = 125, 900 + elif npr.mt == uNavId.GPS_CNAV: + blen, mlen = 125, 900 + elif npr.mt == uNavId.GPS_CNAV2: + blen, mlen = 89, 609 + elif npr.mt == uNavId.GAL_FNAV: + blen, mlen = 134, 976 + elif npr.mt == uNavId.GAL_INAV: + blen, mlen = 92, 640 + + rand_ = bytearray(blen) + + bs.pack_into('u4u20u8u4', rand_, 0, npr.nid, + npr.rtow, npr.svid, npr.mt) + bs.pack_into('u11u11u11u8', rand_, 36, npr.iode1, + npr.iode2, npr.iodc, npr.keyid) + copy_buff(mnav, rand_, 0, 77, mlen) + bs.pack_into('u16', rand_, 77+mlen, npr.salt) + + sat = self.svid2sat(npr.svid) + sys, prn = sat2prn(sat) + + if self.pk is None: + self.pk = load_pubkey(npr.keyid) + + ds_der = raw2der(npr.ds) + status = False + try: + self.pk.verify(ds_der, bytes(rand_), ec.ECDSA(hashes.SHA256())) + print(f'mt={npr.mt} sys={sys} prn={prn} signature OK.') + status = True + except InvalidSignature: + print(f'mt={npr.mt} sys={sys} prn={prn} signature NG.') + + return status + + def gen_rds(self, mode, msg): + """ prepare RDS from navigation message and parameters """ + + if mode == uNavId.GPS_LNAV: # 540bits (180bitsx3) + sid, d = bs.unpack_from('u2u14', msg, 32*2+2+8) + if sid > 0: + i0 = (sid-1)*180 + bs.pack_into('u14', self.rds, i0, d) + + for k in range(7): + d = bs.unpack_from('u24', msg, 32*(k+3)+2)[0] + bs.pack_into('u24', self.rds, i0+14+24*k, d) + + self.mask |= (1 << (sid-1)) + + elif mode == uNavId.GPS_CNAV: # 708bits (236bits*3) + sid = bs.unpack_from('u2', msg, 38)[0] + if sid > 0: + copy_buff(msg, self.rds, 40, (sid-1)*236, 236) + self.mask |= (1 << (sid-1)) + + elif mode == uNavId.GPS_CNAV2: # 702bits (234bits*3) + sid = bs.unpack_from('u2', msg, 1266)[0] + if sid > 0: + copy_buff(msg, self.rds, 1268, (sid-1)*234, 234) + self.mask |= (1 << (sid-1)) + + # key ID (8bits), DS (512bits), SALT (16bits) + keyid = self.rds[0] + ds = self.rds[1:65] + salt = (self.rds[65] << 8) | self.rds[66] + npr = NavParam(keyid, ds, salt) + + return npr + + def msg2nav(self, i0, msg, mode): + """ prepare navigation message (LNAV/CNAV) from raw nav message """ + blen = 40 if mode == uNavId.GPS_LNAV else 38 + self.buff[(i0-1)*blen:(i0-1)*blen+blen] = msg[0:blen] + self.flag_e |= (1 << (i0-1)) + + def verify_qzss_nav(self, npr, msg, mode): + """ verify the navigation messages for QZSS LNAV/CNAV/CNAV2 """ + if mode == uNavId.GPS_LNAV: + blen, mlen = 116, 900 + elif mode == uNavId.GPS_CNAV: + blen, mlen = 137, 900 + elif mode == uNavId.GPS_CNAV2: + blen, mlen = 100, 609 + + k1 = (mlen+7)//8 + + rand_ = bytearray(blen) + rand_[0] = npr.keyid + rand_[1:k1+1] = msg + bs.pack_into('u16', rand_, 8+mlen, npr.salt) + + if self.pk is None: + self.pk = load_pubkey(npr.keyid) + + status = False + try: + self.pk.verify(raw2der(npr.ds), bytes(rand_), + ec.ECDSA(hashes.SHA256())) + status = True + print(f'mode={mode} keyid={npr.keyid} signature OK.') + except InvalidSignature: + print(f'mode={mode} keyid={npr.keyid} signature NG.') + return status diff --git a/src/cssrlib/rawnav.py b/src/cssrlib/rawnav.py index 7a72d2c..2ca0005 100644 --- a/src/cssrlib/rawnav.py +++ b/src/cssrlib/rawnav.py @@ -1610,6 +1610,8 @@ def init_param(self, opt: rcvOpt, prefix=''): self.fh_galfnav = open(prefix+self.file_galfnav, mode='w') if opt.flg_bdsb1c: self.flg_bdsb1c = True + self.file_bdsb1c = "bdsb1c.txt" + self.fh_bdsb1c = open(prefix+self.file_bdsb1c, mode='w') if opt.flg_bdsb2a: self.flg_bdsb2a = True if opt.flg_bdsb2b: diff --git a/src/cssrlib/rinex.py b/src/cssrlib/rinex.py index f64085f..0e274c1 100644 --- a/src/cssrlib/rinex.py +++ b/src/cssrlib/rinex.py @@ -315,7 +315,7 @@ def decode_nav(self, navfile, nav, append=False): geph.pos = pos geph.vel = vel geph.acc = acc - + # Use GLONASS line #4 only from RINEX v3.05 onwards # if self.ver >= 3.05: