in liminal/runners/airflow/tasks/containerable.py [0:0]
def __env_vars(self, env) -> Dict:
env_vars = dict(self.task_config['env_vars']) if 'env_vars' in self.task_config else {}
env_vars.update(self.__get_local_env_params_from_env_file())
airflow_configuration_variable = get_variable(
f'''{self.pipeline_config['pipeline']}_dag_configuration''', default_val=None
)
if airflow_configuration_variable:
airflow_configs = json.loads(airflow_configuration_variable)
environment_variables_key = f"{self.pipeline_config['pipeline']}_environment_variables"
if environment_variables_key in airflow_configs:
env_vars = airflow_configs[environment_variables_key]
if ENV not in env_vars:
env_vars[ENV] = env
return {k: str(v) for k, v in env_vars.items()}