def wait_for_all_addition_tasks()

in src/doc_builder/meilisearch_helper.py [0:0]


def wait_for_all_addition_tasks(
    client: Client,
    index_name: str,
    after_started_at: Optional[datetime] = None,
    poll_interval: float = 60,
):
    """
    Wait for all document addition/update tasks to finish for a specific index.

    Polls the task queue until no ``documentAdditionOrUpdate`` task on
    ``index_name`` is reported as ``enqueued`` or ``processing``, then pages
    through the failed tasks of the same type and prints their IDs.
    Progress is reported with ``print``; nothing is returned.

    Args:
        client: Client whose ``get_tasks`` method is used to query the tasks.
        index_name: UID of the index whose tasks are inspected.
        after_started_at: When given, its ``isoformat()`` string is sent as
            the ``afterStartedAt`` filter on every task query.
        poll_interval: Seconds to sleep between two polls while tasks are
            still pending. Defaults to 60 (one minute).
    """
    print(f"Waiting for all addition tasks on index '{index_name}' to finish...")

    # Convert datetime to the string form sent to MeiliSearch, if provided
    after_started_at_str = after_started_at.isoformat() if after_started_at is not None else None

    def build_params(statuses):
        # A fresh dict is built for every request so that nothing is shared
        # between calls, whatever the client does with the dict it receives.
        params = {
            "indexUids": [index_name],
            "types": ["documentAdditionOrUpdate"],
            "statuses": statuses,
        }
        if after_started_at_str:
            params["afterStartedAt"] = after_started_at_str
        return params

    # Keep checking until there are no more tasks to process.
    # NOTE(review): tasks that are still "enqueued" have not started yet, so
    # the `afterStartedAt` filter may exclude them from this query -- confirm
    # against the Meilisearch tasks API that the loop cannot exit early.
    while True:
        pending = client.get_tasks(build_params(["enqueued", "processing"])).results
        if not pending:
            break

        print(f"Found {len(pending)} tasks still processing on index '{index_name}', waiting...")
        sleep(poll_interval)

    # Collect the IDs of all failed tasks for the index, one page at a time
    failed_task_ids = []
    from_task = None

    while True:
        failed_params = build_params(["failed"])
        if from_task is not None:
            failed_params["from"] = from_task

        failed_page = client.get_tasks(failed_params)
        failed_task_ids.extend(task.task_uid for task in failed_page.results)

        # A missing or None `next` cursor means there are no more pages.
        # NOTE(review): verify the attribute names `next` and `task_uid`
        # against the installed meilisearch client version.
        from_task = getattr(failed_page, "next", None)
        if from_task is None:
            break

    if failed_task_ids:
        print(f"Failed addition task IDs on index '{index_name}': {failed_task_ids}")

    print(f"Finished waiting for addition tasks on index '{index_name}' to finish.")