in dags/processing/processing.py [0:0]
def run_fargate_task(**kwargs):
    """
    Runs the image extraction Fargate task and waits until it is running.

    The S3 location of the input file is pulled from XCom (task
    ``bag_file_sensor``, keys ``filename_s3_key`` and ``filename_s3_bucket``)
    and handed to the container as environment variables. The ARN of the
    started task is pushed to XCom under the key ``extract_task_arn``.

    :param kwargs: keyword arguments supplied by the caller. The following
        keys are read:

        * ``ti`` - object exposing ``xcom_pull`` / ``xcom_push``
        * ``bucket_dest`` - value passed to the container as ``s3_destination``
        * ``private_subnets`` - comma-separated subnet ids for the task
        * ``fargate_cluster`` - ECS cluster to run the task on
        * ``fargate_task_arn`` - task definition passed to ``run_task``
        * ``fargate_task_name`` - name of the container whose environment
          is overridden
    :return: None
    :raises KeyError: if one of the required keys is missing from ``kwargs``
    :raises RuntimeError: if ECS reports that no task was started
    """
    # Imported inside the function, as in the original implementation.
    import boto3

    ti = kwargs['ti']
    key = ti.xcom_pull(task_ids="bag_file_sensor", key="filename_s3_key")
    bucket = ti.xcom_pull(task_ids="bag_file_sensor", key="filename_s3_bucket")
    dest_bucket = kwargs['bucket_dest']
    # Tolerate whitespace around the separators and stray empty entries
    # (e.g. "subnet-a, subnet-b,") so only clean subnet ids reach ECS.
    private_subnets = [
        subnet.strip()
        for subnet in kwargs['private_subnets'].split(",")
        if subnet.strip()
    ]
    fargate_cluster = kwargs['fargate_cluster']
    fargate_task_arn = kwargs['fargate_task_arn']
    fargate_task_name = kwargs['fargate_task_name']

    client = boto3.client('ecs')
    response = client.run_task(
        cluster=fargate_cluster,
        launchType='FARGATE',
        taskDefinition=fargate_task_arn,
        count=1,
        platformVersion='LATEST',
        overrides={
            'containerOverrides': [
                {
                    'name': fargate_task_name,
                    'environment': [
                        {
                            'name': 's3_source',
                            'value': bucket,
                        },
                        {
                            'name': 's3_source_prefix',
                            'value': key,
                        },
                        {
                            'name': 's3_destination',
                            'value': dest_bucket,
                        },
                        {
                            'name': 'topics_to_extract',
                            'value': '/tf',
                        },
                    ],
                },
            ],
        },
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': private_subnets,
                'assignPublicIp': 'DISABLED',
            },
        },
    )

    # run_task can return successfully with an empty task list and the reason
    # in 'failures'; surface that reason instead of failing with an IndexError.
    tasks = response.get("tasks") or []
    if not tasks:
        failures = response.get("failures") or []
        raise RuntimeError(
            f"ECS run_task started no task on cluster {fargate_cluster} "
            f"for task definition {fargate_task_arn}; failures: {failures}"
        )

    # push task arn to xcom
    task_arn = tasks[0]["taskArn"]
    print(f"Pushing task arn {task_arn} to key 'extract_task_arn'.")
    ti.xcom_push(key="extract_task_arn", value=task_arn)

    # wait for task to be in 'running' state
    # will wait max 10mins for task to enter 'running' state before failing
    # (limit comes from the waiter's default configuration; no override here)
    print(f"Waiting for task {task_arn} to enter 'running' state.")
    waiter = client.get_waiter('tasks_running')
    waiter.wait(cluster=fargate_cluster, tasks=[task_arn])