Skip to content

Commit

Permalink
tgram:// support for topic values as target arguments (#1028)
Browse files Browse the repository at this point in the history
  • Loading branch information
caronc authored Dec 28, 2023
1 parent 9dcf769 commit d6e0d2e
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 52 deletions.
120 changes: 72 additions & 48 deletions apprise/plugins/NotifyTelegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@
# Chat ID is required
# If the Chat ID is positive, then it's addressed to a single person
# If the Chat ID is negative, then it's targeting a group
# We can support :topic (an integer) if specified as well
IS_CHAT_ID_RE = re.compile(
r'^(@*(?P<idno>-?[0-9]{1,32})|(?P<name>[a-z_-][a-z0-9_-]+))$',
r'^((?P<idno>-?[0-9]{1,32})|(@|%40)?(?P<name>[a-z_-][a-z0-9_-]+))'
r'((:|%3A)(?P<topic>[0-9]+))?$',
re.IGNORECASE,
)

Expand Down Expand Up @@ -360,9 +362,6 @@ def __init__(self, bot_token, targets, detect_owner=True,
self.logger.warning(err)
raise TypeError(err)

# Parse our list
self.targets = parse_list(targets)

# Define whether or not we should make audible alarms
self.silent = self.template_args['silent']['default'] \
if silent is None else bool(silent)
Expand Down Expand Up @@ -403,15 +402,41 @@ def __init__(self, bot_token, targets, detect_owner=True,
# URL later to directly include the user that we should message.
self.detect_owner = detect_owner

if self.user:
# Treat this as a channel too
self.targets.append(self.user)
# Parse our list
self.targets = []
for target in parse_list(targets):
results = IS_CHAT_ID_RE.match(target)
if not results:
self.logger.warning(
'Dropped invalid Telegram chat/group ({}) specified.'
.format(target),
)

# Ensure we don't fall back to owner detection
self.detect_owner = False
continue

try:
topic = int(
results.group('topic')
if results.group('topic') else self.topic)

except TypeError:
# No worries
topic = None

if results.group('name') is not None:
# Name
self.targets.append(('@%s' % results.group('name'), topic))

else: # ID
self.targets.append((int(results.group('idno')), topic))

# Track whether or not we want to send an image with our notification
# or not.
self.include_image = include_image

def send_media(self, chat_id, notify_type, attach=None):
def send_media(self, target, notify_type, attach=None):
"""
Sends a sticker based on the specified notify type
Expand Down Expand Up @@ -470,9 +495,12 @@ def send_media(self, chat_id, notify_type, attach=None):
# content can arrive together.
self.throttle()

# Extract our target
chat_id, topic = target

payload = {'chat_id': chat_id}
if self.topic:
payload['message_thread_id'] = self.topic
if topic:
payload['message_thread_id'] = topic

try:
with open(path, 'rb') as f:
Expand Down Expand Up @@ -658,7 +686,7 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
_id = self.detect_bot_owner()
if _id:
# Permanently store our id in our target list for next time
self.targets.append(str(_id))
self.targets.append((str(_id), None))
self.logger.info(
'Update your Telegram Apprise URL to read: '
'{}'.format(self.url(privacy=True)))
Expand All @@ -681,26 +709,23 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
'sendMessage'
)

payload = {
_payload = {
# Notification Audible Control
'disable_notification': self.silent,
# Display Web Page Preview (if possible)
'disable_web_page_preview': not self.preview,
}

if self.topic:
payload['message_thread_id'] = self.topic

# Prepare Message Body
if self.notify_format == NotifyFormat.MARKDOWN:
payload['parse_mode'] = 'MARKDOWN'
_payload['parse_mode'] = 'MARKDOWN'

payload['text'] = body
_payload['text'] = body

else: # HTML

# Use Telegram's HTML mode
payload['parse_mode'] = 'HTML'
_payload['parse_mode'] = 'HTML'
for r, v, m in self.__telegram_escape_html_entries:

if 'html' in m:
Expand All @@ -712,7 +737,7 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
body = r.sub(v, body)

# Prepare our payload based on HTML or TEXT
payload['text'] = body
_payload['text'] = body

# Handle payloads without a body specified (but an attachment present)
attach_content = \
Expand All @@ -721,41 +746,31 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
# Create a copy of the chat_ids list
targets = list(self.targets)
while len(targets):
chat_id = targets.pop(0)
chat_id = IS_CHAT_ID_RE.match(chat_id)
if not chat_id:
self.logger.warning(
"The specified chat_id '%s' is invalid; skipping." % (
chat_id,
)
)
target = targets.pop(0)
chat_id, topic = target

# Flag our error
has_error = True
continue

if chat_id.group('name') is not None:
# Name
payload['chat_id'] = '@%s' % chat_id.group('name')
# Printable chat_id details
pchat_id = f'{chat_id}' if not topic else f'{chat_id}:{topic}'

else:
# ID
payload['chat_id'] = int(chat_id.group('idno'))
payload = _payload.copy()
payload['chat_id'] = chat_id
if topic:
payload['message_thread_id'] = topic

if self.include_image is True:
# Define our path
if not self.send_media(payload['chat_id'], notify_type):
if not self.send_media(target, notify_type):
# We failed to send the image associated with our
notify_type
self.logger.warning(
'Failed to send Telegram type image to {}.',
payload['chat_id'])
pchat_id)

if attach and self.attachment_support and \
attach_content == TelegramContentPlacement.AFTER:
# Send our attachments now (if specified and if it exists)
if not self._send_attachments(
chat_id=payload['chat_id'], notify_type=notify_type,
target, notify_type=notify_type,
attach=attach):

has_error = True
Expand Down Expand Up @@ -803,7 +818,7 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
self.logger.warning(
'Failed to send Telegram notification to {}: '
'{}, error={}.'.format(
payload['chat_id'],
pchat_id,
error_msg if error_msg else status_str,
r.status_code))

Expand All @@ -817,7 +832,7 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
except requests.RequestException as e:
self.logger.warning(
'A connection error occurred sending Telegram:%s ' % (
payload['chat_id']) + 'notification.'
pchat_id) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))

Expand All @@ -833,7 +848,7 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
# it was identified to send the content before the attachments
# which is now done.
if not self._send_attachments(
chat_id=payload['chat_id'],
target=target,
notify_type=notify_type,
attach=attach):

Expand All @@ -842,14 +857,14 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,

return not has_error

def _send_attachments(self, chat_id, notify_type, attach):
def _send_attachments(self, target, notify_type, attach):
"""
Sends our attachments
"""
has_error = False
# Send our attachments now (if specified and if it exists)
for attachment in attach:
if not self.send_media(chat_id, notify_type, attach=attachment):
if not self.send_media(target, notify_type, attach=attachment):

# We failed; don't continue
has_error = True
Expand Down Expand Up @@ -880,13 +895,21 @@ def url(self, privacy=False, *args, **kwargs):
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))

targets = []
for (chat_id, _topic) in self.targets:
topic = _topic if _topic else self.topic

targets.append(''.join(
[NotifyTelegram.quote(f'{chat_id}', safe='@')
if isinstance(chat_id, str) else f'{chat_id}',
'' if not topic else f':{topic}']))

# No need to check the user token because the user automatically gets
# appended into the list of chat ids
return '{schema}://{bot_token}/{targets}/?{params}'.format(
schema=self.secure_protocol,
bot_token=self.pprint(self.bot_token, privacy, safe=''),
targets='/'.join(
[NotifyTelegram.quote('@{}'.format(x)) for x in self.targets]),
targets='/'.join(targets),
params=NotifyTelegram.urlencode(params))

def __len__(self):
Expand Down Expand Up @@ -987,6 +1010,7 @@ def parse_url(url):

# Include images with our message
results['detect_owner'] = \
parse_bool(results['qsd'].get('detect', True))
parse_bool(
results['qsd'].get('detect', not results['targets']))

return results
8 changes: 4 additions & 4 deletions test/test_plugin_telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def test_plugin_telegram_general(mock_post):
invalid_bot_token = 'abcd:123'

# Chat ID
chat_ids = 'l2g, lead2gold'
chat_ids = 'l2g:1234, lead2gold'

# Prepare Mock
mock_post.return_value = requests.Request()
Expand Down Expand Up @@ -397,7 +397,7 @@ def test_plugin_telegram_general(mock_post):

obj = NotifyTelegram(bot_token=bot_token, targets='12345')
assert len(obj.targets) == 1
assert obj.targets[0] == '12345'
assert obj.targets[0] == (12345, None)

# Test the escaping of characters since Telegram escapes stuff for us to
# which we need to consider
Expand Down Expand Up @@ -440,7 +440,7 @@ def test_plugin_telegram_general(mock_post):

assert obj.notify(title='hello', body='world') is True
assert len(obj.targets) == 1
assert obj.targets[0] == '532389719'
assert obj.targets[0] == ('532389719', None)

# Do the test again, but without the expected (parsed response)
mock_post.return_value.content = dumps({
Expand Down Expand Up @@ -550,7 +550,7 @@ def test_plugin_telegram_general(mock_post):
'tgram://123456789:ABCdefghijkl123456789opqyz/-123456789525')
assert isinstance(obj, NotifyTelegram)
assert len(obj.targets) == 1
assert '-123456789525' in obj.targets
assert (-123456789525, None) in obj.targets


@mock.patch('requests.post')
Expand Down

0 comments on commit d6e0d2e

Please sign in to comment.