in dags/processing/processing.py [0:0]
def _task_last_status(response):
    """Return ``lastStatus`` of the first task in a ``describe_tasks`` response.

    Returns ``None`` when the response contains no task at all, so callers
    can tell "task not found" apart from a real status string.
    """
    tasks = response.get("tasks") or []
    if not tasks:
        return None
    return tasks[0].get("lastStatus")


def wait_for_extraction(**kwargs):
    """
    Waits for image extraction to finish and returns next task to execute

    Pulls the ARN of the extraction task from XCom (task ``extract_png``,
    key ``extract_task_arn``) and polls ECS every 10 seconds, for at most
    30 minutes, until the task reports ``STOPPED``.

    :param kwargs: must contain ``ti`` (object exposing ``xcom_pull``) and
        ``fargate_cluster`` (cluster passed to ``describe_tasks``)
    :return: ``"extraction_success"`` if the task reached ``STOPPED``,
        ``"extraction_failed"`` if it did not stop in time or if ECS
        returned no task for the ARN
    """
    # Imports stay function-local, as in the original, so that merely
    # loading this module does not require boto3.
    import time

    import boto3

    poll_interval_seconds = 10
    max_wait_seconds = 60 * 30
    polls = round(max_wait_seconds / poll_interval_seconds)

    # retrieve task id
    task_arn = kwargs['ti'].xcom_pull(task_ids="extract_png", key="extract_task_arn")
    fargate_cluster = kwargs['fargate_cluster']

    client = boto3.client('ecs')
    last_status = _task_last_status(
        client.describe_tasks(cluster=fargate_cluster, tasks=[task_arn])
    )

    # poll task id until stopped, the task disappears, or the budget runs out
    while last_status is not None and last_status != "STOPPED" and polls > 0:
        polls -= 1
        time.sleep(poll_interval_seconds)
        last_status = _task_last_status(
            client.describe_tasks(cluster=fargate_cluster, tasks=[task_arn])
        )
        print(f"Checking Fargate task status.\nLast queried status is {last_status}.")

    if last_status is None:
        # describe_tasks returned an empty "tasks" list (e.g. unknown or
        # expired ARN); indexing [0] here used to raise IndexError.
        print(f"Fargate task {task_arn} was not returned by describe_tasks.")
        return "extraction_failed"

    # NOTE(review): STOPPED is treated as success without inspecting the
    # container exit code or stop reason - confirm this is intended.
    if last_status == "STOPPED":
        return "extraction_success"
    return "extraction_failed"