#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2024 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .. import errors, serializers, utils
from ..compat import long_type, six
from .cache import cache_parent
from .core import Iterable, LazyLoad, cache
from .volumes import Volume


class FSVolumeObject(LazyLoad):
    """
    Base model of a single entry (file or directory) stored in a file system volume.

    Instantiating this class dispatches on the ``_isdir`` keyword: a true value
    yields the directory class and a false value yields the file class. When
    ``_isdir`` is not supplied, a placeholder entry is created and reloaded so that
    the concrete class can be chosen from the loaded fields.
    """

    __slots__ = ("_volume_fs_tunnel",)
    _type_indicator = "_isdir"
    _cache_name_arg = "path"

    class CreateRequestXML(serializers.XMLSerializableModel):
        # body of the request that creates a new entry
        _root = "Item"

        type = serializers.XMLNodeField("Type")
        path = serializers.XMLNodeField("Path")

    class UpdateRequestXML(serializers.XMLSerializableModel):
        # body of the request that moves an entry or changes its replication
        _root = "Item"

        path = serializers.XMLNodeField("Path")
        replication = serializers.XMLNodeField("Replication")

    _project = serializers.XMLNodeField("Project")
    _volume = serializers.XMLNodeField("Volume")
    path = serializers.XMLNodeField("Path")
    _isdir = serializers.XMLNodeField("Isdir", type="bool")
    permission = serializers.XMLNodeField("permission")
    _replication = serializers.XMLNodeField("BlockReplications", parse_callback=int)
    length = serializers.XMLNodeField("Length", parse_callback=long_type)
    quota = serializers.XMLNodeField("Quota", parse_callback=long_type)
    block_size = serializers.XMLNodeField("BlockSize", parse_callback=long_type)
    owner = serializers.XMLNodeField("Owner")
    group = serializers.XMLNodeField("Group")
    creation_time = serializers.XMLNodeField("CreationTime", type="rfc822")
    access_time = serializers.XMLNodeField("AccessTime", type="rfc822")
    last_modified_time = serializers.XMLNodeField("ModificationTime", type="rfc822")
    symlink = serializers.XMLNodeField("Symlink")
    sign_url = serializers.XMLNodeField("URL")

    @classmethod
    def _get_base_cls(cls):
        """Return the class used for entries whose type is not known yet."""
        return FSVolumeObject

    @classmethod
    def _get_dir_cls(cls):
        """Return the class instantiated for directories."""
        return FSVolumeDir

    @classmethod
    def _get_file_cls(cls):
        """Return the class instantiated for files."""
        return FSVolumeFile

    @classmethod
    def _get_objects_cls(cls):
        """Return the collection class used to list and look up entries."""
        return FSVolumeObjects

    @staticmethod
    def _filter_cache(_, **kwargs):
        # True only when the ``_isdir`` keyword is present and already resolved,
        # i.e. it is neither missing nor the "UNKNOWN" placeholder.
        isdir = kwargs.get("_isdir")
        return isdir is not None and isdir != "UNKNOWN"

    @cache
    def __new__(cls, *args, **kwargs):
        isdir = kwargs.get("_isdir")

        base_cls = cls._get_base_cls()
        # a concrete subclass was requested explicitly: no dispatching needed
        if cls is not base_cls and issubclass(cls, base_cls):
            return object.__new__(cls)
        if isdir is not None:
            if isdir == "UNKNOWN":
                # placeholder instance, only used to load metadata below
                return object.__new__(base_cls)
            return object.__new__(cls._get_dir_cls() if isdir else cls._get_file_cls())

        # type not given: load the metadata first, then build the entry again
        # from the loaded fields so the proper class gets selected
        obj = base_cls(_isdir="UNKNOWN", **kwargs)
        obj.reload()
        return base_cls(**obj.extract())

    @utils.experimental(
        "Volume2 is still experimental. Usage in production environment is strongly opposed.",
        cond=lambda self, *_, **kw: (
            type(self) in (FSVolumeObject, FSVolumeDir, FSVolumeFile)
            and type(kw.get("parent")) is FSVolume
        ),
    )
    def __init__(self, *args, **kwargs):
        super(FSVolumeObject, self).__init__(*args, **kwargs)

    def _name(self):
        return self.path

    def _set_state(self, name, parent, client):
        # re-initialize this entry with the given path, parent and client
        self.__init__(path=name, _parent=parent, _client=client)

    def split(self):
        """
        Split the path of the entry at its last slash.

        :return: result of ``path.rsplit("/", 1)``
        :rtype: list
        """
        return self.path.rsplit("/", 1)

    @property
    def basename(self):
        """Last component of the path."""
        _, fn = self.split()
        return fn

    @property
    def dirname(self):
        """Path of the entry without its last component."""
        dn, _ = self.split()
        return dn

    @property
    def volume(self):
        """Volume the entry belongs to (the parent of this object)."""
        return self.parent

    @property
    def is_root(self):
        """Whether the path of the entry is ``/<volume name>``."""
        return self.path == "/" + self.parent.name

    def reload(self):
        """
        Load metadata of the entry from the service.

        Nothing is requested when the entry is the root of the volume.
        """
        # check if the volume path is the root
        if self.is_root:
            return

        schema_name = self.parent._get_schema_name()
        params = {}
        if schema_name is not None:
            params["curr_schema"] = schema_name

        resp = self._client.get(
            self.parent.resource(),
            action="meta",
            params=params,
            headers={"x-odps-volume-fs-path": self.path},
        )
        self.parse(self._client, resp, obj=self)

        self._loaded = True

    @staticmethod
    def _normpath(path):
        """
        Normalize a slash-separated path.

        Empty segments and ``.`` are dropped, and ``..`` removes the segment
        before it. Leading ``..`` segments of a relative path are kept, while
        ``..`` that would climb above the root of an absolute path is discarded.
        The result never ends with a slash, except for the bare root ``/``.

        :param str path: path to normalize
        :return: normalized path
        :rtype: str
        """
        is_absolute = path.startswith("/")
        parts = []
        for segment in path.split("/"):
            if segment in ("", "."):
                continue
            if segment == "..":
                if parts and parts[-1] != "..":
                    parts.pop()
                elif not is_absolute:
                    # nothing left to pop in a relative path: keep the marker
                    parts.append(segment)
                continue
            parts.append(segment)
        normalized = "/".join(parts)
        return "/" + normalized if is_absolute else normalized

    def _del_cache(self, path):
        """
        Remove the cached entry of ``path``.

        :param str path: absolute path, or a path relative to the current entry
        """
        root_objs = self._get_objects_cls()(
            parent=self.volume.root, client=self._client
        )
        if not path.startswith("/"):
            path = self.path + "/" + path.lstrip("/")
        del root_objs[path]

    def move(self, new_path, replication=None):
        """
        Move current path to a new location.

        :param new_path: target location of current file / directory. A path
            without a leading slash is resolved against the directory that
            contains the current entry.
        :param replication: number of replication
        :raises ValueError: if the normalized target equals the current path
        """
        if not new_path.startswith("/"):
            new_path = self._normpath(self.dirname + "/" + new_path)
        else:
            new_path = self._normpath(new_path)
        if new_path == self.path:
            raise ValueError("New path should be different from the original one.")
        update_def = self.UpdateRequestXML(path=new_path)
        if replication:
            update_def.replication = replication
        headers = {
            "Content-Type": "application/xml",
            "x-odps-volume-fs-path": self.path,
        }

        schema_name = self.parent._get_schema_name()
        params = {}
        if schema_name is not None:
            params["curr_schema"] = schema_name

        self._client.put(
            self.parent.resource(),
            action="meta",
            params=params,
            headers=headers,
            data=update_def.serialize(),
        )

        # drop the cache entry of the old path before switching to the new one
        self._del_cache(self.path)
        self.path = new_path
        self.reload()

    def _create_volume_fs_tunnel(self, endpoint=None, quota_name=None):
        """
        Return the tunnel used to read and write file contents, creating it on
        first use.

        NOTE: the tunnel is created once per object. Later calls return the same
        tunnel even if a different ``endpoint`` or ``quota_name`` is passed.

        :param endpoint: tunnel endpoint, defaults to the endpoint of the project
        :param quota_name: name of tunnel quota
        """
        if self._volume_fs_tunnel is not None:
            return self._volume_fs_tunnel

        # imported lazily inside the method, as in the original code
        from ..tunnel import VolumeFSTunnel

        self._volume_fs_tunnel = VolumeFSTunnel(
            client=self._client,
            project=self.project,
            endpoint=endpoint or self.project._tunnel_endpoint,
            quota_name=quota_name,
        )
        return self._volume_fs_tunnel


