-
Notifications
You must be signed in to change notification settings - Fork 1
/
ufirebase.py
134 lines (102 loc) · 3.41 KB
/
ufirebase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# adapted from firebase/EventSource-Examples/python/chat.py by Shariq Hashme
from usseclient import SSEClient
import urequests as requests
import ujson as json
###import _thread as thread
import usocket as socket
class ClosableSSEClient(SSEClient):
    """SSEClient variant whose stream can be shut down by calling ``close()``.

    Once ``close()`` has run, ``_connect`` refuses to open a new connection
    and raises ``StopIteration`` instead.
    """

    def __init__(self, *args, **kwargs):
        # The flag is set before delegating to the parent constructor so it
        # already exists if the parent calls _connect() during construction
        # (presumably it does -- verify against usseclient).
        self.should_connect = True
        super(ClosableSSEClient, self).__init__(*args, **kwargs)

    def _connect(self):
        """Connect via the parent class unless the client has been closed."""
        if not self.should_connect:
            raise StopIteration()
        super(ClosableSSEClient, self)._connect()

    def close(self):
        """Forbid reconnects and shut down the underlying socket if reachable."""
        self.should_connect = False
        self.retry = 0
        try:
            raw_sock = self.resp.raw._fp.fp._sock
            raw_sock.shutdown(socket.SHUT_RDWR)
            raw_sock.close()
        except AttributeError:
            # Best effort: the response object may not expose this attribute
            # chain (or the socket API may lack shutdown), in which case
            # there is nothing to tear down here.
            pass
class RemoteThread():
    """Worker that streams server-sent events from a URL to a callback.

    Attributes:
        function: callable invoked with one ``(event, data)`` tuple per
            decoded message.
        URL: stream endpoint handed to ``ClosableSSEClient``.
        parent: owning object; stored here but not used by this class.
        sse: the active ``ClosableSSEClient``, or ``None`` until ``run()``
            has created it.
    """

    def __init__(self, parent, URL, function):
        self.function = function
        self.URL = URL
        self.parent = parent
        # Defined up front so close()/stop() are safe before run() started.
        self.sse = None
        super(RemoteThread, self).__init__()

    def run(self):
        """Consume the event stream and forward every message to ``function``.

        Messages whose JSON payload decodes to ``None`` are treated as
        keep-alives and skipped. Blocks for as long as the stream yields
        messages.
        """
        try:
            self.sse = ClosableSSEClient(self.URL)
            for msg in self.sse:
                msg_data = json.loads(msg.data)
                if msg_data is None:  # keep-alives
                    continue
                # TODO: update parent cache here
                self.function((msg.event, msg_data))
        except OSError:
            # This can happen when we close the stream. OSError is used
            # instead of socket.error: on CPython the two are the same class,
            # and MicroPython's socket module does not define socket.error.
            pass
        except KeyboardInterrupt:
            self.close()

    def start(self, run=None):
        """Start the worker in a new thread.

        Args:
            run: optional zero-argument callable to execute in the new
                thread; defaults to this object's own ``run`` method.
        """
        # Imported here because the module-level import is commented out and
        # not every port ships the _thread module.
        import _thread
        target = self.run if run is None else run
        # start_new_thread requires an explicit (possibly empty) args tuple.
        _thread.start_new_thread(target, ())

    def stop(self):
        """Stop streaming by closing the event source, if one is open."""
        # _thread.exit() would only terminate the *calling* thread; closing
        # the stream is what makes the worker's loop in run() end.
        self.close()

    def close(self):
        """Close the underlying SSE client if ``run()`` has created one."""
        if self.sse:
            self.sse.close()
