in src/doc_builder/meilisearch_helper.py [0:0]
def wait_for_task_completion(func: MeilisearchFunc) -> MeilisearchFunc:
    """
    Decorator to wait for MeiliSearch task completion.

    A function that is being decorated should return (Client, TaskInfo).
    The wrapper then polls the task through the returned client until the
    task reaches a terminal status.

    The index to poll is taken from the second positional argument of the
    decorated call. When the call supplies fewer than two positional
    arguments, the wrapper falls back to the ``index_uid`` attribute of the
    returned task info (assumed to be provided by the MeiliSearch client;
    confirm against the client version in use).

    Returns:
        The decorated function; calling it returns the final task object
        once its status is "succeeded".

    Raises:
        Exception: if the decorated function raises, if the index cannot be
            determined, if the task status cannot be fetched, or if the task
            ends as "failed" or "canceled". Every error is re-raised as
            "Meilisearch operation failed: ..." with the original exception
            chained as its cause.
    """

    @wraps(func)
    def wrapped_meilisearch_function(*args, **kwargs):
        try:
            # Extract the Client and Task info from the function's return value
            client, task = func(*args, **kwargs)

            if len(args) > 1:
                index_id = args[1]
            else:
                # The index was not passed positionally (e.g. given by keyword);
                # use the index recorded on the task info instead of failing
                # with an opaque IndexError after the task was already enqueued.
                index_id = getattr(task, "index_uid", None)
                if index_id is None:
                    raise Exception(
                        "Cannot determine which index to poll: pass the index as the "
                        "second positional argument of the decorated function"
                    )

            task_id = task.task_uid
            while True:
                try:
                    # Get the latest task status from the API
                    task = client.index(index_id).get_task(task_id)
                except Exception as e:
                    # If we can't get the task status, raise an error
                    raise Exception(f"Failed to get task status for task {task_id}: {str(e)}") from e

                status = task.status
                if status == "succeeded":
                    return task

                if status == "failed":
                    # task.error may be missing or may lack individual keys;
                    # either way fall back to a readable placeholder.
                    details = task.error or {}
                    error_message = details.get("message", "Unknown error")
                    error_type = details.get("type", "Unknown")
                    error_link = details.get("link", "No additional information")
                    raise Exception(
                        f"Task {task_id} failed with error type '{error_type}': {error_message}. More info: {error_link}"
                    )

                if status == "canceled":
                    # A canceled task is terminal and will never succeed;
                    # without this branch the loop would poll forever.
                    raise Exception(f"Task {task_id} was canceled before it completed")

                # Task is still enqueued/processing - wait before polling again
                sleep(20)
        except Exception as e:
            # Re-raise any exception that occurs during the meilisearch operation
            raise Exception(f"Meilisearch operation failed: {str(e)}") from e

    return wrapped_meilisearch_function