-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
422 lines (360 loc) · 19.9 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# nomadForum - a forum on the NomadNetwork
# Copyright (C) 2023-2024 AutumnSpark1226
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import os.path
import sys
import sqlite3
import string
import re
import secrets
from sqlite3 import Connection, Cursor
from uuid import UUID
from cryptography.fernet import Fernet, MultiFernet
version = "git" # forum version (release); "git" is the default for unstable/testing copies taken directly from a git server (GitHub, Codeberg, ...)
# This constant should be "git" for all non-release copies of this software. For releases, it should be the matching release version.
db_version = 3 # database schema version (used for database migrations); do NOT change this unless you know what you're doing!
# This version starts at 0 and must be increased every time a database migration has to be performed. The migration itself should be implemented in admin/migrate.py.
# setup_db() compares this constant with the 'db_version' row in the settings table; on a mismatch it prints an error and exits, which disables the entire forum.
# ---- configure your forum here ----
storage_path = ".nomadForum" # folder containing all saved files (database.db, key.secret, ...); an absolute path is highly recommended; WITHOUT / at the end
page_path = "/page/nomadForum" # path on your node WITHOUT / at the end; with this value the main page would be {node_address}:/page/nomadForum/index.mu (nomadnet url), i.e. presumably ~/.nomadnetwork/storage/pages/nomadForum/index.mu on disk
forum_name = "nomadForum" # name of your forum; printed in the page header
# add your own front page information here (micron markup)
main_page_info = """
`!A forum on the NomadNetwork`!
...
"""
notifications_enabled = True # enable or disable notifications; generate_verification() raises while this is False
enable_update_checks = False # disabled by default; when enabled the forum connects to https://aspark.uber.space and requests the newest version, so that update reminders can be shown on the admin page
# module-wide database handles; they are assigned by setup_db() and released by close_database()
connection: Connection
cursor: Cursor
def setup_db(open_for_migration=False) -> None:
    """Open the forum database, create missing tables and check the schema version.

    The module-level ``connection`` and ``cursor`` are (re)assigned here.
    If the ``db_version`` stored in the settings table differs from the
    module constant, a message is printed, the database is closed and the
    process exits -- unless ``open_for_migration`` is true, in which case the
    database stays open so that a migration can run.  Expired sessions and
    verification codes are purged before returning.
    """
    global connection, cursor
    # makedirs also creates missing parent directories (os.mkdir would fail)
    # and does not complain when the directory already exists
    os.makedirs(storage_path, exist_ok=True)
    connection = sqlite3.connect(storage_path + "/database.db")
    cursor = connection.cursor()
    schema = (
        # settings: setting_id, key, value
        "CREATE TABLE IF NOT EXISTS settings (setting_id INTEGER NOT NULL UNIQUE PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT)",
        # users: user_id, username, enabled, password((hashed, encrypted)), link_id, login_time, display_name, about, show_online, default_styling
        "CREATE TABLE IF NOT EXISTS users (user_id INTEGER NOT NULL UNIQUE PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, enabled INTEGER DEFAULT 1 NOT NULL, password TEXT NOT NULL, link_id TEXT, login_time INTEGER, display_name TEXT NOT NULL, about TEXT NOT NULL, show_online INTEGER DEFAULT 0 NOT NULL, default_styling TEXT NOT NULL)",
        # posts: numeric_id, post_id, username, title, content, locked, edited, created, changed, last_action
        "CREATE TABLE IF NOT EXISTS posts (numeric_id INTEGER NOT NULL UNIQUE PRIMARY KEY AUTOINCREMENT, post_id TEXT NOT NULL UNIQUE, username TEXT NOT NULL, title TEXT NOT NULL, content TEXT NOT NULL, locked INTEGER DEFAULT 0 NOT NULL, edited INTEGER DEFAULT 0 NOT NULL, created INTEGER NOT NULL, changed INTEGER NOT NULL, last_action INTEGER NOT NULL)",
        # comments: numeric_id, comment_id, post_id, parent, username, content, edited, created, changed
        "CREATE TABLE IF NOT EXISTS comments (numeric_id INTEGER NOT NULL UNIQUE PRIMARY KEY AUTOINCREMENT, comment_id TEXT NOT NULL UNIQUE, post_id TEXT NOT NULL, parent TEXT NOT NULL, username TEXT NOT NULL, content TEXT NOT NULL, edited INTEGER DEFAULT 0 NOT NULL, created INTEGER NOT NULL, changed INTEGER NOT NULL)",
        # connections: conn_id, username, remote_id((encrypted)), allow_login, send_notifications, public, verified
        "CREATE TABLE IF NOT EXISTS connections (conn_id INTEGER NOT NULL UNIQUE PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, remote_id TEXT NOT NULL, allow_login INTEGER DEFAULT 0 NOT NULL, send_notifications INTEGER DEFAULT 0 NOT NULL, public INTEGER DEFAULT 0 NOT NULL, verified INTEGER DEFAULT 0 NOT NULL)",
        # subscriptions: sub_id, username, post_id
        "CREATE TABLE IF NOT EXISTS subscriptions (sub_id INTEGER NOT NULL UNIQUE PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, post_id TEXT NOT NULL)",
        # verification_codes: code_id, username, code((encrypted)), use_type, use_id, creation_time, try_counter
        "CREATE TABLE IF NOT EXISTS verification_codes (code_id INTEGER NOT NULL UNIQUE PRIMARY KEY AUTOINCREMENT, username TEXT, code TEXT NOT NULL, use_type TEXT NOT NULL, use_id TEXT NOT NULL, creation_time INTEGER NOT NULL, try_counter INTEGER DEFAULT 0 NOT NULL)",
    )
    for statement in schema:
        execute_sql(statement)
    # insert version if it's not stored
    version_query = "SELECT value FROM settings WHERE key = 'db_version'"
    if len(query_database(version_query)) == 0:
        execute_sql(f"INSERT INTO settings (key, value) VALUES ('db_version', '{db_version}')")
    stored_version = query_database(version_query)[0][0]
    if stored_version != str(db_version) and not open_for_migration:
        print("CRITICAL: Database migration required! Contact an admin!")
        cursor.close()
        connection.close()
        # sys.exit instead of the interactive-only exit() helper from site
        sys.exit(0)
    purge()
