def _apply_executor_task_to_dag()

in liminal/runners/airflow/executors/emr.py [0:0]


    def _apply_executor_task_to_dag(self, **kwargs):
        """
        Chain an EMR "add step" operator and a sensor watching that step after the task's parent.

        :param kwargs: must contain ``task``; a missing key raises ``KeyError``.
        :returns: the step sensor, which is the last operator of the chain built here.
        """
        task = kwargs['task']
        upstream = task.parent

        self._validate_task_type(task)

        # The EMR cluster is assumed to exist already; this only submits a step to it.
        add_step_operator = EmrAddStepsOperator(
            task_id=f'{task.task_id}_add_step',
            job_flow_id=self.job_flow_id,
            job_flow_name=self.job_flow_name,
            aws_conn_id=self.aws_conn_id,
            steps=self.__generate_emr_step(
                task.task_id,
                list(map(str, task.get_runnable_command())),
            ),
            cluster_states=self.cluster_states,
        )
        add_step = executor.add_variables_to_operator(add_step_operator, task)

        # Only hook into the DAG when the task actually has a parent.
        if task.parent:
            upstream.set_downstream(add_step)

        # The sensor's job flow id and step id are templates that pull the
        # XCom values stored under the add-step operator's task id.
        watch_task_id = f'{task.task_id}_watch_step'
        job_flow_id_template = "{{ task_instance.xcom_pull('" + add_step.task_id + "', key='job_flow_id') }}"
        step_id_template = "{{ task_instance.xcom_pull('" + add_step.task_id + "', key='return_value')[0] }}"

        watch_step_operator = EmrStepSensor(
            task_id=watch_task_id,
            job_flow_id=job_flow_id_template,
            step_id=step_id_template,
            aws_conn_id=self.aws_conn_id,
        )
        watch_step = executor.add_variables_to_operator(watch_step_operator, task)

        add_step.set_downstream(watch_step)

        return watch_step