Skip to content

Commit

Permalink
Added SimpleCOTEvent, COTEvent classes and cot2xml function.
Browse files Browse the repository at this point in the history
  • Loading branch information
ampledata committed Sep 18, 2024
1 parent c2a7f38 commit d57bd8a
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 29 deletions.
9 changes: 6 additions & 3 deletions pytak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

"""Python Team Awareness Kit (PyTAK) Module."""

__version__ = "7.0.2"
__version__ = "7.1.0"

from .constants import ( # NOQA
LOG_LEVEL,
Expand Down Expand Up @@ -57,16 +57,19 @@
RXWorker,
QueueWorker,
CLITool,
SimpleCOTEvent,
COTEvent,
)

from .functions import (
from .functions import ( # NOQA
split_host,
parse_url,
hello_event,
cot_time,
gen_cot,
gen_cot_xml,
) # NOQA
cot2xml,
)

from .client_functions import ( # NOQA
create_udp_client,
Expand Down
96 changes: 83 additions & 13 deletions pytak/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

"""PyTAK Class Definitions."""

import abc
import asyncio
import importlib.util
import ipaddress
import logging
import multiprocessing as mp
Expand All @@ -27,19 +27,20 @@

import xml.etree.ElementTree as ET

from typing import Optional, Set, Union
from dataclasses import dataclass
from typing import Set, Union

from configparser import ConfigParser, SectionProxy

import pytak

try:
import takproto
import takproto # type: ignore
except ImportError:
pass
takproto = None


class Worker: # pylint: disable=too-few-public-methods
class Worker:
"""Meta class for all other Worker Classes."""

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,17 +75,15 @@ def __init__(

tak_proto_version = int(self.config.get("TAK_PROTO") or pytak.DEFAULT_TAK_PROTO)

if tak_proto_version > 0 and importlib.util.find_spec("takproto") is None:
if tak_proto_version > 0 and takproto is None:
self._logger.warning(
"TAK_PROTO is set to '%s', but the 'takproto' Python module is not installed.\n"
"Try: python -m pip install pytak[with_takproto]\n"
"See Also: https://pytak.rtfd.io/en/latest/compatibility/#tak-protocol-payload-version-1-protobuf",
tak_proto_version,
)

self.use_protobuf = tak_proto_version > 0 and importlib.util.find_spec(
"takproto"
)
self.use_protobuf = tak_proto_version > 0 and takproto is not None

async def fts_compat(self) -> None:
"""Apply FreeTAKServer (FTS) compatibility.
Expand All @@ -100,9 +99,10 @@ async def fts_compat(self) -> None:
self._logger.debug("COMPAT: Sleeping for %ss", sleep_period)
await asyncio.sleep(sleep_period)

@abc.abstractmethod
async def handle_data(self, data: bytes) -> None:
"""Handle data (placeholder method, please override)."""
raise NotImplementedError("Subclasses need to override this method")
pass

async def run(self, number_of_iterations=-1):
"""Run this Thread, reads Data from Queue & passes data to next Handler."""
Expand Down Expand Up @@ -131,7 +131,7 @@ async def run(self, number_of_iterations=-1):
number_of_iterations -= 1


class TXWorker(Worker): # pylint: disable=too-few-public-methods
class TXWorker(Worker):
"""Works data queue and hands off to Protocol Workers.
You should create an TXWorker Instance using the `pytak.txworker_factory()`
Expand Down Expand Up @@ -190,7 +190,7 @@ async def send_data(self, data: bytes) -> None:
self.writer.flush()


class RXWorker(Worker): # pylint: disable=too-few-public-methods
class RXWorker(Worker):
"""Async receive (input) queue worker.
Reads events from a `pytak.protocol_factory()` reader and adds them to
Expand All @@ -212,6 +212,11 @@ def __init__(
self.reader: asyncio.Protocol = reader
self.reader_queue = None

@abc.abstractmethod
async def handle_data(self, data: bytes) -> None:
"""Handle data (placeholder method, please override)."""
pass

async def readcot(self):
"""Read CoT from the wire until we hit an event boundary."""
try:
Expand Down Expand Up @@ -241,7 +246,7 @@ async def run(self, number_of_iterations=-1) -> None:
self.queue.put_nowait(data)


class QueueWorker(Worker): # pylint: disable=too-few-public-methods
class QueueWorker(Worker):
"""Read non-CoT Messages from an async network client.
(`asyncio.Protocol` or similar async network client)
Expand All @@ -263,6 +268,11 @@ def __init__(
super().__init__(queue, config)
self._logger.info("Using COT_URL='%s'", self.config.get("COT_URL"))

@abc.abstractmethod
async def handle_data(self, data: bytes) -> None:
"""Handle data (placeholder method, please override)."""
pass

async def put_queue(
self, data: bytes, queue_arg: Union[asyncio.Queue, mp.Queue, None] = None
) -> None:
Expand Down Expand Up @@ -408,3 +418,63 @@ async def run(self):

for task in done:
self._logger.info("Complete: %s", task)


@dataclass
class SimpleCOTEvent:
"""CoT Event Dataclass."""

lat: Union[bytes, str, float, None] = None
lon: Union[bytes, str, float, None] = None
uid: Union[str, None] = None
stale: Union[float, int, None] = None
cot_type: Union[str, None] = None

def __str__(self) -> str:
"""Return a formatted string representation of the dataclass."""
event = self.to_xml()
return ET.tostring(event, encoding="unicode")

def to_bytes(self) -> bytes:
"""Return the class as bytes."""
event = self.to_xml()
return ET.tostring(event, encoding="utf-8")

def to_xml(self) -> ET.Element:
"""Return a CoT Event as an XML string."""
cotevent = COTEvent(
lat=self.lat,
lon=self.lon,
uid=self.uid,
stale=self.stale,
cot_type=self.cot_type,
le=pytak.DEFAULT_COT_VAL,
ce=pytak.DEFAULT_COT_VAL,
hae=pytak.DEFAULT_COT_VAL,
)
event = pytak.cot2xml(cotevent)
return event


@dataclass
class COTEvent(SimpleCOTEvent):
"""COT Event Dataclass."""

ce: Union[bytes, str, float, int, None] = None
hae: Union[bytes, str, float, int, None] = None
le: Union[bytes, str, float, int, None] = None

def to_xml(self) -> ET.Element:
"""Return a CoT Event as an XML string."""
cotevent = COTEvent(
lat=self.lat,
lon=self.lon,
uid=self.uid,
stale=self.stale,
cot_type=self.cot_type,
le=self.le,
ce=self.ce,
hae=self.hae,
)
event = pytak.cot2xml(cotevent)
return event
6 changes: 3 additions & 3 deletions pytak/crypto_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

USE_CRYPTOGRAPHY = True
except ImportError as exc:
warnings.warn(exc)
warnings.warn(str(exc))


def save_pem(pem: bytes, dest: Union[str, None] = None) -> str:
Expand All @@ -60,7 +60,7 @@ def load_cert(
): # -> Set[_RSAPrivateKey, Certificate, Certificate]:
"""Load RSA Keys & Certs from a pkcs12 ().p12) file."""
if not USE_CRYPTOGRAPHY:
raise Exception(INSTALL_MSG)
raise ValueError(INSTALL_MSG)

with open(cert_path, "br+") as cp_fd:
p12_data = cp_fd.read()
Expand All @@ -73,7 +73,7 @@ def load_cert(
def convert_cert(cert_path: str, cert_pass: str) -> dict:
"""Convert a P12 cert to PEM."""
if not USE_CRYPTOGRAPHY:
raise Exception(INSTALL_MSG)
raise ValueError(INSTALL_MSG)

cert_paths = {
"pk_pem_path": None,
Expand Down
47 changes: 44 additions & 3 deletions pytak/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,65 @@ def connectString2url(conn_str: str) -> str: # pylint: disable=invalid-name
return f"{uri_parts[2]}://{uri_parts[0]}:{uri_parts[1]}"


def cot2xml(event: pytak.COTEvent) -> ET.Element:
"""Generate a minimum COT Event as an XML object."""
lat = str(event.lat or "0.0")
lon = str(event.lon or "0.0")
uid = event.uid or pytak.DEFAULT_HOST_ID
stale = int(event.stale or pytak.DEFAULT_COT_STALE)
cot_type = event.cot_type or "a-u-G"
le = str(event.le or pytak.DEFAULT_COT_VAL)
hae = str(event.hae or pytak.DEFAULT_COT_VAL)
ce = str(event.ce or pytak.DEFAULT_COT_VAL)

xevent = ET.Element("event")
xevent.set("version", "2.0")
xevent.set("type", cot_type)
xevent.set("uid", uid)
xevent.set("how", "m-g")
xevent.set("time", pytak.cot_time())
xevent.set("start", pytak.cot_time())
xevent.set("stale", pytak.cot_time(stale))

point = ET.Element("point")
point.set("lat", lat)
point.set("lon", lon)
point.set("le", le)
point.set("hae", hae)
point.set("ce", ce)

flow_tags = ET.Element("_flow-tags_")
_ft_tag: str = f"{pytak.DEFAULT_HOST_ID}-v{pytak.__version__}".replace("@", "-")
flow_tags.set(_ft_tag, pytak.cot_time())

detail = ET.Element("detail")
detail.append(flow_tags)

xevent.append(point)
xevent.append(detail)

return xevent


def gen_cot_xml(
lat: Union[bytes, str, float, None] = None,
lon: Union[bytes, str, float, None] = None,
ce: Union[bytes, str, float, int, None] = None,
hae: Union[bytes, str, float, int, None] = None,
le: Union[bytes, str, float, int, None] = None,
uid: Union[bytes, str, None] = None,
uid: Union[str, None] = None,
stale: Union[float, int, None] = None,
cot_type: Union[bytes, str, None] = None,
cot_type: Union[str, None] = None,
) -> Optional[ET.Element]:
"""Generate a minimum CoT Event as an XML object."""

lat = str(lat or "0.0")
lon = str(lon or "0.0")
ce = str(ce or pytak.DEFAULT_COT_VAL)
hae = str(hae or pytak.DEFAULT_COT_VAL)
le = str(le or pytak.DEFAULT_COT_VAL)
uid = uid or pytak.DEFAULT_HOST_ID
stale = stale or pytak.DEFAULT_COT_STALE
stale = int(stale or pytak.DEFAULT_COT_STALE)
cot_type = cot_type or "a-u-G"

event = ET.Element("event")
Expand Down
Loading

0 comments on commit d57bd8a

Please sign in to comment.