# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Maps FS node to Airflow's DAG"""

import shlex
from typing import List, Set

from xml.etree.ElementTree import Element

from o2a.converter.task import Task
from o2a.mappers.action_mapper import ActionMapper
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet
from o2a.utils.relation_utils import chain
from o2a.utils.el_utils import normalize_path

# Tag name of the action handled by this module.
# NOTE(review): not referenced elsewhere in this file; presumably consumed by the mapper registry - confirm.
ACTION_TYPE = "fs"

# Tag names of the supported FS operation child nodes.
# Each one is a key of FS_OPERATION_MAPPERS and is matched against ``node.tag``.
FS_OP_MKDIR = "mkdir"
FS_OP_DELETE = "delete"
FS_OP_MOVE = "move"
FS_OP_CHMOD = "chmod"
FS_OP_TOUCHZ = "touchz"
FS_OP_CHGRP = "chgrp"

# Names looked up on an operation node.
# ``path``, ``source``, ``target``, ``permissions`` and ``group`` are read from ``node.attrib``;
# ``recursive`` is searched for as a child element via ``node.find``.
# ``dir-files`` is declared but not used by any command builder in this file yet.
FS_TAG_PATH = "path"
FS_TAG_SOURCE = "source"
FS_TAG_TARGET = "target"
FS_TAG_RECURSIVE = "recursive"
FS_TAG_DIRFILES = "dir-files"
FS_TAG_PERMISSIONS = "permissions"
FS_TAG_GROUP = "group"


def prepare_mkdir_command(node: Element, props: PropertySet):
    """
    Builds the ``fs -mkdir -p`` command string for a ``mkdir`` operation node.

    :param node: operation element; its ``path`` attribute is required (``KeyError`` when missing).
    :param props: property set handed over to ``normalize_path``.
    :return: command string with the normalized path shell-quoted.
    """
    directory = normalize_path(node.attrib[FS_TAG_PATH], props=props)
    return f"fs -mkdir -p {shlex.quote(directory)}"


def prepare_delete_command(node: Element, props: PropertySet):
    """
    Builds the ``fs -rm -f -r`` command string for a ``delete`` operation node.

    :param node: operation element; its ``path`` attribute is required (``KeyError`` when missing).
    :param props: property set handed over to ``normalize_path``.
    :return: command string with the normalized path shell-quoted.
    """
    doomed_path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
    return f"fs -rm -f -r {shlex.quote(doomed_path)}"


def prepare_move_command(node: Element, props: PropertySet):
    """
    Builds the ``fs -mv`` command string for a ``move`` operation node.

    :param node: operation element; both ``source`` and ``target`` attributes are required
        (``KeyError`` when either is missing).
    :param props: property set handed over to ``normalize_path``.
    :return: command string with both normalized paths shell-quoted, source first.
    """
    attributes = node.attrib
    origin = normalize_path(attributes[FS_TAG_SOURCE], props=props)
    # Only the target is normalized with ``allow_no_schema=True``.
    destination = normalize_path(attributes[FS_TAG_TARGET], props=props, allow_no_schema=True)

    return f"fs -mv {shlex.quote(origin)} {shlex.quote(destination)}"


def prepare_chmod_command(node: Element, props: PropertySet):
    """
    Builds the ``fs -chmod`` command string for a ``chmod`` operation node.

    ``-R`` is added when the node has a ``recursive`` child element. Without it the flag slot
    stays empty, so the resulting command contains two consecutive spaces after ``-chmod``.

    :param node: operation element; ``path`` and ``permissions`` attributes are required
        (``KeyError`` when either is missing).
    :param props: property set handed over to ``normalize_path``.
    :return: command string with the permissions and the normalized path shell-quoted.
    """
    target_path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
    mode = node.attrib[FS_TAG_PERMISSIONS]
    # TODO: Add support for the dir-files attribute (FS_TAG_DIRFILES). Reference: GH issue #80
    if node.find(FS_TAG_RECURSIVE) is None:
        flag = ""
    else:
        flag = "-R"

    return f"fs -chmod {flag} {shlex.quote(mode)} {shlex.quote(target_path)}"


def prepare_touchz_command(node: Element, props: PropertySet):
    """
    Builds the ``fs -touchz`` command string for a ``touchz`` operation node.

    :param node: operation element; its ``path`` attribute is required (``KeyError`` when missing).
    :param props: property set handed over to ``normalize_path``.
    :return: command string with the normalized path shell-quoted.
    """
    file_path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
    return f"fs -touchz {shlex.quote(file_path)}"


