-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
87 lines (62 loc) · 2.48 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/bin/env python3
import logging
import re
import os
from influxdb import InfluxDBClient
from telegram import Bot, Update, Chat, User, Message
from telegram.ext import Updater, MessageHandler
from telegram.ext.filters import Filters
from telegram.utils.helpers import to_timestamp
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
TOKEN = os.getenv('TOKEN')
PROXY_CHAN = -1001151984662
CWSEX_CHAN = -1001351906858
HOST = 'localhost'
PORT = 8086
USER = 'root'
PASSWORD = 'root'
DBNAME = 'cwsex'
item_re = re.compile(r"(^.[\w\-']+(?: \w+)*).+(?:\/t_([\d\w]{2,3}))$\n(?:\s{2}(\d{1,4})💰 x (\d{1,4})pcs = \d{1,4}💰\s)+", re.MULTILINE)
trade_re = re.compile(r"^(?:\s{2}(\d{1,4})💰 x (\d{1,4})pcs = \d{1,4}💰\s)", re.MULTILINE)
client = InfluxDBClient(HOST, PORT, USER, PASSWORD, DBNAME)
def error_cb(bot: Bot, update: Update, error: Exception):
logger.exception(error)
def process_trade(bot: Bot, update: Update):
chat = update.effective_chat # type: Chat
user = update.effective_user # type: User
msg = update.effective_message # type: Message
if msg.forward_from_chat.id != CWSEX_CHAN:
return
logger.info('Processing trades for {:%d %B %Y %H:%M:%S}'.format(msg.forward_date))
logger.debug('Received message: %s', msg.text)
timestamp = to_timestamp(msg.forward_date)
points = []
for trades in re.finditer(item_re, msg.text):
offset = 0
logger.debug(trades.group(1,0))
item, trade_text = trades.group(1, 0)
measurement = item.replace(' ', r'\ ')
step = 60//sum(1 for _ in trade_re.finditer(trade_text))
for trade in re.finditer(trade_re, trade_text):
logger.debug(trade.group(1,2))
price, vol = trade.group(1,2)
data = "{} price={},volume={} {}".format(measurement, price, vol, timestamp+offset)
logger.info("writing to influxdb <%s>", data)
offset += step
points.append(data)
ret = client.write_points(points, time_precision='s', protocol='line')
if ret:
logger.info("db written successfully")
else:
logger.error("failed writing points to db")
def main():
ud = Updater(TOKEN)
dp = ud.dispatcher
dp.add_handler(MessageHandler(Filters.chat(chat_id=PROXY_CHAN), process_trade))
dp.add_error_handler(error_cb)
ud.start_polling()
ud.idle()
if __name__ == '__main__':
main()