@cache_parent
class FSVolumeDir(FSVolumeObject):
    """
    FSVolumeDir represents a directory under a file system volume in ODPS.
    You can use ``create_dir`` to create a sub-directory, ``open_reader`` to open a file to read,
    ``open_writer`` to write a file and ``delete`` to remove. Following operations are also supported.

    >>> # iterate over all files and sub-directories
    >>> for o in fs_dir:
    >>>     print(o.path)
    >>> # check if a file exists in current volume
    >>> print(file_name in fs_dir)
    >>> # get a file/directory object
    >>> file_obj = fs_dir[file_name]
    """

    def __init__(self, **kw):
        super(FSVolumeDir, self).__init__(**kw)
        self._isdir = True

    @property
    def objects(self):
        """Collection of the entries contained in this directory."""
        return self._get_objects_cls()(parent=self, client=self._client)

    def _tunnel_path(self, path):
        """
        Build the path handed to the tunnel for ``path`` inside this directory.

        The leading slash and the volume name are removed from the path of the
        directory before ``path`` is appended.
        """
        dir_part = self.path.lstrip("/")[len(self.parent.name) :].lstrip("/")
        return dir_part + "/" + path.lstrip("/")

    def create_dir(self, path, **kw):
        """
        Creates and returns a sub-directory under the current directory.

        :param str path: directory name to be created
        :return: directory object
        :rtype: :class:`odps.models.FSVolumeDir`
        """
        path = self.path + "/" + path.lstrip("/")
        dir_def = self.CreateRequestXML(type="directory", path=path)
        headers = {"Content-Type": "application/xml"}
        self._client.post(
            self.parent.resource(),
            headers=headers,
            data=dir_def.serialize(),
            curr_schema=self.parent._get_schema_name(),
        )

        # build the object of the new directory and load its metadata
        dir_object = type(self)(path=path, parent=self.parent, client=self._client)
        dir_object.reload()
        return dir_object

    def __contains__(self, item):
        return item in self.objects

    def __iter__(self):
        return self.objects.iterate()

    def __getitem__(self, item):
        return self.objects[item]

    def delete(self, recursive=False):
        """
        Delete current directory.

        :param recursive: indicate whether a recursive deletion should be performed.
        """
        params = {"recursive": recursive}
        headers = {"x-odps-volume-fs-path": self.path}
        # the cache entry is dropped before the request is sent
        self._del_cache(self.path)
        self._client.delete(
            self.parent.resource(),
            params=params,
            headers=headers,
            curr_schema=self.parent._get_schema_name(),
        )

    def open_reader(self, path, **kw):
        """
        Open a volume file and read contents in it.

        :param str path: file name to be opened
        :param start: start position
        :param length: length limit
        :param endpoint: tunnel endpoint
        :param quota_name: name of tunnel quota
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file reader

        :Example:
        >>> with fs_dir.open_reader('file') as reader:
        >>>     [print(line) for line in reader]
        """
        endpoint = kw.pop("endpoint", None)
        quota_name = kw.pop("quota_name", None)
        tunnel = self._create_volume_fs_tunnel(endpoint=endpoint, quota_name=quota_name)
        return tunnel.open_reader(self.parent, self._tunnel_path(path), **kw)

    def open_writer(self, path, replication=None, **kw):
        """
        Open a volume file and write contents into it.

        :param str path: file name to be opened
        :param replication: number of replication
        :param endpoint: tunnel endpoint
        :param quota_name: name of tunnel quota
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file writer

        :Example:
        >>> with fs_dir.open_writer('file') as writer:
        >>>     writer.write('some content')
        """
        endpoint = kw.pop("endpoint", None)
        quota_name = kw.pop("quota_name", None)
        tunnel = self._create_volume_fs_tunnel(endpoint=endpoint, quota_name=quota_name)
        return tunnel.open_writer(
            self.parent, self._tunnel_path(path), replication=replication, **kw
        )


