def run_fargate_task()

in dags/processing/processing.py [0:0]


def run_fargate_task(**kwargs):
    """
    Runs the image extraction Fargate task.

    :param kwargs:
    :return:
    """
    import boto3

    key = kwargs['ti'].xcom_pull(task_ids=f"bag_file_sensor", key=f"filename_s3_key")
    bucket = kwargs['ti'].xcom_pull(task_ids=f"bag_file_sensor", key=f"filename_s3_bucket")

    dest_bucket = kwargs['bucket_dest']
    private_subnets = kwargs['private_subnets'].split(",")
    fargate_cluster = kwargs['fargate_cluster']
    fargate_task_arn = kwargs['fargate_task_arn']
    fargate_task_name = kwargs['fargate_task_name']

    client = boto3.client('ecs')
    response = client.run_task(
        cluster=fargate_cluster,
        launchType='FARGATE',
        taskDefinition=fargate_task_arn,
        count=1,
        platformVersion='LATEST',
        overrides={
            'containerOverrides': [
                {
                    'name': fargate_task_name,
                    'environment': [
                        {
                            'name': 's3_source',
                            'value': bucket,
                        },
                        {
                            'name': 's3_source_prefix',
                            'value': key,
                        },
                        {
                            'name': 's3_destination',
                            'value': dest_bucket,
                        },
                        {
                            'name': 'topics_to_extract',
                            'value': '/tf',
                        }
                    ]
                }
            ]
        },
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': private_subnets,
                'assignPublicIp': 'DISABLED'
            }
        }
    )

    # push task arn to xcom
    task_arn = response["tasks"][0]["taskArn"]
    print(f"Pushing task arn {task_arn} to key 'extract_task_arn'.")
    kwargs['ti'].xcom_push(key="extract_task_arn", value=task_arn)

    # wait for task to be in 'running' state
    # will wait max 10mins for task to enter 'running' state before failing
    print(f"Waiting for task {task_arn} to enter 'running' state.")
    waiter = client.get_waiter('tasks_running')
    waiter.wait(cluster=fargate_cluster, tasks=[task_arn])