Skip to content
This repository has been archived by the owner on Jan 18, 2025. It is now read-only.

Commit

Permalink
refactor(send_messages): using async instead of threading for better …
Browse files Browse the repository at this point in the history
…concurent
  • Loading branch information
NTGNguyen committed Dec 28, 2024
1 parent 3827bfe commit 2e0c41c
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions src/check_phat_nguoi/modules/notify/telegram.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
from logging import getLogger
from threading import Thread
from typing import LiteralString

import requests
from aiohttp import ClientConnectionError, ClientSession, ClientTimeout

from check_phat_nguoi.config import TelegramNotifyDTO
from check_phat_nguoi.utils.constants import SEND_MESSAGE_API_URL_TELEGRAM as API_URL
Expand All @@ -18,8 +18,10 @@ def __init__(
):
self._telegram_notify_object: TelegramNotifyDTO = telegram_notify
self._message_dict: dict[str, LiteralString] = message_dict
self.session = ClientSession()
self.timeout = 10

def _send_message(self, message: LiteralString, timeout=10) -> None:
async def _send_message(self, message: LiteralString) -> None:
if not self._telegram_notify_object.enabled:
logger.info("Not enable to sending")
return
Expand All @@ -30,24 +32,37 @@ def _send_message(self, message: LiteralString, timeout=10) -> None:
"parse_mode": "Markdown",
}
try:
response = requests.post(url, json=payload)
response.raise_for_status()
logger.info(f"Request successful: {response.status_code}")
except requests.exceptions.ConnectionError:
logger.error(f"Unable to connect to {url}")
except requests.exceptions.Timeout:
logger.error(f"Time out of {timeout} seconds from URL {url}")

def mutithread_send_message(self) -> None:
threads: list[Thread] = []
for _, message in self._message_dict.items():
thread = Thread(target=self._send_message, args=(message))
threads.append(thread)
thread.start()
for idx, thread in enumerate(threads, start=1):
try:
thread.join()
except Exception:
logger.error(
f"An error occurs while sending message in thread number {idx}"
async with self.session.post(
url, json=payload, timeout=ClientTimeout(self.timeout)
) as response:
response.raise_for_status()
logger.info(
"Sending message completed for chat_id:{chat_id} and bot_token:{bot_token}".format(
chat_id=self._telegram_notify_object.telegram.chat_id,
bot_token=self._telegram_notify_object.telegram.bot_token,
)
)
except asyncio.TimeoutError:
logger.error(
"Time out of {self.timeout} seconds for chat_id:{chat_id} and bot_token:{bot_token}".format(
chat_id=self._telegram_notify_object.telegram.chat_id,
bot_token=self._telegram_notify_object.telegram.bot_token,
)
)
except ClientConnectionError:
logger.error(
"Unable to sending message for chat_id:{chat_id} and bot_token:{bot_token}".format(
chat_id=self._telegram_notify_object.telegram.chat_id,
bot_token=self._telegram_notify_object.telegram.bot_token,
)
)

async def send_messages(self) -> None:
async def _concurent_send_messages():
tasks = [
self._send_message(message) for _, message in self._message_dict.items()
]
await asyncio.gather(*tasks)

await _concurent_send_messages()
await self.session.close()

0 comments on commit 2e0c41c

Please sign in to comment.