def prepare_chgrp_command(node: Element, props: PropertySet):
    """
    Builds the ``fs -chgrp`` command string for a ``chgrp`` operation node.

    ``-R`` is added when the node has a ``recursive`` child element. Without it the flag slot
    stays empty, so the resulting command contains two consecutive spaces after ``-chgrp``.

    :param node: operation element; ``path`` and ``group`` attributes are required
        (``KeyError`` when either is missing).
    :param props: property set handed over to ``normalize_path``.
    :return: command string with the group and the normalized path shell-quoted.
    """
    target_path = normalize_path(node.attrib[FS_TAG_PATH], props=props)
    owner_group = node.attrib[FS_TAG_GROUP]

    if node.find(FS_TAG_RECURSIVE) is None:
        flag = ""
    else:
        flag = "-R"

    return f"fs -chgrp {flag} {shlex.quote(owner_group)} {shlex.quote(target_path)}"


# Dispatch table: operation tag name -> function building the command string for that operation.
# Every function is called as ``fn(node, props=...)`` and returns a ``str``.
# Child nodes whose tag is not a key of this mapping are ignored by ``FsMapper.parse_tasks``.
FS_OPERATION_MAPPERS = {
    FS_OP_MKDIR: prepare_mkdir_command,
    FS_OP_DELETE: prepare_delete_command,
    FS_OP_MOVE: prepare_move_command,
    FS_OP_CHMOD: prepare_chmod_command,
    FS_OP_TOUCHZ: prepare_touchz_command,
    FS_OP_CHGRP: prepare_chgrp_command,
}


class FsMapper(ActionMapper):
    """
    Maps an Oozie FS action node to Airflow tasks.

    Every supported operation child of the node (see ``FS_OPERATION_MAPPERS``) becomes one task.
    A node without any supported operation is mapped to a single placeholder task.
    """

    def __init__(self, oozie_node: Element, name: str, dag_name: str, props: PropertySet, **kwargs):
        super().__init__(oozie_node=oozie_node, name=name, props=props, dag_name=dag_name, **kwargs)
        self.oozie_node = oozie_node
        # Filled in by ``on_parse_node``; empty until then.
        self.tasks: List[Task] = []

    def on_parse_node(self):
        """Runs the base-class parsing hook and then builds the task list for this node."""
        super().on_parse_node()
        self.tasks = self.parse_tasks()

    def parse_tasks(self) -> List[Task]:
        """
        Collects the direct children of the action node that describe a supported filesystem
        operation and converts each of them, in document order, into a task.

        :return: one task per supported operation, or a single ``dummy.tpl`` task named after
            the mapper when the node contains no supported operation.
        """
        fs_nodes = [child for child in self.oozie_node if child.tag in FS_OPERATION_MAPPERS]
        total = len(fs_nodes)
        parsed = [
            self.parse_fs_operation(position, child, total) for position, child in enumerate(fs_nodes)
        ]

        if parsed:
            return parsed

        # Each mapper must return at least one task
        return [Task(task_id=self.name, template_name="dummy.tpl")]

    def to_tasks_and_relations(self):
        """Returns the parsed tasks together with the relations that ``chain`` builds for them."""
        return self.tasks, chain(self.tasks)

    def required_imports(self) -> Set[str]:
        """Returns the import statements that this mapper reports as required."""
        return {"from airflow.operators import empty", "from airflow.operators import bash"}

    def parse_fs_operation(self, index: int, node: Element, operation_nodes_count: int) -> Task:
        """
        Converts a single operation node into a task rendered with ``fs_op.tpl``.

        :param index: position of the node among the supported operation nodes.
        :param node: operation element; its tag must be a key of ``FS_OPERATION_MAPPERS``.
        :param operation_nodes_count: total number of supported operation nodes in the action.
        :return: the task; its id is the mapper name when it is the only operation, otherwise
            ``<name>_fs_<index>_<tag>``.
        """
        operation = node.tag
        if operation_nodes_count == 1:
            task_id = self.name
        else:
            task_id = f"{self.name}_fs_{index}_{operation}"

        build_command = FS_OPERATION_MAPPERS[operation]
        fs_command = build_command(node, props=self.props)

        # The template parameter is named "pig_command" even though it carries an fs command.
        template_params = {
            "pig_command": fs_command,
            "action_node_properties": self.props.action_node_properties,
        }
        return Task(task_id=task_id, template_name="fs_op.tpl", template_params=template_params)
