Skip to content

Commit

Permalink
feat: wait for meilisearch index creation to succeed
Browse files Browse the repository at this point in the history
In `search.meilisearch.create_indexes`, we were not waiting for the
index creation tasks to complete. This was causing a potential race
condition, where the `create_indexes` function would fail because it
took a few seconds for the index creation to succeed.

See the relevant conversation here:
openedx/edx-platform#35743 (comment)
  • Loading branch information
regisb committed Oct 31, 2024
1 parent 91686e9 commit 35974d1
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 28 deletions.
2 changes: 1 addition & 1 deletion edxsearch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
""" Container module for testing / demoing search """

__version__ = '4.1.0'
__version__ = '4.1.1'
97 changes: 70 additions & 27 deletions search/meilisearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,22 @@ class MeilisearchEngine(SearchEngine):
compliant with edx-search's ElasticSearchEngine.
"""

def __init__(self, index=None):
def __init__(self, index=None) -> None:
super().__init__(index=index)
self.meilisearch_index = get_meilisearch_index(self.index_name)
self._meilisearch_index: t.Optional[meilisearch.index.Index] = None

@property
def meilisearch_index(self) -> meilisearch.index.Index:
"""
Lazy load meilisearch index.
"""
if self._meilisearch_index is None:
meilisearch_index_name = get_meilisearch_index_name(self.index_name)
meilisearch_client = get_meilisearch_client()
self._meilisearch_index = meilisearch_client.get_index(
meilisearch_index_name
)
return self._meilisearch_index

@property
def meilisearch_index_name(self):
Expand Down Expand Up @@ -211,7 +224,7 @@ def print_failed_meilisearch_tasks(count: int = 10):
print(result)


def create_indexes(index_filterables: dict[str, list[str]] = None):
def create_indexes(index_filterables: t.Optional[dict[str, list[str]]] = None):
"""
This is an initialization function that creates indexes and makes sure that they
support the right facetting.
Expand All @@ -225,38 +238,68 @@ def create_indexes(index_filterables: dict[str, list[str]] = None):
client = get_meilisearch_client()
for index_name, filterables in index_filterables.items():
meilisearch_index_name = get_meilisearch_index_name(index_name)
try:
index = client.get_index(meilisearch_index_name)
except meilisearch.errors.MeilisearchApiError as e:
if e.code != "index_not_found":
raise
client.create_index(
meilisearch_index_name, {"primaryKey": PRIMARY_KEY_FIELD_NAME}
)
# Get the index again
index = client.get_index(meilisearch_index_name)
index = get_or_create_meilisearch_index(client, meilisearch_index_name)
update_index_filterables(client, index, filterables)


def get_or_create_meilisearch_index(
client: meilisearch.Client, index_name: str
) -> meilisearch.index.Index:
"""
Get an index. If it does not exist, create it.
# Update filterables if there are some new elements
if filterables:
existing_filterables = set(index.get_filterable_attributes())
if not set(filterables).issubset(existing_filterables):
all_filterables = list(existing_filterables.union(filterables))
index.update_filterable_attributes(all_filterables)
This will fail with a RuntimeError if we fail to create the index. It will fail with
a MeilisearchApiError in other failure cases.
"""
try:
return client.get_index(index_name)
except meilisearch.errors.MeilisearchApiError as e:
if e.code != "index_not_found":
raise
task_info = client.create_index(
index_name, {"primaryKey": PRIMARY_KEY_FIELD_NAME}
)
wait_for_task_to_succeed(client, task_info)
# Get the index again
return client.get_index(index_name)


def get_meilisearch_index(index_name: str):
def update_index_filterables(
client: meilisearch.Client, index: meilisearch.index.Index, filterables: list[str]
) -> None:
"""
Return a meilisearch index.
Make sure that the filterable fields of an index include the given list of fields.
Note that the index may not exist, and it will be created on first insertion.
ideally, the initialisation function `create_indexes` should be run first.
If existing fields are present, they are preserved.
"""
meilisearch_client = get_meilisearch_client()
meilisearch_index_name = get_meilisearch_index_name(index_name)
return meilisearch_client.index(meilisearch_index_name)
if not filterables:
return
existing_filterables = set(index.get_filterable_attributes())
if set(filterables).issubset(existing_filterables):
# all filterables fields are already present
return
all_filterables = list(existing_filterables.union(filterables))
task_info = index.update_filterable_attributes(all_filterables)
wait_for_task_to_succeed(client, task_info)


def wait_for_task_to_succeed(
client: meilisearch.Client,
task_info: meilisearch.task.TaskInfo,
timeout_in_ms: int = 5000,
) -> None:
"""
Wait for a Meilisearch task to succeed. If it does not, raise RuntimeError.
"""
task = client.wait_for_task(task_info.task_uid, timeout_in_ms=timeout_in_ms)
if task.status != "succeeded":
raise RuntimeError(f"Failed meilisearch task: {task}")


def get_meilisearch_client():
"""
Return a Meilisearch client with the right settings.
"""
return meilisearch.Client(MEILISEARCH_URL, api_key=MEILISEARCH_API_KEY)


Expand Down Expand Up @@ -332,7 +375,7 @@ def get_search_params(
Return a dictionary of parameters that should be passed to the Meilisearch client
`.search()` method.
"""
params = {"showRankingScore": True}
params: dict[str, t.Any] = {"showRankingScore": True}

# Aggregation
if aggregation_terms:
Expand Down

0 comments on commit 35974d1

Please sign in to comment.