def purge():
    """Drop expired sessions and stale verification codes.

    Sessions are reset one hour after login.  Verification codes are removed
    after 30 minutes or once more than two attempts were made; for codes of
    type "add_lxmf" the matching unverified connection is removed as well.
    """
    # remove old sessions (after one hour)
    execute_sql("UPDATE users SET link_id = '0', login_time = 0 WHERE link_id != '0' AND (login_time + 3600) < unixepoch()")
    # remove old or failed verification codes and the corresponding unverified connections
    stale_codes = query_database("SELECT code_id, use_id, use_type FROM verification_codes WHERE (creation_time + 1800) < unixepoch() OR try_counter > 2")
    for code_id, use_id, use_type in stale_codes:
        if use_type == "add_lxmf":
            pending_connections = query_database("SELECT remote_id, conn_id FROM connections WHERE verified = 0")
            for remote_id, conn_id in pending_connections:
                # both values are stored encrypted, so compare the plaintexts
                if decrypt(remote_id) == decrypt(use_id):
                    execute_sql(f"DELETE FROM connections WHERE conn_id = '{conn_id}' AND verified = 0")
        execute_sql(f"DELETE FROM verification_codes WHERE code_id = {code_id}")
def close_database(write_changes=True) -> None:
    """Close the module-level cursor and connection.

    Changes are only written when the database closes: they are committed
    here if ``write_changes`` is true and discarded otherwise.
    """
    if write_changes:
        connection.commit()
    # the cursor has to go first, then the connection it belongs to
    for handle in (cursor, connection):
        handle.close()
