diff --git a/receiver/decode_sbf.py b/receiver/decode_sbf.py index 9dd32ee..64315fb 100644 --- a/receiver/decode_sbf.py +++ b/receiver/decode_sbf.py @@ -645,7 +645,8 @@ def decode(self, buff, len_, sys=[], prn=[]): self.fh_galfnav.write("\n") - eph = self.rn.decode_gal_fnav(self.week, self.tow, sat, 1, msg) + eph = self.rn.decode_gal_fnav(self.week, self.tow, sat, type_, + msg) if eph is not None: self.re.rnx_nav_body(eph, self.fh_rnxnav) @@ -696,7 +697,7 @@ def decode(self, buff, len_, sys=[], prn=[]): self.fh_galinav.write("\n") eph = self.rn.decode_gal_inav(self.week, self.tow, - sat, 2, msg_) + sat, type_, msg_) if self.mode_galinav == 0 and eph is not None: self.re.rnx_nav_body(eph, self.fh_rnxnav) diff --git a/receiver/decode_ubx.py b/receiver/decode_ubx.py new file mode 100644 index 0000000..a54440a --- /dev/null +++ b/receiver/decode_ubx.py @@ -0,0 +1,463 @@ +""" +u-blox Receiver UBX messages decoder + + [1] u-blox F9 HPG 1.51, u-blox F9 high precision GNSS receiver + Interface description, 2024 + +@author Rui Hirokawa +""" + +from glob import glob +import numpy as np +import os +import struct as st +import bitstruct.c as bs +from cssrlib.gnss import uGNSS, uTYP, prn2sat, Obs, rSigRnx, gpst2time, uSIG +from cssrlib.rawnav import rcvDec, rcvOpt +from binascii import hexlify + +CPSTD_VALID = 5 # stdev threshold of valid carrier-phase + + +class ubx(rcvDec): + """ class for u-blox Binary Format (UBX) decoder """ + tow = -1 + week = -1 + + def __init__(self, opt=None, prefix='', gnss_t='GECJ'): + super().__init__(opt, prefix, gnss_t) + + sig_tbl = { + uGNSS.GPS: {0: uSIG.L1C, 3: uSIG.L2L, 4: uSIG.L2S, 6: uSIG.L5I, + 7: uSIG.L5Q}, + uGNSS.SBS: {0: uSIG.L1C}, + uGNSS.GAL: {0: uSIG.L1C, 1: uSIG.L1B, 3: uSIG.L5I, 4: uSIG.L5Q, + 5: uSIG.L7I, 6: uSIG.L7Q, 8: uSIG.L6B, 9: uSIG.L6C, + 10: uSIG.L6A}, + uGNSS.BDS: {0: uSIG.L2I, 1: uSIG.L2I, 2: uSIG.L7I, 3: uSIG.L7I, + 4: uSIG.L6I, 10: uSIG.L6I, 5: uSIG.L1P, 6: uSIG.L1D, + 7: uSIG.L5P, 8: uSIG.L5D}, + uGNSS.QZS: {0: uSIG.L1C, 1: uSIG.L1Z, 4: uSIG.L2S, 5: uSIG.L2L, + 8: uSIG.L5I, 9: uSIG.L5Q}, + uGNSS.GLO: {0: uSIG.L1C, 2: uSIG.L2C}, + uGNSS.IRN: {0: uSIG.L5A} + } + + obs_tbl = { + uGNSS.GPS: [uSIG.L1C, uSIG.L2L, uSIG.L5Q], + uGNSS.SBS: [uSIG.L1C], + uGNSS.GAL: [uSIG.L1C, uSIG.L5Q, uSIG.L7Q, uSIG.L6B], + uGNSS.BDS: [uSIG.L1P, uSIG.L5P, uSIG.L6I], + uGNSS.QZS: [uSIG.L1C, uSIG.L2L, uSIG.L5Q], + uGNSS.GLO: [uSIG.L1C, uSIG.L2C], + uGNSS.IRN: [uSIG.L5A], + } + + self.sig_t = {} + for sys in sig_tbl.keys(): + if sys not in self.sig_tab.keys(): + continue + + self.sig_t[sys] = {} + for key in sig_tbl[sys].keys(): + sig = sig_tbl[sys][key] + self.sig_t[sys][key] = { + uTYP.C: rSigRnx(sys, uTYP.C, sig), + uTYP.L: rSigRnx(sys, uTYP.L, sig), + uTYP.D: rSigRnx(sys, uTYP.D, sig), + uTYP.S: rSigRnx(sys, uTYP.S, sig), + } + + for sys in sig_tbl.keys(): + if sys not in self.sig_tab.keys(): + continue + self.sig_tab[sys][uTYP.C] = [] + self.sig_tab[sys][uTYP.L] = [] + self.sig_tab[sys][uTYP.D] = [] + self.sig_tab[sys][uTYP.S] = [] + for sig in obs_tbl[sys]: + self.sig_tab[sys][uTYP.C].append(rSigRnx(sys, uTYP.C, sig)) + self.sig_tab[sys][uTYP.L].append(rSigRnx(sys, uTYP.L, sig)) + self.sig_tab[sys][uTYP.D].append(rSigRnx(sys, uTYP.D, sig)) + self.sig_tab[sys][uTYP.S].append(rSigRnx(sys, uTYP.S, sig)) + + self.bds_cnv1 = {} + for k in range(uGNSS.BDSMAX): + self.bds_cnv1[k] = bytearray(124) + + def sync(self, buff, k): + return buff[k] == 0xB5 and buff[k+1] == 0x62 + + def msg_len(self, msg, k): + return st.unpack_from(' 0: + print("checksum error.") + return False + return True + + def svid2prn(self, gnss, svid): + gnss_t = {0: uGNSS.GPS, 1: uGNSS.SBS, 2: uGNSS.GAL, 3: uGNSS.BDS, + 5: uGNSS.QZS, 6: uGNSS.GLO, 7: uGNSS.IRN} + sys = gnss_t[gnss] + if sys == uGNSS.QZS: + prn = svid + 192 + else: + prn = svid + return sys, prn + + def decode_obs(self, buff, k=6): + obs = Obs() + obs.sig = self.sig_tab + + nsig_max = 0 + for s in self.sig_tab: + if len(self.sig_tab[s][uTYP.L]) > nsig_max: + nsig_max = len(self.sig_tab[s][uTYP.L]) + + self.nsig[uTYP.C] = nsig_max + self.nsig[uTYP.L] = nsig_max + self.nsig[uTYP.D] = nsig_max + self.nsig[uTYP.S] = nsig_max + + obs.sat = np.empty(0, dtype=np.int32) + + tow, wn, leapS, nm, stat, ver = st.unpack_from(' CPSTD_VALID: + cp_ = 0.0 + + sys, prn = self.svid2prn(gnss, svid) + + if sys not in self.sig_tab: + continue + + if sys == uGNSS.GLO and svid == 255: + continue + + # ch = freqid-7 if sys == uGNSS.GLO else 0 + sat = prn2sat(sys, prn) + sig = self.sig_t[sys][sigid][uTYP.L] + + if sig not in self.sig_tab[sys][sig.typ]: + if self.monlevel > 1: + print("skip code={:}".format(sig)) + continue + idx = self.sig_tab[sys][sig.typ].index(sig) + + if sat not in pr.keys(): + pr[sat] = {} + cp[sat] = {} + dp[sat] = {} + lli[sat] = {} + cn[sat] = {} + + slip = 0x01 if locktime == 0 else 0 + halfv = 0x02 if (trk & 4) != 0 else 0 + halfc = 0x80 if (trk & 8) != 0 else 0 + + lli_ = slip + halfv + halfc + + pr[sat][idx] = pr_ + cp[sat][idx] = cp_ + dp[sat][idx] = dop_ + lli[sat][idx] = lli_ + cn[sat][idx] = cn0 + + if sat not in obs.sat: + obs.sat = np.append(obs.sat, sat) + + # print(f"OBS {gnss}:{prn:3d} {svid} {sigid:2d}") + + nsat = len(obs.sat) + if nsat == 0: + return None + + obs.sat.sort() + obs.P = np.zeros((nsat, self.nsig[uTYP.C]), dtype=np.float64) + obs.L = np.zeros((nsat, self.nsig[uTYP.L]), dtype=np.float64) + obs.D = np.zeros((nsat, self.nsig[uTYP.D]), dtype=np.float64) + obs.S = np.zeros((nsat, self.nsig[uTYP.S]), dtype=np.float64) + obs.lli = np.zeros((nsat, self.nsig[uTYP.L]), dtype=np.int32) + + for k, sat in enumerate(obs.sat): + for i in pr[sat].keys(): + obs.P[k][i] = pr[sat][i] + obs.L[k][i] = cp[sat][i] + obs.D[k][i] = dp[sat][i] + obs.S[k][i] = cn[sat][i] + obs.lli[k][i] = lli[sat][i] + + return obs + + def decode_nav(self, buff, k=6): + if self.week == -1: + return None + + gnss, svid, sigid, freqid, len_, ch, ver = \ + st.unpack_from(''+len_*'L', buff, 14) + b = bytes(np.array(msg, dtype='uint32')) + fh_ = None + eph = None + geph = None + type_ = sigid + blen = len_*4 + if sys == uGNSS.GPS: + if sigid == 0 and self.flg_gpslnav: # L1C/A + fh_ = self.fh_gpslnav + type_ = 0 + eph = self.rn.decode_gps_lnav(self.week, self.tow, sat, b) + elif sigid == 4 and self.flg_gpscnav: # L2M + fh_ = self.fh_gpscnav + type_ = 1 + # eph = self.rn.decode_gps_cnav(self.week, self.tow, sat, b) + elif sigid == 6 and self.flg_gpscnav: # L5I + fh_ = self.fh_gpscnav + type_ = 2 + eph = self.rn.decode_gps_cnav(self.week, self.tow, sat, b) + elif sys == uGNSS.GAL: + if sigid == 1 or sigid == 3: # I/NAV + b_ = bytearray(b) + b_[15:30] = b_[16:31] # even 128bit, odd 128bit -> skip 8bit + b_[30] = 0 + b = bytes(b_) + + if sigid == 1 and self.flg_galinav: # E1B + fh_ = self.fh_galinav + type_ = 0 + eph = self.rn.decode_gal_inav( + self.week, self.tow, sat, type_, b) + elif sigid == 3 and self.flg_galfnav: # E5aI + fh_ = self.fh_galfnav + type_ = 1 + eph = self.rn.decode_gal_fnav( + self.week, self.tow, sat, type_, b) + elif sigid == 5 and self.flg_galinav: # E5bI + fh_ = self.fh_galinav + type_ = 2 + eph = self.rn.decode_gal_inav( + self.week, self.tow, sat, type_, b) + elif sigid == 8 and self.flg_galcnav: # E6B + fh_ = self.fh_galcnav + type_ = 6 + elif sys == uGNSS.BDS: + if sigid == 4 and self.flg_bdsd12: # B3I D1 + type_ = 2 + elif sigid == 6 and self.flg_bdsb1c: # B1Cd + fh_ = self.fh_bdsb1c + type_ = 3 + + if blen == 36: + self.bds_cnv1[prn][76:109] = b[:33] + self.bds_cnv1[prn][123] = 1 + blen = 0 + elif blen == 76: + self.bds_cnv1[prn][:76] = b[:76] + self.bds_cnv1[prn][123] |= 2 + blen = 0 + elif blen == 12: + prn_, soh = bs.unpack_from('u6u8', b, 0) + self.bds_cnv1[prn][109] = soh + b = bytes(self.bds_cnv1[prn]) + self.bds_cnv1[prn][123] |= 4 + blen = 110 + if self.bds_cnv1[prn][123] == 7: + eph = self.rn.decode_bds_b1c( + self.week, self.tow, sat, b) + elif sigid == 8 and self.flg_bdsb2a: # B2ad + type_ = 5 + elif sys == uGNSS.GLO: + if sigid == 0 and self.flg_gloca: # L1 OF + type_ = 0 + geph = self.rn.decode_glo_fdma(self.week, self.tow, + sat, b, freqid) + elif sigid == 2 and self.flg_gloca: # L2 OF + type_ = 2 + geph = self.rn.decode_glo_fdma(self.week, self.tow, + sat, b, freqid) + elif sys == uGNSS.SBS: + if sigid == 0 and self.flg_sbas: # L1C/A + fh_ = self.fh_sbas + type_ = 0 + elif sys == uGNSS.QZS: + if sigid == 0 and self.flg_qzslnav: # L1C/A + fh_ = self.fh_qzslnav + type_ = 0 + eph = self.rn.decode_gps_lnav(self.week, self.tow, sat, b) + elif sigid == 4 and self.flg_qzscnav: # L2C + fh_ = self.fh_qzscnav + type_ = 1 + eph = self.rn.decode_gps_cnav(self.week, self.tow, sat, b) + elif sigid == 8 and self.flg_qzscnav: # L5I + fh_ = self.fh_qzscnav + type_ = 2 + eph = self.rn.decode_gps_cnav(self.week, self.tow, sat, b) + + if self.flg_rnxnav: + if eph is not None: + self.re.rnx_nav_body(eph, self.fh_rnxnav) + if geph is not None: + self.re.rnx_gnav_body(geph, self.fh_rnxnav) + + if fh_ is not None and blen > 0: + fh_.write("{:4d}\t{:6d}\t{:3d}\t{:1d}\t{:3d}\t{:s}\n". + format(self.week, int(self.tow+0.01), prn, type_, blen, + hexlify(b[:blen]).decode())) + if self.monlevel > 0: + print(f"NAV gnss={gnss}:prn={svid:3d}({freqid:2d}):sig={sigid:2d}") + + def decode_l6msg(self, buff, k=6): + ver, svid, cno, timetag, gd, nc, ch = st.unpack_from( + 'BBHLBBH', buff, k) + k += 14 + b = buff[k:k+250] + print(f"L6 {svid}:{ch}") + if self.flg_qzsl6: + self.fh_qzsl6.write("{:4d}\t{:6d}\t{:3d}\t{:1d}\t{:3d}\t{:s}\n". + format(self.week, self.tow, svid, ch, len_*4, + hexlify(b).decode())) + + def decode_timegps(self, buff, k=6): + """ decode NAV-TIMEGPS """ + itow, ftow, self.week, leaps, valid, tacc = \ + st.unpack_from('= maxlen: + break + + if not ubxdec.check_crc(msg, k): + k += 1 + continue + + ubxdec.decode(msg[k:k+len_], len_) + k += len_ + + nep += 1 + if nep_max > 0 and nep >= nep_max: + break + + ubxdec.file_close()