liminal/runners/airflow/tasks/containerable.py (57 lines of code) (raw):

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import logging
import os
from abc import ABC
from typing import Dict

from liminal.core import environment
from liminal.runners.airflow.config import standalone_variable_backend
from liminal.runners.airflow.config.standalone_variable_backend import get_variable
from liminal.runners.airflow.model import task

_LOG = logging.getLogger(__name__)

# Name of the variable (and of the resulting env var) holding the environment name.
ENV = 'env'
# Value used for ENV when the variable backend has no value for it.
DEFAULT = 'default'


class ContainerTask(task.Task, ABC):
    """
    K8S Containerable task
    """

    def __init__(
        self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config, task_config, variables=None
    ):
        """
        Initialise the base task and derive the container settings from ``task_config``.

        Sets ``env_vars``, ``image`` (required ``'image'`` key), ``mounts`` and
        ``secrets`` (both default to an empty list), and ``cmds`` / ``arguments``.

        :raises KeyError: if ``task_config`` has no ``'image'`` or ``'cmd'`` entry.
        """
        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config, task_config, variables)
        env = standalone_variable_backend.get_variable(ENV, DEFAULT)
        self.env_vars = self.__env_vars(env)
        self.image = self.task_config['image']
        self.mounts = self.task_config.get('mounts', [])
        self.secrets = self.task_config.get('secrets', [])
        self.cmds, self.arguments = self._kubernetes_cmds_and_arguments()

    def _kubernetes_cmds_and_arguments(self):
        """
        Return the ``(cmds, arguments)`` pair for the container.

        The configured ``'cmd'`` string is passed as the single argument to ``/bin/sh -c``.
        """
        cmds = ['/bin/sh', '-c']
        arguments = [self.task_config['cmd']]
        return cmds, arguments

    @staticmethod
    def __get_local_env_params_from_env_file():
        """
        Read ``KEY=VALUE`` pairs from the ``env`` file in the liminal home directory.

        Blank lines, lines whose first non-blank character is ``#`` and lines
        without an ``=`` are ignored.

        :return: dict of the parsed pairs, or an empty dict if the file does not exist.
        """
        env_file = f'{environment.get_liminal_home()}/env'
        if not os.path.isfile(env_file):
            return {}

        _LOG.info(f'found env file at {env_file}')
        result = {}
        with open(env_file) as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                # Split on the first '=' only, so values that themselves contain
                # '=' (base64 padding, URLs, connection strings) are kept intact
                # instead of the whole line being dropped.
                key, separator, value = line.partition('=')
                if separator:
                    result[key] = value
        return result

    def __env_vars(self, env) -> Dict:
        """
        Build the environment variables mapping for the container.

        Starts from a copy of ``task_config['env_vars']`` (if present) and
        overlays the values from the local env file. If the
        ``<pipeline>_dag_configuration`` variable is set and its JSON contains
        ``<pipeline>_environment_variables``, that mapping is used instead.
        ``ENV`` is added with the given ``env`` when not already present.

        :param env: environment name used as the fallback value for ``ENV``.
        :return: dict mapping each variable name to its value converted with ``str``.
        """
        env_vars = dict(self.task_config['env_vars']) if 'env_vars' in self.task_config else {}
        env_vars.update(self.__get_local_env_params_from_env_file())

        pipeline_name = self.pipeline_config['pipeline']
        airflow_configuration_variable = get_variable(f'''{pipeline_name}_dag_configuration''', default_val=None)

        if airflow_configuration_variable:
            airflow_configs = json.loads(airflow_configuration_variable)
            environment_variables_key = f"{pipeline_name}_environment_variables"
            if environment_variables_key in airflow_configs:
                # This replaces, rather than merges with, the values collected above.
                # NOTE(review): confirm that discarding the task-config and env-file
                # values here is intended.
                env_vars = airflow_configs[environment_variables_key]

        if ENV not in env_vars:
            env_vars[ENV] = env

        return {k: str(v) for k, v in env_vars.items()}