def firebaseURL(URL):
    """Normalise a Firebase reference into an HTTPS REST URL ending in ``.json``.

    Two input forms are accepted:

    * Shorthand ``"<app>[/<path>][.json]"`` (anything not containing
      ``.firebaseio.com``), expanded to
      ``https://<app>.firebaseio.com/<path>.json``. One trailing slash is
      ignored, so ``"app"`` and ``"app/"`` both address the database root.
    * A full host/URL containing ``.firebaseio.com``: ``http://`` is upgraded
      to ``https://``, a missing scheme is added, and ``/.json`` is appended
      when the URL does not already mention ``.json``.

    Args:
        URL: shorthand reference or full Firebase URL (string).

    Returns:
        The normalised URL string.
    """
    if '.firebaseio.com' not in URL.lower():
        if '.json' == URL[-5:]:
            URL = URL[:-5]
        if URL.endswith('/'):
            URL = URL[:-1]
        # split(..., 1) yields a single element when there is no path part
        # (e.g. "app" or "app/"); treat that as the root instead of indexing
        # past the end of the list.
        pieces = URL.split('/', 1)
        app = pieces[0]
        path = pieces[1] if len(pieces) > 1 else ''
        return 'https://' + app + '.firebaseio.com/' + path + '.json'

    if 'http://' in URL:
        URL = URL.replace('http://', 'https://')
    if 'https://' not in URL:
        URL = 'https://' + URL
    if '.json' not in URL.lower():
        if URL.endswith('/'):
            URL = URL + '.json'
        else:
            URL = URL + '/.json'
    return URL
class subscriber:
    """Subscription to a Firebase location that feeds events to a callback.

    Args:
        URL: shorthand reference or full Firebase URL; normalised with
            ``firebaseURL`` before use.
        function: callable handed to the underlying ``RemoteThread``.
    """

    def __init__(self, URL, function):
        # Stored for future use; nothing in this class reads it yet.
        self.cache = {}
        self.remote_thread = RemoteThread(self, firebaseURL(URL), function)

    def start(self):
        """Begin streaming events in the background."""
        # Pass the worker's run method explicitly: RemoteThread.start takes
        # the callable to execute, and calling it with no argument fails.
        self.remote_thread.start(self.remote_thread.run)

    def stop(self):
        """Stop the background stream."""
        self.remote_thread.stop()
class FirebaseException(Exception):
    """Error raised when a Firebase REST request does not return HTTP 200."""
def put(URL, msg):
    """Write ``msg`` to the Firebase location ``URL`` with an HTTP PUT.

    Args:
        URL: shorthand reference or full Firebase URL (see ``firebaseURL``).
        msg: JSON-serialisable value to send as the request body.

    Raises:
        FirebaseException: if the server answers with a status other than
            200; the message is the response body.
    """
    to_post = json.dumps(msg)
    response = requests.put(firebaseURL(URL), data=to_post)
    try:
        if response.status_code != 200:
            raise FirebaseException(response.text)
    finally:
        # On success the body is never read, so release the connection
        # explicitly instead of leaving the socket open.
        response.close()
def patch(URL, msg):
    """Send ``msg`` to the Firebase location ``URL`` with an HTTP PATCH.

    Args:
        URL: shorthand reference or full Firebase URL (see ``firebaseURL``).
        msg: JSON-serialisable value to send as the request body.

    Raises:
        FirebaseException: if the server answers with a status other than
            200; the message is the response body.
    """
    to_post = json.dumps(msg)
    response = requests.patch(firebaseURL(URL), data=to_post)
    try:
        if response.status_code != 200:
            raise FirebaseException(response.text)
    finally:
        # On success the body is never read, so release the connection
        # explicitly instead of leaving the socket open.
        response.close()
def get(URL):
    """Fetch and decode the JSON value stored at the Firebase location ``URL``.

    Args:
        URL: shorthand reference or full Firebase URL (see ``firebaseURL``).

    Returns:
        The response body parsed from JSON.

    Raises:
        FirebaseException: if the server answers with a status other than
            200; the message is the response body.
    """
    reply = requests.get(firebaseURL(URL))
    if reply.status_code == 200:
        return json.loads(reply.text)
    raise FirebaseException(reply.text)
def push(URL, msg):
    """Send ``msg`` to the Firebase location ``URL`` with an HTTP POST.

    Args:
        URL: shorthand reference or full Firebase URL (see ``firebaseURL``).
        msg: JSON-serialisable value to send as the request body.

    Raises:
        FirebaseException: if the server answers with a status other than
            200; the message is the response body. FirebaseException is a
            subclass of Exception, so callers catching Exception still work.
    """
    to_post = json.dumps(msg)
    response = requests.post(firebaseURL(URL), data=to_post)
    try:
        if response.status_code != 200:
            # Same error type as put/patch/get rather than a bare Exception.
            raise FirebaseException(response.text)
    finally:
        # On success the body is never read, so release the connection
        # explicitly instead of leaving the socket open.
        response.close()