def execute_sql(command: str, parameters=()) -> None:
    """Run a single SQL statement on the module-level cursor.

    ``parameters`` is optional and is handed to sqlite3 as the bound values
    for ``?`` / ``:name`` placeholders in ``command``.  Prefer placeholders
    over building the statement with string formatting whenever a value
    comes from user input.  Nothing is committed here; see close_database().
    """
    cursor.execute(command, parameters)
def query_database(command: str, parameters=()) -> list:
    """Run a SQL query on the module-level cursor and return all rows.

    ``parameters`` is optional and is handed to sqlite3 as the bound values
    for ``?`` / ``:name`` placeholders in ``command``.  The result is the
    list of row tuples produced by ``fetchall()`` (empty if nothing matched).
    """
    return cursor.execute(command, parameters).fetchall()
def check_username(username: str, allow_admin=False) -> bool:
    """Return True if ``username`` is allowed.

    A username is rejected when it contains non-printable (non-ASCII)
    characters, one of the characters ``' \\ " ` [ ]`` or a newline, two
    consecutive spaces, or when it is a reserved name (compared
    case-insensitively).  "admin" is only accepted with ``allow_admin``.
    """
    # check if string is printable
    if not set(username).issubset(string.printable):
        return False
    # don't allow SQL injections and some other characters
    if any(char in username for char in "'\\\"`\n[]"):
        return False
    # don't allow double space (single spaces are fine)
    if "  " in username:
        return False
    upper_name = username.upper()
    # don't allow "admin" (reserved, can be added by executing the admin/create_create_admin_account.py script)
    if upper_name == "ADMIN" and not allow_admin:
        return False
    # "system" is reserved for system actions in the future; "[DELETED]" is
    # reserved for posts made by deleted users, and some variations are
    # blocked as well to avoid confusion
    if upper_name in ("SYSTEM", "[DELETED]", "DELETED", "(DELETED)", "{DELETED}"):
        return False
    return True
def check_display_name(display_name: str) -> bool:
    """Return True if ``display_name`` is allowed.

    Rejected are names with non-printable characters, a newline or square
    brackets, and the reserved names "system" and "[DELETED]" (plus a few
    look-alike variations), compared case-insensitively.
    """
    reserved_names = ("SYSTEM", "[DELETED]", "DELETED", "(DELETED)", "{DELETED}")
    printable_only = set(display_name).issubset(set(string.printable))
    has_forbidden_char = any(char in display_name for char in ("\n", "[", "]"))
    is_reserved = display_name.upper() in reserved_names
    return printable_only and not has_forbidden_char and not is_reserved
def check_uuid(possible_uuid: str) -> bool:
    """Return True if the parameter is a valid UUID of version 4.

    ``UUID(value, version=4)`` must not be used for this check: the
    ``version`` argument overwrites the version bits instead of validating
    them, so every well-formed UUID would pass.  The value is parsed as-is
    and the version it actually carries is inspected.  Anything that cannot
    be parsed (including non-string input) yields False.
    """
    try:
        parsed = UUID(possible_uuid)
    except (ValueError, TypeError, AttributeError):
        return False
    # .version is None for UUIDs that are not RFC 4122 variants
    return parsed.version == 4
def prepare_content(content: str) -> str:
    """Escape post/comment content for storage and display.

    The substitutions are applied in order: backslashes are doubled, single
    quotes are doubled (SQL escaping), and the micron sequences ``#`` and
    `` `= `` are prefixed with a backslash.
    """
    substitutions = (
        ("\\", "\\\\"),  # replace \ (must run first, the later steps add backslashes)
        ("'", "''"),  # don't allow SQL injections
        ("#", "\\#"),  # sorry, no hidden messages
        ("`=", "\\`="),  # don't allow literals
    )
    for old, new in substitutions:
        content = content.replace(old, new)
    return content
