#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os
import traceback

from liminal.core import environment
from liminal.core.config.defaults import base, default_configs
from liminal.core.util import dict_util, files_util


class ConfigUtil:
    """
    Load and enrich config files under configs_path.
    """

    __BASE = 'base'
    __PIPELINES = 'pipelines'
    __SUPER = 'super'
    __TYPE = 'type'
    __SUB = 'sub'
    __SERVICES = 'services'
    __TASKS = 'tasks'
    __PIPELINE_DEFAULTS = 'pipeline_defaults'
    __TASK_DEFAULTS = 'task_defaults'
    __BEFORE_TASKS = 'before_tasks'
    __AFTER_TASKS = 'after_tasks'
    __EXECUTORS = 'executors'
    __IMAGES = 'images'
    __BASE = "base"
    __PIPELINES = "pipelines"
    __SUPER = "super"
    __TYPE = "type"
    __SUB = "sub"
    __SERVICES = "services"
    __TASKS = "tasks"
    __PIPELINE_DEFAULTS = "pipeline_defaults"
    __TASK_DEFAULTS = "task_defaults"
    __BEFORE_TASKS = "before_tasks"
    __AFTER_TASKS = "after_tasks"
    __EXECUTORS = "executors"

    def __init__(self, configs_path):
        self.configs_path = configs_path
        self.config_files = files_util.load(configs_path)
        self.base = base.BASE
        self.loaded_subliminals = []
        self.snapshot_path = os.path.join(environment.get_airflow_home_dir(), '../liminal_config_files')

    def safe_load(self, is_render_variables, soft_merge=False):
        """
        :returns list of config files after enrich with defaults and supers
        """
        if self.loaded_subliminals:
            return self.loaded_subliminals

        configs = self.config_files.values()
        enriched_configs = []

        for subliminal in [config for config in configs if self.__is_subliminal(config)]:
            name = subliminal.get('name')
            logging.info(f'Loading yml {name}')
            # noinspection PyBroadException
            try:
                superliminal = self.__get_superliminal(subliminal, soft_merge)
                enriched_config = self.__merge_configs(subliminal, superliminal, is_render_variables, soft_merge)
                enriched_configs.append(enriched_config)
            except Exception:
                logging.error(f'Failed to load yml {name}')
                traceback.print_exc()

        self.loaded_subliminals = enriched_configs

        return self.loaded_subliminals

    def __merge_configs(self, subliminal, superliminal, is_render_variables, soft_merge):
        if not superliminal:
            return subliminal

        sub = subliminal.copy()
        supr = superliminal.copy()

        merged_superliminal = self.__merge_configs(
            supr, self.__get_superliminal(supr, soft_merge), is_render_variables, soft_merge
        )

        sub[self.__EXECUTORS] = self.__merge_section(sub, merged_superliminal, self.__EXECUTORS)
        sub[self.__IMAGES] = self.__merge_section(sub, merged_superliminal, self.__IMAGES)

        if self.__is_subliminal(sub):
            return self.__merge_sub_and_super(sub, merged_superliminal, is_render_variables)
        else:
            return self.__merge_superliminals(sub, merged_superliminal)

    def __get_superliminal(self, liminal, soft_merge):
        superliminal = {}
        if not self.__is_base_config(liminal):
            superliminal_name = liminal.get(self.__SUPER, '')
            if not superliminal_name:
                superliminal = self.base
            else:
                superliminal = self.__get_config(superliminal_name)
                if not superliminal:
                    supr_is_missing_msg = (
                        f"superliminal '{superliminal_name}' " + f"is missing from '{self.configs_path}'"
                    )
                    if soft_merge:
                        logging.warning(supr_is_missing_msg)
                    else:
                        raise FileNotFoundError(supr_is_missing_msg)

        return superliminal

    def __get_base_config(self):
        return self.base

    def __is_base_config(self, config):
        return config.get('name', '') == self.__BASE

    def __is_subliminal(self, config):
        is_subliminal = config.get(self.__TYPE, self.__SUB) != self.__SUPER
        if is_subliminal:
            config[self.__TYPE] = self.__SUB
        return is_subliminal

    def __get_config(self, config_name):
        return self.config_files.get(config_name)

    def __merge_sub_and_super(self, sub, supr, is_render_variables):
        merged_pipelines = list()

        for pipeline in sub.get(self.__PIPELINES, {}):
            final_pipeline = self.__apply_pipeline_defaults(sub, supr, pipeline)
            merged_pipelines.append(final_pipeline)

        sub[self.__PIPELINES] = merged_pipelines
        sub[self.__SERVICES] = default_configs.apply_service_defaults(sub, supr)

        sub = dict_util.merge_dicts(supr.copy(), sub)

        return default_configs.apply_variable_substitution(sub, supr, is_render_variables)

    def __merge_superliminals(self, super1, super2):
        super1_pipeline_defaults = super1.get(self.__PIPELINE_DEFAULTS, {}).copy()
        super2_pipeline_defaults = super2.get(self.__PIPELINE_DEFAULTS, {}).copy()

        super1[self.__PIPELINE_DEFAULTS] = super1_pipeline_defaults
        super1[self.__PIPELINE_DEFAULTS][self.__BEFORE_TASKS] = super2_pipeline_defaults.pop(
            self.__BEFORE_TASKS, []
        ) + super1_pipeline_defaults.pop(self.__BEFORE_TASKS, [])

        super2[self.__PIPELINE_DEFAULTS] = super2_pipeline_defaults
        super1[self.__PIPELINE_DEFAULTS][self.__AFTER_TASKS] = super1_pipeline_defaults.pop(
            self.__AFTER_TASKS, []
        ) + super2_pipeline_defaults.pop(self.__AFTER_TASKS, [])

        # merge supers tasks
        return dict_util.merge_dicts(super1, super2, True)

    def snapshot_final_liminal_configs(self):
        files_util.dump_liminal_configs(liminal_configs=self.loaded_subliminals, path=self.snapshot_path)

    def __merge_section(self, subliminal, superliminal, section):
        return self.__deep_list_keyword_merge(section[:-1], subliminal.get(section, []), superliminal.get(section, []))

    @staticmethod
    def __apply_pipeline_defaults(subliminal, superliminal, pipeline):
        return default_configs.apply_pipeline_defaults(subliminal, superliminal, pipeline)

    @staticmethod
    def __deep_list_keyword_merge(unique_key_name, subliminal_list_conf, superliminal_list_conf):
        subliminal_key_map = {item[unique_key_name]: item for item in subliminal_list_conf}
        superliminal_key_map = {item[unique_key_name]: item for item in superliminal_list_conf}

        return list(dict_util.merge_dicts(superliminal_key_map, subliminal_key_map, recursive=True).values())
