Skip to content

Commit

Permalink
Refactor OpenSky API interaction
Browse files Browse the repository at this point in the history
  • Loading branch information
sco1 committed Nov 1, 2023
1 parent 7aaf1d0 commit 2eaf874
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 73 deletions.
24 changes: 9 additions & 15 deletions code.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import time

import adafruit_datetime as dt
import adafruit_requests as requests
import board
import displayio
import terminalio
Expand All @@ -12,7 +11,7 @@

from skyportal.aircraftlib import AIRCRAFT_ICONS, AircraftIcon, AircraftState, BASE_ICON
from skyportal.maplib import build_bounding_box, calculate_pixel_position, get_base_map
from skyportal.networklib import build_opensky_request, parse_opensky_response, query_opensky
from skyportal.opensky import APIException, APITimeoutError, OpenSky

REFRESH_INTERVAL_SECONDS = 30

Expand Down Expand Up @@ -144,27 +143,22 @@ def redraw_aircraft(
time_label = add_time_label()
MAIN_DISPLAY_GROUP.append(AIRCRAFT_GROUP)

opensky_header, opensky_url = build_opensky_request(*grid_bounds)
opensky_handler = OpenSky(grid_bounds=grid_bounds)
print(f"\n{'='*40}\nInitialization complete\n{'='*40}\n")

# Main loop
while True:
aircraft: list[AircraftState] = []
try:
print("Requesting aircraft data from OpenSky")
flight_data = query_opensky(header=opensky_header, url=opensky_url)
print("Parsing OpenSky API response")
aircraft, api_time = parse_opensky_response(flight_data)
print(f"Found {len(aircraft)} aircraft")
except RuntimeError as e:
print("Error retrieving flight data from OpenSky", e)
except (requests.OutOfRetries, TimeoutError):
opensky_handler.update()
except APITimeoutError:
print("Request to OpenSky timed out")
except APIException as e:
print(e)

if aircraft:
if opensky_handler.can_draw():
print("Updating aircraft locations")
redraw_aircraft(aircraft)
time_label.text = f"{api_time}Z"
redraw_aircraft(opensky_handler.aircraft)
time_label.text = f"{opensky_handler.api_time}Z"
else:
print("No aircraft to draw, skipping redraw")

Expand Down
22 changes: 22 additions & 0 deletions skyportal/aircraftlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AircraftCategory: # noqa: D101

class AircraftState: # noqa: D101
icao: str
callsign: str | None
lat: float | None
lon: float | None
track: float | None
Expand All @@ -41,7 +42,9 @@ class AircraftState: # noqa: D101
aircraft_category: int

def __init__(self, state_vector: dict) -> None:
# See: https://openskynetwork.github.io/opensky-api/rest.html#id4 for field descriptions
self.icao = state_vector[0]
self.callsign = state_vector[1]
self.lat = state_vector[6]
self.lon = state_vector[5]
self.track = state_vector[10]
Expand All @@ -52,6 +55,25 @@ def __init__(self, state_vector: dict) -> None:
self.vertical_rate_mps = state_vector[11]
self.aircraft_category = state_vector[17]

if state_vector[1] is not None:
self.callsign = state_vector[1].strip()

def __str__(self) -> str:
if self.callsign is not None:
ac_id = self.callsign
else:
ac_id = self.icao

if self.is_plottable():
track_str = f"({self.lat:0.3f}, {self.lon:0.3f}), {int(self.track)}º" # type: ignore[arg-type] # noqa: E501
else:
track_str = "No track information"

if self.geo_altitude_m is not None:
track_str = f"{track_str} @ {int(self.geo_altitude_m)}m MSL"

return f"{ac_id}: {track_str}"

def is_plottable(self) -> bool: # noqa: D102
if self.lat is None:
return False
Expand Down
58 changes: 0 additions & 58 deletions skyportal/networklib.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,9 @@
import adafruit_datetime as dt
import adafruit_requests as requests
from circuitpython_base64 import b64encode

from skyportal.aircraftlib import AircraftState

# CircuitPython doesn't have the typing module, so throw this away at runtime
try:
import typing as t
except ImportError:
pass

try:
from secrets import secrets
except ImportError as e:
raise Exception("Could not locate secrets file.") from e

OPENSKY_URL_BASE = "https://opensky-network.org/api/states/all"


def build_url(base: str, params: dict[str, t.Any]) -> str:
"""Build a url from the provided base & parameter(s)."""
Expand All @@ -34,48 +21,3 @@ def urlencode(url: str) -> str:
encoded_chars.append(f"%{ord(c):02X}")

return "".join(encoded_chars)


def build_opensky_request(
lat_min: float,
lat_max: float,
lon_min: float,
lon_max: float,
) -> tuple[dict[str, str], str]:
"""Build the OpenSky API authorization header & request URL for the desired location."""
opensky_params = {
"lamin": lat_min,
"lamax": lat_max,
"lomin": lon_min,
"lomax": lon_max,
"extended": 1,
}
opensky_url = build_url(OPENSKY_URL_BASE, opensky_params)

opensky_auth = f"{secrets['opensky_username']}:{secrets['opensky_password']}"
auth_token = b64encode(opensky_auth.encode("utf-8")).decode("ascii")
opensky_header = {"Authorization": f"Basic {auth_token}"}

return opensky_header, opensky_url


def parse_opensky_response(opensky_json: dict) -> tuple[list[AircraftState], str]:
"""
Parse the OpenSky API response into a list of aircraft states, along with the UTC timestamp.
See: https://openskynetwork.github.io/opensky-api/rest.html#id4 for state vector information.
"""
api_time = str(dt.datetime.fromtimestamp(opensky_json["time"]))
return [AircraftState(state_vector) for state_vector in opensky_json["states"]], api_time


def query_opensky(header: dict[str, str], url: str) -> dict[str, t.Any]: # noqa: D103
r = requests.get(url=url, headers=header)
if r.status_code != 200:
raise RuntimeError(f"Bad response received from OpenSky: {r.status_code}, {r.text}")

aircraft_data = r.json()
if aircraft_data is None:
raise RuntimeError("Empty response received from OpenSky")

return r.json() # type: ignore[no-any-return]
106 changes: 106 additions & 0 deletions skyportal/opensky.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import adafruit_datetime as dt
import adafruit_requests as requests
from circuitpython_base64 import b64encode

from skyportal.aircraftlib import AircraftState
from skyportal.networklib import build_url

try:
from secrets import secrets
except ImportError as e:
raise Exception("Could not locate secrets file.") from e

# CircuitPython doesn't have the typing module, so throw this away at runtime
try:
import typing as t
except ImportError:
pass

OPENSKY_URL_BASE = "https://opensky-network.org/api/states/all"


class APITimeoutError(TimeoutError): # noqa: D101
pass


class APIException(RuntimeError): # noqa: D101
pass


def _build_opensky_request(
lat_min: float,
lat_max: float,
lon_min: float,
lon_max: float,
) -> tuple[dict[str, str], str]:
"""Build the OpenSky API authorization header & request URL for the desired location."""
opensky_params = {
"lamin": lat_min,
"lamax": lat_max,
"lomin": lon_min,
"lomax": lon_max,
"extended": 1,
}
opensky_url = build_url(OPENSKY_URL_BASE, opensky_params)

opensky_auth = f"{secrets['opensky_username']}:{secrets['opensky_password']}"
auth_token = b64encode(opensky_auth.encode("utf-8")).decode("ascii")
opensky_header = {"Authorization": f"Basic {auth_token}"}

return opensky_header, opensky_url


def _parse_opensky_response(opensky_json: dict) -> tuple[list[AircraftState], str]:
"""
Parse the OpenSky API response into a list of aircraft states, along with the UTC timestamp.
See: https://openskynetwork.github.io/opensky-api/rest.html#id4 for state vector information.
"""
api_time = str(dt.datetime.fromtimestamp(opensky_json["time"]))
return [AircraftState(state_vector) for state_vector in opensky_json["states"]], api_time


def _query_opensky(header: dict[str, str], url: str) -> dict[str, t.Any]: # noqa: D103
r = requests.get(url=url, headers=header)
if r.status_code != 200:
raise RuntimeError(f"Bad response received from OpenSky: {r.status_code}, {r.text}")

aircraft_data = r.json()
if aircraft_data is None:
raise RuntimeError("Empty response received from OpenSky")

return r.json() # type: ignore[no-any-return]


class OpenSky:
"""Handler for OpenSky API interactions."""

_header: dict[str, str]
_url: str

aircraft: list[AircraftState]
api_time: str

def __init__(self, grid_bounds: tuple[float, float, float, float]) -> None:
self._header, self._url = _build_opensky_request(*grid_bounds)
self.aircraft = []
self.api_time = ""

def can_draw(self) -> bool: # noqa: D102
return bool(len(self.aircraft))

def update(self) -> None:
"""Aircraft state vector update loop."""
try:
print("Requesting aircraft data from OpenSky")
flight_data = _query_opensky(header=self._header, url=self._url)

print("Parsing OpenSky API response")
self.aircraft, self.api_time = _parse_opensky_response(flight_data)
except RuntimeError as e:
# Clean this up (e.g. the 502 error most commonly seen dumps a full HTML page)
raise APIException("Error retrieving flight data from OpenSky") from e
except (requests.OutOfRetries, TimeoutError):
raise APITimeoutError

print(f"Found {len(self.aircraft)} aircraft")

0 comments on commit 2eaf874

Please sign in to comment.