@cache_parent
class FSVolumeFile(FSVolumeObject):
    """
    FSVolumeFile represents a file under a file system volume in ODPS.
    You can use ``open_reader`` to read it, ``open_writer`` to write it,
    ``delete`` to remove it and ``replication`` to get or set its replication number.
    """

    def __init__(self, **kw):
        super(FSVolumeFile, self).__init__(**kw)
        self._isdir = False

    @property
    def replication(self):
        """
        Get / set replication number of the file.
        """
        return self._replication

    @replication.setter
    def replication(self, value):
        request = self.UpdateRequestXML(replication=value)
        request_headers = {
            "Content-Type": "application/xml",
            "x-odps-volume-fs-path": self.path,
        }

        # the schema is only passed along when the volume reports one
        schema = self.parent._get_schema_name()
        query = {} if schema is None else {"curr_schema": schema}

        self._client.put(
            self.parent.resource(),
            action="meta",
            params=query,
            headers=request_headers,
            data=request.serialize(),
        )
        # refresh the fields so that they reflect the update
        self.reload()

    def delete(self, **_):
        """
        Delete current file. Keyword arguments are accepted and ignored.
        """
        query = {"recursive": False}
        request_headers = {"x-odps-volume-fs-path": self.path}

        schema = self.parent._get_schema_name()
        if schema is not None:
            query["curr_schema"] = schema

        # the cache entry is dropped before the request is sent
        self._del_cache(self.path)
        self._client.delete(
            self.parent.resource(), params=query, headers=request_headers
        )

    def open_reader(self, **kw):
        """
        Open current file and read contents in it.

        :param start: start position
        :param length: length limit
        :param endpoint: tunnel endpoint
        :param quota_name: name of tunnel quota
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file reader

        :Example:
        >>> with fs_file.open_reader() as reader:
        >>>     [print(line) for line in reader]
        """
        tunnel = self._create_volume_fs_tunnel(
            endpoint=kw.pop("endpoint", None),
            quota_name=kw.pop("quota_name", None),
        )
        # remove the leading slash and the volume name from the path
        without_slash = self.path.lstrip("/")
        file_path = without_slash[len(self.parent.name) :].lstrip("/")
        return tunnel.open_reader(self.parent, file_path, **kw)

    def open_writer(self, replication=None, **kw):
        """
        Open current file and write contents into it.

        :param replication: number of replication
        :param endpoint: tunnel endpoint
        :param quota_name: name of tunnel quota
        :return: file writer

        :Example:
        >>> with fs_file.open_writer() as writer:
        >>>     writer.write('some content')
        """
        tunnel = self._create_volume_fs_tunnel(
            endpoint=kw.pop("endpoint", None),
            quota_name=kw.pop("quota_name", None),
        )
        # remove the leading slash and the volume name from the path
        without_slash = self.path.lstrip("/")
        file_path = without_slash[len(self.parent.name) :].lstrip("/")
        return tunnel.open_writer(
            self.parent, file_path, replication=replication, **kw
        )