def prepare_display_name(display_name: str, username: str) -> str:
    """Sanitize a display name, falling back to ``username`` if it is not allowed.

    Backslashes and single quotes are doubled, then the micron sequences
    `` `= ``, `` `c ``, `` `a ``, `` `r `` and `` `< `` are stripped.
    Stripping is repeated until nothing changes: a single pass lets a banned
    sequence re-form from the characters around a removed one (for example
    the input ``` ``c= ``` would otherwise come out as `` `= ``).
    """
    if not check_display_name(display_name):
        return username
    # replace \
    display_name = display_name.replace("\\", "\\\\")
    # don't allow SQL injections
    display_name = display_name.replace("'", "''")
    # replace unwanted micron formatting (literals, alignment)
    banned_sequences = ("`=", "`c", "`a", "`r", "`<")
    while True:
        stripped = display_name
        for sequence in banned_sequences:
            stripped = stripped.replace(sequence, "")
        if stripped == display_name:
            # fixed point reached; every change shortens the string, so the
            # loop always terminates
            return display_name
        display_name = stripped
def prepare_title(title: str) -> str:
    """Reduce a title to its first line and escape it.

    Everything after the first newline is dropped; backslashes and single
    quotes are doubled and every backtick is prefixed with a backslash so
    that no micron formatting survives in the title.
    """
    # remove newline (and whatever follows it)
    first_line, _, _ = title.partition("\n")
    return (
        first_line
        .replace("\\", "\\\\")  # replace \
        .replace("'", "''")  # don't allow SQL injections
        .replace("`", "\\`")  # replace unwanted micron formatting
    )
def remove_micron(text: str):
    """Strip micron markup from ``text`` and return the plain result.

    Colour codes, alignment and style tags are removed, links are rewritten
    as "label (link to target)" and any backtick that is left over is turned
    into an apostrophe.
    """
    text = text.replace("``", "")
    # foreground first, then background: one combined pattern would match differently
    for colour_pattern in (r'`F.{3}', r'`B.{3}'):
        text = re.sub(colour_pattern, '', text)
    # links can't be clicked in nomadnet, so they should at least look ok
    text = re.sub(r'`\[(.*?)`(.+)\]', r'\1 (link to \2)', text)
    for tag in ("`c", "`a", "`r", "`!", "`_", "`*", "`b", "`f"):
        text = text.replace(tag, "")
    return text.replace("`", "'")
def print_header(link_id: str, reload=False) -> None:
    """Print the page header (navigation bar) in micron markup.

    ``link_id`` selects the variant: if a user is logged in on that link the
    bar offers posting, settings and logout (plus the admin page for the
    admin account), otherwise login and registration.  With ``reload`` the
    page cache is disabled and the home link carries a reload field.
    """
    if reload:
        print("#!c=0")
    print('`F000`Bddd')
    print('-')
    reload_option = "`reload=5636" if reload else ""
    # customize the header here
    # one lookup answers both "is somebody logged in?" and "who is it?"
    user_rows = query_database(f"SELECT username FROM users WHERE link_id = '{link_id}'")
    if len(user_rows) != 0:
        # this is used when a user is logged in
        username = user_rows[0][0]
        admin_link = ""
        if len(query_database(f"SELECT value FROM settings WHERE key = 'admin_username' AND value = '{username}'")) == 1:
            admin_link = f" | `Ff22`_`[Admin page`:{page_path}/admin/admin.mu]`_`F000"
        print(f"`c| `!{forum_name}`! | `Ff22`_`[Home`:{page_path}/index.mu{reload_option}]`_`F000{admin_link} | `Ff22`_`[Create Post`:{page_path}/post.mu]`_`F000 | `Ff22`_`[Settings`:{page_path}/user_settings.mu]`_`F000 | `Ff22`_`[Logout`:{page_path}/logout.mu`confirm=yes|source_link_id={link_id}]`_`F000 |")
    else:
        # this is used when a user is not logged in
        print(f"`c| `!{forum_name}`! | `Ff22`_`[Home`:{page_path}/index.mu{reload_option}]`_`F000 | `Ff22`_`[Login`:{page_path}/login.mu]`_`F000 | `Ff22`_`[Register`:{page_path}/register.mu]`_`F000 |")
    print("`F000")
    print("-")
    print("`a`b`f")
    print()
