def wait_for_extraction()

in dags/processing/processing.py [0:0]


def wait_for_extraction(**kwargs):
    """
    Waits for image extraction to finish and returns next task to execute

    :param kwargs:
    :return:
    """

    # retrieve task id
    task_arn = kwargs['ti'].xcom_pull(task_ids=f"extract_png", key=f"extract_task_arn")
    fargate_cluster = kwargs['fargate_cluster']

    # poll task id until stopped
    import boto3
    client = boto3.client('ecs')
    response = client.describe_tasks(cluster=fargate_cluster, tasks=[task_arn])

    import time
    polls = round(60 * 30 / 10)
    while (response["tasks"][0]["lastStatus"] != "STOPPED") and polls > 0:
        polls -= 1
        time.sleep(10)
        response = client.describe_tasks(cluster=fargate_cluster, tasks=[task_arn])
        print(f"Checking Fargate task status.\nLast queried status is {response['tasks'][0]['lastStatus']}.")

    # final check
    response = client.describe_tasks(cluster=fargate_cluster, tasks=[task_arn])
    last_status = response["tasks"][0]["lastStatus"]

    if last_status == "STOPPED":
        return "extraction_success"

    return "extraction_failed"