class FSVolumeObjects(Iterable):
    """
    Collection of the entries under a directory of a file system volume.

    The parent of the collection is the directory; the volume and the project
    are reached through the parent chain.
    """

    marker = serializers.XMLNodeField("Marker")
    max_items = serializers.XMLNodeField("MaxItems", parse_callback=int)
    objects = serializers.XMLNodesReferencesField(FSVolumeObject, "Item")

    @property
    def project(self):
        return self.parent.parent.parent

    @property
    def volume(self):
        return self.parent.parent

    @classmethod
    def _get_single_object_cls(cls):
        """Return the class used to build a single entry."""
        return FSVolumeObject

    def _get(self, name):
        """
        Build the entry object for ``name``.

        :param str name: either a full path located at or below the parent
            directory, or a name relative to the parent directory
        :return: entry object
        """
        base = self.parent.path
        path = name
        # Compare whole path components: a bare ``startswith(base)`` would also
        # accept siblings sharing a textual prefix (``/volumes/x`` under ``/vol``).
        is_full_path = path == base or path.startswith(base.rstrip("/") + "/")
        if not is_full_path:
            path = base + "/" + name.lstrip("/")
        return self._get_single_object_cls()(
            client=self._client, parent=self.volume, path=path
        )

    def __contains__(self, item):
        """
        Check whether an entry exists.

        :param item: path of the entry or an entry object
        :return: True if the entry can be obtained, False otherwise
        :rtype: bool
        """
        if isinstance(item, six.string_types):
            try:
                # reload() is done while the entry object is constructed, so
                # building it without an error is enough to prove existence
                self._get(item)
            except errors.NoSuchObject:
                return False
            return True
        elif isinstance(item, self._get_single_object_cls()):
            try:
                item.reload()
            except errors.NoSuchObject:
                return False
            return True
        else:
            return False

    def __iter__(self):
        return self.iterate()

    def iterate(self):
        """
        Iterate over the entries under the parent directory.

        Entries are requested page by page. Requesting stops once the marker
        returned with the previous page is empty.

        :return: generator of entry objects
        """
        params = {"expectmarker": "true"}
        headers = {"x-odps-volume-fs-path": self.parent.path}

        schema_name = self.volume._get_schema_name()
        if schema_name is not None:
            params["curr_schema"] = schema_name

        def _it():
            # an empty marker recorded from the previous page marks the end
            if "marker" in params and not params["marker"]:
                return

            url = self.volume.resource()
            resp = self._client.get(url, params=params, headers=headers)

            r = type(self).parse(self._client, resp, obj=self, parent=self.volume)
            params["marker"] = r.marker

            return r.objects

        while True:
            objects = _it()
            if objects is None:
                break
            for obj in objects:
                yield obj