def handle_ids() -> tuple:
    """Read and validate the request ids and open the database.

    ``link_id`` and ``remote_identity`` are taken from the environment.  The
    link id must be 32 hex digits, otherwise an error is printed and the
    process exits before the database is touched.  If no enabled user is
    logged in on this link and a remote identity was supplied, the identity
    is validated the same way and handed to check_remote_identity(), which
    may log the matching user in.

    Returns the tuple ``(link_id, remote_identity)``; a missing value is "".
    """
    # direct lookups instead of scanning every environment variable
    link_id = os.environ.get("link_id", "")
    remote_identity = os.environ.get("remote_identity", "")
    if len(link_id) != 32 or not set(link_id).issubset(string.hexdigits):
        print("something went wrong...")
        sys.exit(0)
    setup_db()
    session_rows = query_database(f"SELECT user_id FROM users WHERE link_id = '{link_id}' AND enabled = 1")
    if len(session_rows) != 1 and len(remote_identity) != 0:
        if len(remote_identity) != 32 or not set(remote_identity).issubset(string.hexdigits):
            print("something went wrong...")
            close_database(write_changes=False)
            sys.exit(0)
        check_remote_identity(link_id, remote_identity)
    return link_id, remote_identity
def check_remote_identity(link_id: str, remote_identity: str) -> None:
    """Log a user in on ``link_id`` if ``remote_identity`` is one of their login identities.

    Only verified connections with ``allow_login`` set are considered; the
    stored identities are encrypted and therefore decrypted for comparison.
    The search stops at the first match.
    """
    login_connections = query_database("SELECT username, remote_id FROM connections WHERE allow_login = 1 AND verified = 1")
    for username, encrypted_identity in login_connections:
        if decrypt(encrypted_identity) != remote_identity:
            continue
        execute_sql(f"UPDATE users SET link_id = '{link_id}', login_time = unixepoch() WHERE username = '{username}' AND enabled = 1")
        break
# This function checks if source_link_id is valid (the same as the current one). This prevents many cases of one-click exploits (see test/link_injection_test.mu).
# Add 'source_link_id={link_id}' to the link that should be checked and call this function somewhere. It returns on success and exits on failure.
def verify_link_id() -> bool:
    """Return True if the request's source_link_id equals the current link id.

    Both values are read from the environment (``link_id`` and
    ``var_source_link_id``) and must be 32 hex digits.  On any failure
    "Verification error!" is printed, the database is closed without writing
    changes and the process exits; the function never returns False.
    """
    # direct lookups instead of scanning every environment variable
    current_link_id = os.environ.get("link_id", "")
    source_link_id = os.environ.get("var_source_link_id", "")
    well_formed = all(
        len(value) == 32 and set(value).issubset(string.hexdigits)
        for value in (current_link_id, source_link_id)
    )
    if well_formed and current_link_id == source_link_id:
        return True
    print("Verification error!")
    close_database(write_changes=False)
    sys.exit(0)
