# src/doc_builder/meilisearch_helper.py
from datetime import datetime
from time import sleep
from typing import Optional

from meilisearch import Client


def wait_for_all_addition_tasks(client: Client, index_name: str, after_started_at: Optional[datetime] = None):
"""
Wait for all document addition/update tasks to finish for a specific index
"""
print(f"Waiting for all addition tasks on index '{index_name}' to finish...")
# Convert datetime to the format expected by MeiliSearch if provided
after_started_at_str = None
if after_started_at:
after_started_at_str = after_started_at.isoformat()
# Keep checking until there are no more tasks to process
while True:
# Get processing tasks for the specific index
task_params = {
"indexUids": [index_name],
"types": ["documentAdditionOrUpdate"],
"statuses": ["enqueued", "processing"],
}
if after_started_at_str:
task_params["afterStartedAt"] = after_started_at_str
processing_tasks = client.get_tasks(task_params)
if len(processing_tasks.results) == 0:
break
print(f"Found {len(processing_tasks.results)} tasks still processing on index '{index_name}', waiting...")
# Wait for one minute before retrying
sleep(60)
    # Collect all failed addition tasks for the specific index, paginating via `from`/`next`
    failed_task_ids = []
    from_task = None
    while True:
        failed_params = {"indexUids": [index_name], "types": ["documentAdditionOrUpdate"], "statuses": ["failed"]}
        if after_started_at_str:
            failed_params["afterStartedAt"] = after_started_at_str
        if from_task is not None:
            failed_params["from"] = from_task
        failed_tasks = client.get_tasks(failed_params)
        if len(failed_tasks.results) > 0:
            failed_task_ids.extend([task.task_uid for task in failed_tasks.results])
        # Stop when the response carries no `next` pagination cursor
        if not hasattr(failed_tasks, "next") or failed_tasks.next is None:
            break
        from_task = failed_tasks.next
    if failed_task_ids:
        print(f"Failed addition task IDs on index '{index_name}': {failed_task_ids}")
print("Finished waiting for addition tasks on index '{index_name}' to finish.")