-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
137 lines (118 loc) · 4.48 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import threading
class Database:
    """In-memory, lock-guarded registry of per-email session records.

    Two structures are kept in sync under a single ``threading.Lock``:

    * ``db`` maps an email to its record (a plain dict, see ``__init__``).
    * ``ports_to_email_map`` is the reverse index ``{port: email}``.

    ``db_lock`` is a non-reentrant lock, so every public method performs its
    lookup and its mutation inside ONE ``with self.db_lock`` block and never
    calls another locking method while the lock is held.
    """

    def __init__(self):
        self.db_lock = threading.Lock()
        self.ports_to_email_map = {}  # {port: email}
        # {
        #     email: {
        #         "port": port,
        #         "pid": pid,
        #         "timestamp": timestamp,
        #         "websocket": websocket,
        #         "tracepoint_map": {
        #             line_no: tracePointId,
        #         }
        #     }
        # }
        self.db = {}
        self.last_active_port = None

    def get_last_active_port(self):
        """Return the current port cursor, or ``None`` if never advanced."""
        with self.db_lock:
            return self.last_active_port

    def increment_last_active_port(self, start, end):
        """Advance the port cursor by one, wrapping back to ``start``.

        The cursor is set to ``start`` on first use, incremented while it is
        below ``end - 1``, and reset to ``start`` otherwise.
        """
        with self.db_lock:
            current = self.last_active_port
            # Compare against None explicitly: a truthiness test would treat
            # a cursor value of 0 as "unset" and never advance past it.
            if current is None or current >= end - 1:
                self.last_active_port = start
            else:
                self.last_active_port = current + 1

    def get_data_for_email(self, email):
        """Return the record dict stored for ``email``, or ``None``.

        NOTE: the returned dict is the live internal object, not a copy;
        mutating it outside this class bypasses ``db_lock``.
        """
        with self.db_lock:
            return self.db.get(email, None)

    def get_all_emails(self):
        """Return a list snapshot of every email currently stored."""
        with self.db_lock:
            return list(self.db)

    def delete_email(self, email):
        """Remove ``email`` and release its port mapping, if it still owns it.

        Does nothing when ``email`` is not stored.
        """
        with self.db_lock:
            record = self.db.pop(email, None)
            if record is None:
                return
            port = record.get("port", None)
            if port is None:
                return
            port = int(port)
            # Only free the port if it still points at this email; it may
            # have been handed to another email in the meantime.
            if self.ports_to_email_map.get(port) == email:
                del self.ports_to_email_map[port]

    def set_websocket_for_email(self, email, websocket):
        """Attach ``websocket`` to an existing record.

        Returns ``True`` on success, ``False`` when ``email`` is not stored.
        """
        with self.db_lock:
            record = self.db.get(email)
            if record is None:
                return False
            record["websocket"] = websocket
            return True

    def check_email_in_db(self, email):
        """Return ``True`` if a record exists for ``email``."""
        with self.db_lock:
            return email in self.db

    def get_websocket_for_email(self, email):
        """Return the websocket stored for ``email``.

        Returns ``None`` when the email is unknown or no websocket has been
        attached to its record yet.
        """
        with self.db_lock:
            record = self.db.get(email)
            if record is None:
                return None
            return record.get("websocket", None)

    def initialize_tracepointmap_if_not_exists(self, email):
        """Ensure the record for ``email`` has a ``"tracepoint_map"`` dict.

        Does nothing when ``email`` is not stored.
        """
        with self.db_lock:
            record = self.db.get(email)
            if record is not None:
                record.setdefault("tracepoint_map", {})

    def update_tracepoint_map(self, email, line_no, tracePointId):
        """Record ``tracePointId`` for ``line_no`` in the email's map.

        The map is created on demand. Does nothing when ``email`` is not
        stored.
        """
        with self.db_lock:
            record = self.db.get(email)
            if record is None:
                return
            record.setdefault("tracepoint_map", {})[line_no] = tracePointId

    def get_tracePointId_for_email_lineno(self, email, line_no):
        """Return the tracepoint id stored for ``line_no``, or ``None``."""
        with self.db_lock:
            record = self.db.get(email)
            if record is None:
                return None
            return record.get("tracepoint_map", {}).get(line_no, None)

    def delete_lineno_from_tracepointid_map_for_email(self, email, line_no):
        """Remove ``line_no`` from the email's tracepoint map if present."""
        with self.db_lock:
            record = self.db.get(email)
            if record is None:
                return
            tracepoints = record.get("tracepoint_map")
            if tracepoints is not None:
                tracepoints.pop(line_no, None)

    def set_port_for_email(self, email, port):
        """Assign ``port`` to ``email``, creating the record if needed.

        ``port`` is normalised with ``int()``. The reverse index is updated,
        and the email's previous port (if different) is released.

        Raises:
            ValueError, TypeError: if ``port`` cannot be converted to int.
        """
        # Convert before touching any state so a bad value leaves no
        # half-created record behind.
        port = int(port)
        with self.db_lock:
            record = self.db.setdefault(email, {})
            previous = record.get("port", None)
            if (
                previous is not None
                and previous != port
                and self.ports_to_email_map.get(previous) == email
            ):
                # Drop the stale reverse entry so the old port is reported
                # as free again.
                del self.ports_to_email_map[previous]
            record["port"] = port
            self.ports_to_email_map[port] = email

    def set_pid_for_email(self, email, pid):
        """Store ``pid`` on an existing record; no-op for unknown emails."""
        with self.db_lock:
            record = self.db.get(email)
            if record is not None:
                record["pid"] = pid

    def set_timestamp_for_email(self, email, timestamp):
        """Store ``timestamp`` on an existing record; no-op if unknown."""
        with self.db_lock:
            record = self.db.get(email)
            if record is not None:
                record["timestamp"] = timestamp

    def get_port_for_email(self, email):
        """Return the port stored for ``email``, or ``None``."""
        with self.db_lock:
            record = self.db.get(email)
            if record is None:
                return None
            return record.get("port", None)

    def check_port_in_use(self, port):
        """Return ``True`` if ``int(port)`` is mapped to some email."""
        with self.db_lock:
            return int(port) in self.ports_to_email_map

    def get_email_for_port(self, port):
        """Return the email mapped to ``int(port)``, or ``None``."""
        with self.db_lock:
            return self.ports_to_email_map.get(int(port), None)