common.py
import time

import psycopg2
from pgvector.psycopg2 import register_vector
from sentence_transformers import SentenceTransformer

# Multilingual sentence-embedding model used for the verse texts
model = SentenceTransformer(
    "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
)

# Connection settings for the PostgreSQL "store" database
conn_params = {
    "dbname": "store",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": "5430",
}
postgres = psycopg2.connect(**conn_params)

# Read embedded Bible verses from the PostgreSQL database in pages and feed
# them to `handler` in minibatches
def read_verses(handler, max_items=24000, minibatch_size=100, **kwargs):
    # Open a cursor on the shared PostgreSQL connection
    cur = postgres.cursor()
    cur.execute("SET search_path TO store, public")
    register_vector(cur)

    # Paging and timing state
    batch_size = 1000
    offset = 0
    sentences = []
    elapsed_time = 0
    calls = 0

    # Optional Redis pipeline, passed through to the handler if provided
    pipeline = kwargs.get("pipeline")

    while offset < max_items:
        # Select already-embedded verses in pages of batch_size.
        # To work with a smaller dataset, add to the WHERE clause:
        #   AND translationId = 'rus_syn'
        # For deterministic paging, add: ORDER BY chapterNumber, number
        query = f"""
            SELECT text, translationId, bookId, chapterNumber, Number, embedding
            FROM store."ChapterVerse"
            WHERE embedding IS NOT NULL
            LIMIT {batch_size} OFFSET {offset}
        """
        start_time = time.perf_counter()

        # Execute the query and fetch the current page
        cur.execute(query)
        rows = cur.fetchall()

        # If no more rows are returned, stop paging
        if not rows:
            break

        # Reshape each row into an (id, text, meta, embedding) tuple
        preprocessedRows = []
        for row in rows:
            id = f"{row[1]}_{row[2]}_{row[3]}_{row[4]}"
            text = row[0]
            meta = {
                "translationId": row[1],
                "bookId": row[2],
                "chapterNumber": row[3],
                "verseNumber": row[4],
            }
            embedding = row[5]
            preprocessedRows.append((id, text, meta, embedding))
        print(f"DB Query time: {time.perf_counter() - start_time} sec")

        # Hand the rows to the handler in minibatches; the handler returns
        # the time it spent inserting, which accumulates in elapsed_time
        for i in range(0, len(preprocessedRows), minibatch_size):
            chunk = preprocessedRows[i : i + minibatch_size]
            calls += 1
            if pipeline:
                elapsed_time += handler(chunk, pipeline)
            else:
                elapsed_time += handler(chunk)

        # Advance the offset for the next page and report the running average
        offset += batch_size
        print(f"avg insert time: {elapsed_time / calls} sec")

    # Close the cursor; note that `sentences` is never populated here and is
    # returned empty
    cur.close()
    return sentences
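

# --- Illustrative usage sketch (not part of the original module) ---
# `example_handler` is a hypothetical callback shown only to clarify the
# contract: read_verses passes it a minibatch of (id, text, meta, embedding)
# tuples (plus the optional Redis pipeline, if one was supplied) and adds the
# return value to its running elapsed_time, so the handler should return the
# number of seconds it spent storing the chunk.
def example_handler(chunk, pipeline=None):
    start = time.perf_counter()
    for verse_id, text, meta, embedding in chunk:
        # Insert into the target vector store here; this sketch only iterates.
        pass
    return time.perf_counter() - start


if __name__ == "__main__":
    read_verses(example_handler, max_items=2000, minibatch_size=100)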