Skip to content

Commit

Permalink
Merge pull request #46 from oresat/beacon-script
Browse files Browse the repository at this point in the history
Simple script to print received beacons
  • Loading branch information
ThirteenFish authored Oct 27, 2024
2 parents 024522f + 1e1a232 commit 130a791
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 2 deletions.
57 changes: 57 additions & 0 deletions oresat_c3/protocols/ax25.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Anything dealing with packing AX.25 packets."""

from dataclasses import dataclass

import bitstring

AX25_CALLSIGN_LEN = 6
Expand Down Expand Up @@ -106,3 +108,58 @@ def ax25_pack(
)

return header + payload


@dataclass
class Ax25:
"""Holds the contents of an AX.25 packet"""

dest_callsign: str
dest_ssid: int
src_callsign: str
src_ssid: int
control: int
pid: int
command: bool
response: bool
payload: bytes


def ax25_unpack(raw: bytes) -> Ax25:
"""
Unpacks a AX25 packet.
Parameters
----------
raw: bytes
Raw bytes of an AX25 packet
Returns
-------
AX25
The AX25 fields.
"""
# Unpack AX25 packet header
dest = raw[:AX25_CALLSIGN_LEN]
dest_ssid = raw[AX25_CALLSIGN_LEN]
src = raw[AX25_CALLSIGN_LEN + 1 : 2 * AX25_CALLSIGN_LEN + 1]
src_ssid = raw[2 * AX25_CALLSIGN_LEN + 1]
control = raw[2 * AX25_CALLSIGN_LEN + 2]
pid = raw[2 * AX25_CALLSIGN_LEN + 3]
payload = raw[2 * AX25_CALLSIGN_LEN + 4 :]

# callsigns are bitshifted by 1
dest_callsign = (bitstring.BitArray(dest) >> 1).bytes.decode("ascii").rstrip()
src_callsign = (bitstring.BitArray(src) >> 1).bytes.decode("ascii").rstrip()

return Ax25(
dest_callsign=dest_callsign,
dest_ssid=(dest_ssid & 0x1F) >> 1,
src_callsign=src_callsign,
src_ssid=(src_ssid & 0x1F) >> 1,
control=control,
pid=pid,
command=bool(dest_ssid & 1 << 7),
response=bool(src_ssid & 1 << 7),
payload=payload,
)
66 changes: 66 additions & 0 deletions scripts/beacon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""Listens for and prints C3 beacons"""


import os
import socket
import sys
from argparse import ArgumentParser
from contextlib import suppress
from zlib import crc32

sys.path.insert(0, os.path.abspath(".."))

from oresat_c3.protocols.ax25 import ax25_unpack


def main():
parser = ArgumentParser("Receives and prints beacon packets")
parser.add_argument(
"-o", "--host", default="localhost", help="address to use, default is %(default)s"
)
parser.add_argument(
"-u",
"--beacon-port",
default=10015,
type=int,
help="port to receive beacons on, default is %(default)s",
)
parser.add_argument("-v", "--verbose", action="store_true", help="print out packet hex")
args = parser.parse_args()

host = args.host if args.host in ["localhost", "127.0.0.1"] else ""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((host, args.beacon_port))

print("loop | source->dest | LBand EDL seq rej Batt V1 V2")

loop = 0
while True:
loop += 1
raw = s.recv(4096)
ax = ax25_unpack(raw)
print(f"{loop:4} | {ax.src_callsign}->{ax.dest_callsign}", end="")

crc = int.from_bytes(ax.payload[-4:], "little")
if crc != crc32(ax.payload[:-4]):
print(" | invalid CRC", end="")
elif ax.payload[:3] != bytes("{{z", "ascii"):
print(" | invalid payload header", ax.payload[:3], end="")
else:
lband_rx = int.from_bytes(ax.payload[38:42], "little")
edl_seq = int.from_bytes(ax.payload[53:57], "little")
edl_rej = int.from_bytes(ax.payload[57:61], "little")
vbatt_1 = int.from_bytes(ax.payload[65:67], "little")
vbatt_2 = int.from_bytes(ax.payload[101:103], "little")
print(
f" | {lband_rx:4} rx {edl_seq:6}# {edl_rej:4}× {vbatt_1:6}mV {vbatt_2:6}mV", end=""
)
if args.verbose:
print("\n", raw.hex(), end="")
print()


if __name__ == "__main__":
with suppress(KeyboardInterrupt):
main()
32 changes: 30 additions & 2 deletions tests/protocols/test_ax25.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,39 @@

import unittest

from oresat_c3.protocols.ax25 import AX25_PAYLOAD_MAX_LEN, Ax25Error, ax25_pack
from oresat_c3.protocols.ax25 import AX25_PAYLOAD_MAX_LEN, Ax25, Ax25Error, ax25_pack, ax25_unpack


class TestAx25(unittest.TestCase):
"""Test ax25_pack."""
"""Test ax25_pack and ax_25 unpack"""

def test_ax25_roundtrip(self):
"""Test if unpack can recreate a packed packet"""
src = Ax25(
dest_callsign="DEST",
dest_ssid=1,
src_callsign="SRC",
src_ssid=1,
control=1,
pid=1,
command=True,
response=True,
payload=b"\x01\x02\x03",
)
result = ax25_unpack(
ax25_pack(
dest_callsign=src.dest_callsign,
dest_ssid=src.dest_ssid,
src_callsign=src.src_callsign,
src_ssid=src.src_ssid,
control=src.control,
pid=src.pid,
command=src.command,
response=src.response,
payload=src.payload,
)
)
self.assertEqual(src, result)

def test_ax25_pack_invalid_dest_callsign_length(self):
"""Set destination callsign with a length greater than AX25_CALLSIGN_LEN"""
Expand Down

0 comments on commit 130a791

Please sign in to comment.