in liminal/runners/airflow/executors/emr.py [0:0]
def _apply_executor_task_to_dag(self, **kwargs):
    """Add an EMR "add step" operator and a sensor watching that step.

    The task is read from ``kwargs['task']`` and checked with
    ``self._validate_task_type``. Two operators are then created, each passed
    through ``executor.add_variables_to_operator`` together with the task:

    * ``<task_id>_add_step``: an ``EmrAddStepsOperator`` whose steps come
      from ``self.__generate_emr_step`` applied to the task's runnable
      command (every element converted to ``str``).
    * ``<task_id>_watch_step``: an ``EmrStepSensor`` whose ``job_flow_id``
      and ``step_id`` are template strings pulling XCom values from the
      add-step operator.

    When the task has a truthy ``parent``, the add-step operator is set
    downstream of it; the sensor is always set downstream of the add-step
    operator.

    Args:
        **kwargs: Must contain the key ``'task'``.

    Returns:
        The sensor operator, as returned by
        ``executor.add_variables_to_operator``.

    Raises:
        KeyError: If ``'task'`` is missing from ``kwargs``.
    """
    task = kwargs['task']
    upstream = task.parent

    self._validate_task_type(task)

    # NOTE: no cluster is created here; the EMR cluster is assumed to
    # already exist.
    command_args = list(map(str, task.get_runnable_command()))
    emr_steps = self.__generate_emr_step(task.task_id, command_args)

    submit_op = EmrAddStepsOperator(
        task_id=f'{task.task_id}_add_step',
        job_flow_id=self.job_flow_id,
        job_flow_name=self.job_flow_name,
        aws_conn_id=self.aws_conn_id,
        steps=emr_steps,
        cluster_states=self.cluster_states,
    )
    submit_op = executor.add_variables_to_operator(submit_op, task)

    # The truthiness check reads task.parent again, exactly as before;
    # `upstream` holds the value captured prior to validation.
    if task.parent:
        upstream.set_downstream(submit_op)

    # Template strings: the sensor looks up the cluster id and the first
    # returned step id from the add-step operator's XCom entries.
    xcom_prefix = "{{ task_instance.xcom_pull('" + submit_op.task_id
    job_flow_template = xcom_prefix + "', key='job_flow_id') }}"
    step_template = xcom_prefix + "', key='return_value')[0] }}"

    watch_op = EmrStepSensor(
        task_id=f'{task.task_id}_watch_step',
        job_flow_id=job_flow_template,
        step_id=step_template,
        aws_conn_id=self.aws_conn_id,
    )
    watch_op = executor.add_variables_to_operator(watch_op, task)

    submit_op.set_downstream(watch_op)

    return watch_op