Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove asyncio sleep from Worker threads #85

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ async def handle_data(self, data):
event = data
await self.put_queue(event)

async def run(self, number_of_iterations=-1):
async def run(self):
"""Run the loop for processing or generating pre-CoT data."""
while 1:
while True:
data = tak_pong()
await self.handle_data(data)
await asyncio.sleep(20)
Expand Down
8 changes: 4 additions & 4 deletions examples/send_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ async def handle_data(self, data):
event = data
await self.put_queue(event)

async def run(self, number_of_iterations=-1):
async def run(self):
"""Run the loop for processing or generating pre-CoT data."""
while 1:
while True:
data = gen_cot()
self._logger.info("Sending:\n%s\n", data.decode())
await self.handle_data(data)
Expand All @@ -60,9 +60,9 @@ async def handle_data(self, data):
"""Handle data from the receive queue."""
self._logger.info("Received:\n%s\n", data.decode())

async def run(self): # pylint: disable=arguments-differ
async def run(self):
"""Read from the receive queue, put data onto handler."""
while 1:
while True:
data = (
await self.queue.get()
) # this is how we get the received CoT from rx_queue
Expand Down
1 change: 0 additions & 1 deletion pytak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
DEFAULT_TLS_PARAMS_REQ,
DEFAULT_HOST_ID,
BOOLEAN_TRUTH,
DEFAULT_MIN_ASYNC_SLEEP,
DEFAULT_XML_DECLARATION,
DEFAULT_IMPORT_OTHER_CONFIGS,
DEFAULT_TAK_PROTO,
Expand Down
59 changes: 23 additions & 36 deletions pytak/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def __init__(
for handler in self._logger.handlers:
handler.setLevel(logging.DEBUG)

self.min_period = pytak.DEFAULT_MIN_ASYNC_SLEEP

tak_proto_version = int(self.config.get("TAK_PROTO") or pytak.DEFAULT_TAK_PROTO)

if tak_proto_version > 0 and takproto is None:
Expand Down Expand Up @@ -104,31 +102,18 @@ async def handle_data(self, data: bytes) -> None:
"""Handle data (placeholder method, please override)."""
pass

async def run(self, number_of_iterations=-1):
"""Run this Thread, reads Data from Queue & passes data to next Handler."""
self._logger.info("Run: %s", self.__class__.__name__)

# We're instantiating the while loop this way, and using get_nowait(),
# to allow unit testing of at least one call of this loop.
while number_of_iterations != 0:
if self.queue.qsize() == 0:
await asyncio.sleep(self.min_period)
continue

# self._logger.debug("TX queue size=%s", self.queue.qsize())
data = None
try:
data = self.queue.get_nowait()
except (asyncio.QueueEmpty, _queue.Empty):
continue

if not data:
continue
async def run_once(self) -> None:
"""Reads Data from Queue & passes data to next Handler."""
data = await self.queue.get()
await self.handle_data(data)
await self.fts_compat()

await self.handle_data(data)
await self.fts_compat()

number_of_iterations -= 1
async def run(self) -> None:
"""Run this Thread - calls run_once() in a loop."""
self._logger.info("Run: %s", self.__class__.__name__)
while True:
await self.run_once()
await asyncio.sleep(0) # make sure other tasks have a chance to run


class TXWorker(Worker):
Expand Down Expand Up @@ -233,18 +218,20 @@ async def readcot(self):
except asyncio.IncompleteReadError:
return None

async def run(self, number_of_iterations=-1) -> None:
async def run_once(self) -> None:
"""Run this worker once."""
if self.reader:
data: bytes = await self.readcot()
if data:
self._logger.debug("RX: %s", data)
self.queue.put_nowait(data)

async def run(self) -> None:
"""Run this worker."""
self._logger.info("Run: %s", self.__class__.__name__)

while 1:
await asyncio.sleep(self.min_period)
if self.reader:
data: bytes = await self.readcot()
if data:
self._logger.debug("RX: %s", data)
self.queue.put_nowait(data)

while True:
await self.run_once()
await asyncio.sleep(0) # make sure other tasks have a chance to run

class QueueWorker(Worker):
"""Read non-CoT Messages from an async network client.
Expand Down
5 changes: 0 additions & 5 deletions pytak/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@
BOOLEAN_TRUTH: list = ["true", "yes", "y", "on", "1"]
DEFAULT_COT_VAL: str = "9999999.0"

# await asyncio.sleep(0) should allow co-routines to yield, but they end up
# eating 100% CPU. @PeterQFR found bumping this to 0.1 solved the high CPU
# issue. See: https://github.com/snstac/pytak/pull/22
DEFAULT_MIN_ASYNC_SLEEP: float = 0.1

# TAK Protocol to use for CoT output, one of: 0 (XML, default), 1 (Mesh/Stream).
# Doesn't always work with iTAK. Recommend sticking with 0 (XML).
DEFAULT_TAK_PROTO: str = "0"
Expand Down
4 changes: 2 additions & 2 deletions tests/test_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def test_worker():
await event_queue.put("taco3")
worker: pytak.Worker = pytak.Worker(event_queue)
worker.handle_data = lambda data: event_queue.put(data)
await worker.run(1)
await worker.run_once()
event = await event_queue.get()
assert "taco2" == event

Expand All @@ -106,7 +106,7 @@ async def test_eventworker() -> None:

worker: pytak.Worker = pytak.TXWorker(event_queue, {}, writer)

await worker.run(1)
await worker.run_once()
remaining_event = await event_queue.get()
assert b"taco2" == remaining_event

Expand Down
2 changes: 1 addition & 1 deletion tests/test_pytak.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ async def test_EventWorker(my_queue, my_writer):
test_data = b"test test"
test_eventworker = pytak.TXWorker(my_queue, {}, my_writer)
await my_queue.put(test_data)
await test_eventworker.run(number_of_iterations=1)
await test_eventworker.run_once()

assert my_writer.events.pop() == test_data