@cache_parent
class FSVolume(Volume):
    """
    FSVolume represents the new-fashioned file system volume in ODPS.
    You can use ``create_dir`` to create a directory, ``open_reader`` to open a file to read
    and ``open_writer`` to write a file. Following operations are also supported.

    >>> # iterate over all files and directories
    >>> for o in fs_volume:
    >>>     print(o.path)
    >>> # check if a file exists in current volume
    >>> print(file_name in fs_volume)
    >>> # get a file/directory object
    >>> file_obj = fs_volume[file_name]
    """

    __slots__ = ("_root_dir",)

    _dir_cls = FSVolumeDir

    def create_dir(self, path, **kw):
        """
        Creates and returns a directory under the current volume.

        :param str path: directory name to be created
        :return: directory object
        :rtype: :class:`odps.models.FSVolumeDir`
        """
        return self.root.create_dir(path, **kw)

    def __contains__(self, item):
        return item in self.root

    def __iter__(self):
        for it in self.root:
            yield it

    def __getitem__(self, item):
        # an empty key refers to the root directory of the volume
        if not item:
            return self.root
        return self.root[item]

    @property
    def path(self):
        """Path of the root of the volume, i.e. ``/<volume name>``."""
        return "/" + self.name

    @property
    def location(self):
        """
        Location of an external volume.

        :raises AttributeError: if the volume is not an external volume
        """
        if self.type != Volume.Type.EXTERNAL:
            raise AttributeError("location is only available for external volumes")
        return self.properties.get(Volume.EXTERNAL_VOLUME_LOCATION_KEY)

    @property
    def rolearn(self):
        """
        Role ARN of an external volume.

        :raises AttributeError: if the volume is not an external volume
        """
        if self.type != Volume.Type.EXTERNAL:
            raise AttributeError("rolearn is only available for external volumes")
        return self.properties.get(Volume.EXTERNAL_VOLUME_ROLEARN_KEY)

    def open_reader(self, path, **kw):
        """
        Open a volume file and read contents in it.

        :param str path: file name to be opened
        :param start: start position
        :param length: length limit
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file reader

        :Example:
        >>> with volume.open_reader('file') as reader:
        >>>     [print(line) for line in reader]
        """
        return self.root.open_reader(path, **kw)

    def open_writer(self, path, replication=None, **kw):
        """
        Open a volume file and write contents into it.

        :param str path: file name to be opened
        :param replication: number of replication
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file writer

        :Example:
        >>> with volume.open_writer('file') as writer:
        >>>     writer.write('some content')
        """
        return self.root.open_writer(path, replication=replication, **kw)

    def delete(self, path, recursive=False):
        """
        Delete a file or a directory in the volume.

        :param str path: path of the entry to delete
        :param recursive: indicate whether a recursive deletion should be performed
        """
        # resolve the entry once instead of looking it up again on fallback
        target = self[path]
        try:
            return target.delete(recursive=recursive)
        except TypeError:
            # kept for entry classes whose delete() does not accept ``recursive``
            return target.delete()

    @property
    def root(self):
        """Root directory of the volume, created on first access."""
        if not self._root_dir:
            self._root_dir = self._dir_cls(
                path="/" + self.name, parent=self, client=self._client
            )
        return self._root_dir


# Backward-compatible aliases: the VolumeFS* names stay available alongside
# the FSVolume* classes they refer to.
VolumeFSDir = FSVolumeDir
VolumeFSFile = FSVolumeFile
