liminal/runners/airflow/executors/emr.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

from liminal.runners.airflow.model import executor
from liminal.runners.airflow.tasks import hadoop


class EMRExecutor(executor.Executor):
    """
    Executes EMR steps on an existing EMR cluster.
    """

    supported_task_types = [hadoop.HadoopTask]

    def __init__(self, executor_id, liminal_config, executor_config):
        super().__init__(executor_id, liminal_config, executor_config)
        self.aws_conn_id = self.executor_config.get('aws_conn_id', 'aws_default')
        # states in which an existing cluster is considered usable
        self.cluster_states = self.executor_config.get('cluster_state', ['RUNNING', 'WAITING'])
        # the target cluster may be addressed by id or by name
        self.job_flow_id = self.executor_config.get('cluster_id', None)
        self.job_flow_name = self.executor_config.get('cluster_name', None)
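
    # Illustrative only: a hypothetical executor_config for this executor.
    # The keys mirror the .get() calls above; every value below is made up.
    #
    #   executor_config = {
    #       'cluster_name': 'my-emr-cluster',   # or 'cluster_id': 'j-XXXXXXXXXXX'
    #       'aws_conn_id': 'aws_default',
    #       'cluster_state': ['RUNNING', 'WAITING'],
    #       'properties': {'ActionOnFailure': 'CONTINUE'},
    #   }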

    def _apply_executor_task_to_dag(self, **kwargs):
        """
        Submits the task's command as an EMR step and chains a sensor that
        waits for the step to complete.
        """
        task = kwargs['task']
        parent = task.parent

        self._validate_task_type(task)

        # assuming the EMR cluster already exists
        add_step = executor.add_variables_to_operator(
            EmrAddStepsOperator(
                task_id=f'{task.task_id}_add_step',
                job_flow_id=self.job_flow_id,
                job_flow_name=self.job_flow_name,
                aws_conn_id=self.aws_conn_id,
                steps=self.__generate_emr_step(task.task_id, [str(x) for x in task.get_runnable_command()]),
                cluster_states=self.cluster_states,
            ),
            task,
        )

        if parent:
            parent.set_downstream(add_step)

        # watch the submitted step until it completes; the add-step operator
        # pushes the resolved job flow id and the new step ids to XCom
        emr_sensor_step = executor.add_variables_to_operator(
            EmrStepSensor(
                task_id=f'{task.task_id}_watch_step',
                job_flow_id="{{ task_instance.xcom_pull('" + add_step.task_id + "', key='job_flow_id') }}",
                step_id="{{ task_instance.xcom_pull('" + add_step.task_id + "', key='return_value')[0] }}",
                aws_conn_id=self.aws_conn_id,
            ),
            task,
        )

        add_step.set_downstream(emr_sensor_step)

        return emr_sensor_step
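
    # Resulting Airflow task chain (task id 'my_task' is illustrative):
    #
    #   [parent] >> my_task_add_step >> my_task_watch_step
    #
    # where my_task_watch_step polls EMR until the submitted step completes.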

    def __generate_emr_step(self, task_id, args):
        # build a single EMR step that runs the task's command through
        # command-runner.jar; extra step fields (e.g. ActionOnFailure) can be
        # supplied via the 'properties' section of the executor config
        return [
            {
                'Name': task_id,
                **self.executor_config.get('properties', {}),
                'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': args},
            }
        ]
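
# Illustrative only: the step list __generate_emr_step returns for a
# hypothetical task id and command (the values below are made up):
#
#   self.__generate_emr_step('my_task', ['spark-submit', 'job.py'])
#   ==> [{'Name': 'my_task',
#         'HadoopJarStep': {'Jar': 'command-runner.jar',
#                           'Args': ['spark-submit', 'job.py']}}]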