Skip to content

Commit

Permalink
fix: WAL replay warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
tazarov committed Feb 23, 2024
1 parent d9a8c28 commit fa94bdb
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
5 changes: 4 additions & 1 deletion chromadb/db/mixins/embeddings_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ def _backfill(self, subscription: Subscription) -> None:
.where(t.topic == ParameterValue(subscription.topic_name))
.where(t.seq_id > ParameterValue(subscription.start))
.where(t.seq_id <= ParameterValue(subscription.end))
.select(t.seq_id, t.operation, t.id, t.vector, t.encoding, t.metadata)
.select(
t.seq_id, t.operation, t.id, t.vector, t.encoding, t.metadata, t.topic
)
.orderby(t.seq_id)
)
with self.tx() as cur:
Expand All @@ -312,6 +314,7 @@ def _backfill(self, subscription: Subscription) -> None:
embedding=vector,
encoding=encoding,
metadata=json.loads(row[5]) if row[5] else None,
wal_replay=True,
)
],
)
Expand Down
8 changes: 6 additions & 2 deletions chromadb/segment/impl/metadata/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ def _insert_record(
if upsert:
return self._update_record(cur, record)
else:
if "wal_replay" in record.keys() and record["wal_replay"]:
return
logger.warning(f"Insert of existing embedding ID: {record['id']}")
# We are trying to add for a record that already exists. Fail the call.
# We don't throw an exception since this is in principal an async path
Expand Down Expand Up @@ -415,7 +417,8 @@ def _delete_record(self, cur: Cursor, record: EmbeddingRecord) -> None:
sql = sql + " RETURNING id"
result = cur.execute(sql, params).fetchone()
if result is None:
logger.warning(f"Delete of nonexisting embedding ID: {record['id']}")
if "wal_replay" not in record.keys() or not record["wal_replay"]:
logger.warning(f"Delete of nonexisting embedding ID: {record['id']}")
else:
id = result[0]

Expand Down Expand Up @@ -446,7 +449,8 @@ def _update_record(self, cur: Cursor, record: EmbeddingRecord) -> None:
sql = sql + " RETURNING id"
result = cur.execute(sql, params).fetchone()
if result is None:
logger.warning(f"Update of nonexisting embedding ID: {record['id']}")
if "wal_replay" not in record.keys() or not record["wal_replay"]:
logger.warning(f"Update of nonexisting embedding ID: {record['id']}")
else:
id = result[0]
if record["metadata"]:
Expand Down
16 changes: 11 additions & 5 deletions chromadb/segment/impl/vector/local_persistent_hnsw.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ def _write_records(self, records: Sequence[EmbeddingRecord]) -> None:
self._max_seq_id = max(self._max_seq_id, record["seq_id"])
id = record["id"]
op = record["operation"]
wal_replay = (
record["wal_replay"] if "wal_replay" in record.keys() else False
)
exists_in_index = self._id_to_label.get(
id, None
) is not None or self._brute_force_index.has_id(id)
Expand All @@ -251,24 +254,27 @@ def _write_records(self, records: Sequence[EmbeddingRecord]) -> None:
if exists_in_bf_index:
self._brute_force_index.delete([record])
else:
logger.warning(f"Delete of nonexisting embedding ID: {id}")
if not wal_replay:
logger.warning(f"Delete of nonexisting embedding ID: {id}")

elif op == Operation.UPDATE:
if record["embedding"] is not None:
if exists_in_index:
self._curr_batch.apply(record)
self._brute_force_index.upsert([record])
else:
logger.warning(
f"Update of nonexisting embedding ID: {record['id']}"
)
if not wal_replay:
logger.warning(
f"Update of nonexisting embedding ID: {record['id']}"
)
elif op == Operation.ADD:
if record["embedding"] is not None:
if not exists_in_index:
self._curr_batch.apply(record, not exists_in_index)
self._brute_force_index.upsert([record])
else:
logger.warning(f"Add of existing embedding ID: {id}")
if not wal_replay:
logger.warning(f"Add of existing embedding ID: {id}")
elif op == Operation.UPSERT:
if record["embedding"] is not None:
self._curr_batch.apply(record, exists_in_index)
Expand Down
1 change: 1 addition & 0 deletions chromadb/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class EmbeddingRecord(TypedDict):
# single node, since data written with older versions of the code won't be able to
# populate it.
collection_id: Optional[UUID]
wal_replay: Optional[bool]


class SubmitEmbeddingRecord(TypedDict):
Expand Down

0 comments on commit fa94bdb

Please sign in to comment.