def get_MultiFernet() -> MultiFernet:
    """Return a MultiFernet built from the keys in ``<storage_path>/key.secret``.

    Every non-empty line that does not start with '#' is treated as one
    Fernet key; the first key is the one used for encryption.  If the key
    file does not exist yet, a new key is generated and written (with a
    warning header) to a file that is readable by its owner only.
    """
    key_path = storage_path + "/key.secret"
    if os.path.isfile(key_path):
        with open(key_path, 'r') as keyfile:
            entries = [line.strip() for line in keyfile]
        # blank lines and comment lines carry no key; a blank line used to
        # raise IndexError here
        return MultiFernet([Fernet(entry.encode()) for entry in entries if entry and not entry.startswith("#")])
    key = Fernet.generate_key()
    # create the file owner-only from the start instead of tightening the
    # permissions after the key has already been written
    descriptor = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(descriptor, "w") as keyfile:
        # add warnings to keyfile
        keyfile.write("# !!! DO NOT MODIFY THIS FILE (unless you know what you're doing). It contains keys required for the encryption of user passwords. Moficatitions can cause users not being able to log in. !!!\n")
        keyfile.write("# The first key (in the first line) will be used for encryption. All lines starting with '#' will be ignored.\n")
        keyfile.write(key.decode())
    # os.chmod reports failures as OSError (os.system never raised) and needs
    # no shell; skipped outside POSIX, where the original command had no effect
    if os.name == "posix":
        try:
            os.chmod(key_path, 0o400)  # set permissions
        except OSError:
            print("Could not set file permissions. You can ignore this error if you are on windows.")
    return MultiFernet([Fernet(key)])
def encrypt(data: str) -> str:
    """Encrypt ``data`` with the forum's MultiFernet and return the token as text."""
    token = get_MultiFernet().encrypt(data.encode())
    return token.decode()
def decrypt(encrypted_data: str) -> str:
    """Decrypt a token produced by encrypt() and return the plaintext as text."""
    plaintext = get_MultiFernet().decrypt(encrypted_data.encode())
    return plaintext.decode()
def add_new_key() -> None:
    """Prepend a freshly generated Fernet key to the key file.

    The new key becomes the first line and is therefore the one used for
    encryption from now on; all existing lines (older keys and comment
    lines) are kept below it.  Nothing happens if the key file does not
    exist.  The file is rewritten through a temporary file that replaces the
    original in one step, so an interruption cannot leave the forum without
    its keys.
    """
    import tempfile

    key_path = storage_path + "/key.secret"
    if not os.path.isfile(key_path):
        return
    with open(key_path, 'r') as keyfile:
        # blank lines are dropped, they carry no information
        entries = [line.strip() for line in keyfile if line.strip()]
    entries.insert(0, Fernet.generate_key().decode())
    # mkstemp creates the file readable/writable by the owner only; it lives
    # in the same directory so that os.replace stays on one file system
    descriptor, temp_path = tempfile.mkstemp(dir=storage_path, prefix="key.secret.")
    try:
        with os.fdopen(descriptor, "w") as keyfile:
            keyfile.writelines(entry + "\n" for entry in entries)
        # os.chmod reports failures as OSError (os.system never raised);
        # skipped outside POSIX, where the original command had no effect
        if os.name == "posix":
            try:
                os.chmod(temp_path, 0o400)  # set permissions
            except OSError:
                print("Could not set file permissions. You can ignore this error if you are on windows.")
        os.replace(temp_path, key_path)
    except BaseException:
        # don't leave a stray copy of the keys behind
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise
def rotate_keys() -> None:
    """Add a new encryption key and re-encrypt all stored tokens with it.

    Re-encrypted are user passwords, the remote ids of connections and
    verification codes.  Each UPDATE also matches on the old token, so only
    the row that was read gets changed.
    """
    add_new_key()
    mf = get_MultiFernet()
    for username, password in query_database("SELECT username, password FROM users"):
        rotated = mf.rotate(password.encode()).decode()
        execute_sql(f"UPDATE users SET password = '{rotated}' WHERE username = '{username}' AND password = '{password}'")
    for conn_id, remote_id in query_database("SELECT conn_id, remote_id FROM connections"):
        rotated = mf.rotate(remote_id.encode()).decode()
        execute_sql(f"UPDATE connections SET remote_id = '{rotated}' WHERE conn_id = {conn_id} AND remote_id = '{remote_id}'")
    # the verification_codes table has the columns code_id / code; the
    # statement used to name remote_id / conn_id (columns of the connections
    # table) and failed as soon as a verification code existed
    for code_id, code in query_database("SELECT code_id, code FROM verification_codes"):
        rotated = mf.rotate(code.encode()).decode()
        execute_sql(f"UPDATE verification_codes SET code = '{rotated}' WHERE code_id = {code_id} AND code = '{code}'")
    # NOTE(review): purge() decrypts verification_codes.use_id for "add_lxmf"
    # codes, so that column seems to hold encrypted data too and is not
    # rotated here -- confirm whether it should be.
