Support writing mediums & reasons from ZMQ listener, batch up to 5 operations into each transaction (#67)

* Initial work to take the mediums/reasons from ZMQ, batch them correctly

* Update schemas dep, fix more tests

* Fix remaining tests

* Remove batch_id, log podping id of (timestampNs, sessionId)

* Return dummy transaction on dry-run

* Remove 3speak node

* Don't rush podping batches if the # of operations waiting is 5 or more (see the batching sketch after this list)

* Remove openhive node

* Change num_operations_in_queue logic, remove lock from IRI ingress

* Revert exponent formatting change

* Add unprocessed_iri_queue for backpressure on the zmq socket (see the queue sketch below)

* Update long_running_zmq example

* Misc example changes

* Update deps and basic 3.11 support

* Only run poetry build once, since it will be a py3-none-any wheel

* Test on 3.11, only --runslow on one version of python

* Fix typo in get_relevant_transactions_from_blockchain

* Correctly call task_done on unprocessed_iri_queue

* Don't close the plexus if it's passed in externally

* Fix tests

* Put complex type hint in quotes

* Build docker image with Python 3.11

* Remove unnecessary docker packages

* Fix example
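
A hedged sketch of the batching behaviour described above (not the project's actual code): queued podping operations are grouped into Hive transactions of at most five custom_json operations, and a partially filled batch is only flushed on a timer when fewer than five operations are waiting. The names operation_queue and broadcast_transaction, and the three-second flush window, are illustrative assumptions.

import asyncio

MAX_OPS_PER_TRANSACTION = 5  # batch limit named in the commit title


async def batch_loop(operation_queue: asyncio.Queue, broadcast_transaction, flush_after: float = 3.0):
    # Illustrative batching loop: group queued operations into transactions of up to five each.
    while True:
        batch = [await operation_queue.get()]
        while len(batch) < MAX_OPS_PER_TRANSACTION:
            try:
                if operation_queue.qsize() >= MAX_OPS_PER_TRANSACTION:
                    # Enough work is already waiting: keep filling without rushing to flush.
                    batch.append(operation_queue.get_nowait())
                else:
                    # Light load: wait briefly for more operations, then flush a partial batch.
                    batch.append(await asyncio.wait_for(operation_queue.get(), flush_after))
            except (asyncio.QueueEmpty, asyncio.TimeoutError):
                break
        await broadcast_transaction(batch)  # one Hive transaction per batch of operations
        for _ in batch:
            operation_queue.task_done()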
agates authored Nov 28, 2022
1 parent a9120cb commit 7162b58
Showing 35 changed files with 1,256 additions and 710 deletions.
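
The unprocessed_iri_queue mentioned in the change list applies backpressure on the ZMQ listener. A minimal sketch of that pattern, assuming a bounded asyncio.Queue (the maxsize and the handle_zmq_message and process_iri names are illustrative, not the project's API): the socket handler awaits put(), which blocks once the queue is full, and the consumer calls task_done() for every item it finishes so queue.join() can wait for a drain.

import asyncio

# Bounded queue: put() blocks when full, pushing backpressure onto the ZMQ ingress.
unprocessed_iri_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)  # size is an assumption


async def handle_zmq_message(iri: str) -> None:
    # The listener only acknowledges the request after the IRI is queued,
    # so a full queue slows the sender down instead of growing without bound.
    await unprocessed_iri_queue.put(iri)


async def iri_consumer(process_iri) -> None:
    while True:
        iri = await unprocessed_iri_queue.get()
        try:
            await process_iri(iri)
        finally:
            # Always mark the item done so unprocessed_iri_queue.join() can complete.
            unprocessed_iri_queue.task_done()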
23 changes: 3 additions & 20 deletions .github/workflows/pypi_publish.yml
@@ -9,26 +9,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Build and publish to pypi (3.10)
uses: JRubics/poetry-publish@v1.13
- name: Build and publish to pypi
uses: JRubics/poetry-publish@v1.15
with:
python_version: "3.10.4"
python_version: "3.11.0"
ignore_dev_requirements: "yes"
pypi_token: ${{ secrets.PYPI_TOKEN }}
extra_build_dependency_packages: "capnproto libzmq3-dev"
- name: Build and publish to pypi (3.9)
uses: JRubics/poetry-publish@v1.13
with:
python_version: "3.9.13"
build_format: wheel
ignore_dev_requirements: "yes"
pypi_token: ${{ secrets.PYPI_TOKEN }}
extra_build_dependency_packages: "capnproto libzmq3-dev"
- name: Build and publish to pypi (3.8)
uses: JRubics/poetry-publish@v1.13
with:
python_version: "3.8.13"
build_format: wheel
ignore_dev_requirements: "yes"
pypi_token: ${{ secrets.PYPI_TOKEN }}
extra_build_dependency_packages: "capnproto libzmq3-dev"
24 changes: 24 additions & 0 deletions .github/workflows/pytest.yml
@@ -29,6 +29,30 @@ jobs:
strategy:
matrix:
python-version: [3.8, 3.9, "3.10"]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt install capnproto
python -m pip install --upgrade pip
pip install poetry
poetry config virtualenvs.create false
poetry install --no-interaction --no-ansi
- name: Test with pytest
env:
PODPING_HIVE_ACCOUNT: ${{ secrets.PODPING_HIVE_ACCOUNT }}
PODPING_HIVE_POSTING_KEY: ${{ secrets.PODPING_HIVE_POSTING_KEY }}
run: |
pytest
test-runslow:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.11" ]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM docker.io/python:3.10-slim-bullseye
FROM docker.io/python:3.11-slim-bullseye

ENV PYTHONFAULTHANDLER=1 \
PYTHONHASHSEED=random \
118 changes: 72 additions & 46 deletions examples/memory_profile/long_running_zmq.py
@@ -2,24 +2,36 @@
import linecache
import logging
import os
import random
import uuid
from ipaddress import IPv4Address

from plexo.ganglion.tcp_pair import GanglionZmqTcpPair
from plexo.plexus import Plexus
from podping_schemas.org.podcastindex.podping.hivewriter.podping_hive_transaction import (
PodpingHiveTransaction,
)
from podping_schemas.org.podcastindex.podping.podping_medium import PodpingMedium
from podping_schemas.org.podcastindex.podping.podping_reason import PodpingReason
from podping_schemas.org.podcastindex.podping.podping_write import PodpingWrite

from podping_hivewriter.models.medium import mediums
from podping_hivewriter.models.reason import reasons
from podping_hivewriter.neuron import (
podping_hive_transaction_neuron,
podping_write_neuron,
)

try:
import tracemalloc
except ModuleNotFoundError:
tracemalloc = False
import uuid
from random import randint
from platform import python_version as pv, python_implementation as pi
from timeit import default_timer as timer

import zmq
import zmq.asyncio

from podping_hivewriter.constants import LIVETEST_OPERATION_ID
from podping_hivewriter.models.medium import Medium
from podping_hivewriter.models.reason import Reason
from podping_hivewriter.podping_hivewriter import PodpingHivewriter
from podping_hivewriter.podping_settings_manager import PodpingSettingsManager
host = "127.0.0.1"
port = 9979
metrics = {"iris_sent": 0, "ops_received": 0, "iris_received": 0, "txs_received": 0}
txs_received_lock = asyncio.Lock()


def display_top(snapshot, key_type="lineno", limit=3):
@@ -51,57 +63,71 @@ def display_top(snapshot, key_type="lineno", limit=3):
logging.info("Total allocated size: %.1f KiB" % (total / 1024))


async def endless_send_loop(event_loop):
context = zmq.asyncio.Context()
socket = context.socket(zmq.REQ, io_loop=event_loop)
socket.connect(f"tcp://{host}:{port}")
async def podping_hive_transaction_reaction(transaction: PodpingHiveTransaction, _, _2):
num_iris = sum(len(podping.iris) for podping in transaction.podpings)

async with txs_received_lock:
metrics["ops_received"] = metrics["ops_received"] + len(transaction.podpings)
metrics["iris_received"] = metrics["iris_received"] + num_iris
metrics["txs_received"] = metrics["txs_received"] + 1


async def endless_send_loop():
tcp_pair_ganglion = GanglionZmqTcpPair(
peer=(IPv4Address(host), port),
relevant_neurons=(
podping_hive_transaction_neuron,
podping_write_neuron,
),
)
plexus = Plexus(ganglia=(tcp_pair_ganglion,))
await plexus.adapt(
podping_hive_transaction_neuron,
reactants=(podping_hive_transaction_reaction,),
)
await plexus.adapt(podping_write_neuron)

test_name = "long_running_zmq"
python_version = pv()
python_implementation = pi()
start_time = timer()
diag_time = timer()

while True:
loop_start = timer()
session_uuid = uuid.uuid4()
session_uuid_str = str(session_uuid)

num_iris = randint(1, 10)
for i in range(1000):
iri = f"https://example.com?t=agates_test&i={i}&s={session_uuid_str}"
medium: PodpingMedium = random.sample(sorted(mediums), 1)[0]
reason: PodpingReason = random.sample(sorted(reasons), 1)[0]
podping_write = PodpingWrite(medium=medium, reason=reason, iri=iri)

for i in range(num_iris):
await socket.send_string(
f"https://example.com?t={test_name}&i={i}&v={python_version}&pi={python_implementation}&s={session_uuid_str}"
)
response = await socket.recv_string()
assert response == "OK"
await plexus.transmit(podping_write)

metrics["iris_sent"] = metrics["iris_sent"] + 1000

if tracemalloc and (timer() - start_time) >= 60:
await asyncio.sleep(3 - (timer() - loop_start))
if tracemalloc and (timer() - diag_time) >= 60:
snapshot = tracemalloc.take_snapshot()
display_top(snapshot)
start_time = timer()
await asyncio.sleep(3)
diag_time = timer()
logging.info(
f"IRIs sent: {metrics['iris_sent']} - {metrics['iris_sent'] / (diag_time - start_time)}s"
)
logging.info(
f"TXs received: {metrics['txs_received']} - {metrics['txs_received'] / (diag_time - start_time)}s"
)
logging.info(
f"OPs received: {metrics['ops_received']} - {metrics['ops_received'] / (diag_time - start_time)}s"
)
logging.info(
f"IRIs received: {metrics['iris_received']} - {metrics['iris_received'] / (diag_time - start_time)}s"
)


if __name__ == "__main__":
if tracemalloc:
tracemalloc.start()
loop = asyncio.get_event_loop()
logging.getLogger().setLevel(level=logging.INFO)
settings_manager = PodpingSettingsManager()

host = "127.0.0.1"
port = 9979
podping_hivewriter = PodpingHivewriter(
os.environ["PODPING_HIVE_ACCOUNT"],
[os.environ["PODPING_HIVE_POSTING_KEY"]],
settings_manager,
medium=Medium.podcast,
reason=Reason.update,
listen_ip=host,
listen_port=port,
resource_test=True,
operation_id=LIVETEST_OPERATION_ID,
)
loop.run_until_complete(podping_hivewriter.wait_startup())
loop.run_until_complete(endless_send_loop(loop))

podping_hivewriter.close()
loop.run_until_complete(endless_send_loop())
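
Regarding the "Don't close the plexus if it's passed in externally" item above: a common way to implement that, sketched here as an assumption rather than the actual fix, is to record at construction time whether the Plexus was created internally and only shut it down in that case.

from typing import Optional

from plexo.plexus import Plexus


class ExampleWriter:
    # Illustrative ownership pattern only, not the writer's real interface.

    def __init__(self, plexus: Optional[Plexus] = None):
        # Remember whether we created the Plexus ourselves or were handed one.
        self._owns_plexus = plexus is None
        self.plexus = plexus if plexus is not None else Plexus()  # real code would pass its ganglia here

    def close(self) -> None:
        # Only tear down a Plexus we own; an external one is the caller's to close.
        # (Assumes the Plexus exposes a close()-style shutdown method.)
        if self._owns_plexus:
            self.plexus.close()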
2 changes: 1 addition & 1 deletion install-packages.sh
@@ -17,7 +17,7 @@ apt-get update
apt-get -y upgrade

# Install application dependencies
apt-get -y install --no-install-recommends capnproto libffi7 libssl1.1 libzmq5 zlib1g gcc libstdc++-10-dev
apt-get -y install --no-install-recommends capnproto libffi7 zlib1g

# Delete cached files we don't need anymore (note that if you're
# using official Docker images for Debian or Ubuntu, this happens