def check_admin(link_id: str) -> bool:
    """Return True if the user logged in on ``link_id`` is the forum admin.

    The admin is the user whose name is stored under the 'admin_username'
    key of the settings table.
    """
    session_rows = query_database(f"SELECT username FROM users WHERE link_id = '{link_id}'")
    if len(session_rows) != 1:
        return False
    username = session_rows[0][0]
    admin_rows = query_database(f"SELECT value FROM settings WHERE key = 'admin_username' AND value = '{username}'")
    return len(admin_rows) == 1
def get_verification_code():
    """Return a random six-digit verification code formatted as "123-456".

    The digits are drawn with the ``secrets`` module.
    """
    digits = [secrets.choice(string.digits) for _ in range(6)]
    return "".join(digits[:3]) + '-' + "".join(digits[3:])
def generate_verification(username: str, use_type: str, use_id: str, recipient: str):
    """Create a verification code, send it to ``recipient`` and store it.

    ``recipient`` must be an identity and not a lxmf address.  The code is
    stored encrypted together with ``username``, ``use_type`` and ``use_id``.
    If the notification cannot be queued, a message is printed, the database
    is closed without writing changes and the process exits.

    Raises:
        Exception: if notifications are disabled.
    """
    if not notifications_enabled:
        raise Exception("notifications are disabled; no verification will be sent")
    code = get_verification_code()
    import notify
    message = "This is an error"
    if use_type == "add_lxmf":
        message = f"Your verification code is:\n{code}\nIf you did not request verification, you can ignore this message."
    try:
        notify.add_notification([recipient, message])
    except Exception:
        # a bare "except:" would also swallow SystemExit/KeyboardInterrupt
        print("Could not send notification. Contact an admin!")
        # close_database lives in this module; "main.close_database" raised
        # NameError because the name "main" is never bound here
        close_database(write_changes=False)
        sys.exit(0)
    execute_sql(f"INSERT INTO verification_codes (username, code, use_type, use_id, creation_time) VALUES ('{username}', '{encrypt(code)}', '{use_type}', '{use_id}', unixepoch())")
def delete_comment_chain(comment_ids):
    """Delete the given comments together with all of their replies.

    ``comment_ids`` is a list of rows whose first element is a comment id
    (the shape returned by query_database).  Replies are found through the
    ``parent`` column and removed as well, however deep the chain goes.
    The work list is explicit instead of recursive so that a very long
    reply chain cannot hit Python's recursion limit; the deletion order is
    the same depth-first order as before.
    """
    pending = list(comment_ids)[::-1]
    while pending:
        comment_id = pending.pop()[0]
        execute_sql(f"DELETE FROM comments WHERE comment_id = '{comment_id}'")
        replies = query_database(f"SELECT comment_id FROM comments WHERE parent = '{comment_id}'")
        # reversed so that the first reply is popped (and handled) first
        pending.extend(reversed(replies))
# The NOMADFORUM_STORAGE_PATH environment variable, when set, replaces the configured storage path (used for testing).
storage_path = os.environ.get("NOMADFORUM_STORAGE_PATH", storage_path)