# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import inspect
import json  # noqa: F401
import os
import re
import warnings
import weakref

from . import accounts, errors, models, utils
from .compat import six, urlparse
from .config import options
from .rest import RestClient
from .tempobj import clean_stored_objects

# REST endpoint used when none is given through arguments, options or the
# ODPS_ENDPOINT environment variable.
DEFAULT_ENDPOINT = "http://service.odps.aliyun.com/api"
# Region name reported when the endpoint host carries no region segment.
DEFAULT_REGION_NAME = "cn"
# NOTE(review): presumably the fallback Logview host; its use is not visible
# in this part of the file.
LOGVIEW_HOST_DEFAULT = "http://logview.aliyun.com"
# Job Insight host assigned to entries for which Job Insight is enabled.
JOB_INSIGHT_HOST_DEFAULT = "https://maxcompute.console.aliyun.com"

# Matches the head of a ``DROP TABLE`` / ``ALTER TABLE`` statement (with an
# optional ``IF EXISTS``) and captures the table name as ``table_name``.
_ALTER_TABLE_REGEX = re.compile(
    r"^\s*(drop|alter)\s+table\s*(|if\s+exists)\s+(?P<table_name>[^\s;]+)", re.I
)
# Matches hosts such as ``service.<region>.odps.aliyun.com`` (also the
# ``maxcompute`` and ``aliyun-inc.com`` variants); group 1 is the region.
_ENDPOINT_HOST_WITH_REGION_REGEX = re.compile(
    r"service\.([^\.]+)\.(odps|maxcompute)\.aliyun(|-inc)\.com", re.I
)

# NOTE(review): presumably a module-wide cache of resolved Logview hosts; the
# code reading and writing it is outside this part of the file.
_logview_host_cache = dict()


def _wrap_model_func(func):
    """
    Return a pass-through function that forwards every call to ``func``.

    The returned function takes ``self`` plus arbitrary arguments and hands
    them to ``func`` unchanged, while carrying over the metadata of ``func``.
    """

    def wrapped(self, *args, **kw):
        return func(self, *args, **kw)

    wrapped = six.wraps(func)(wrapped)

    # keep method signature to avoid doc issue
    signature_of = getattr(inspect, "signature", None)
    if signature_of is not None:
        wrapped.__signature__ = signature_of(func)
    return wrapped


@utils.attach_internal
class ODPS(object):
    """
    Main entrance to ODPS.

    Convenient operations on ODPS objects are provided.
    Please refer to `ODPS docs <https://help.aliyun.com/document_detail/27818.html>`_
    for more details.

    Generally, basic operations such as ``list``, ``get``, ``exist``, ``create``, ``delete``
    are provided for each ODPS object.
    Take the ``Table`` as an example.

    To create an ODPS instance, both access_id and secret_access_key are required and must be correct,
    otherwise a ``SignatureNotMatch`` error will be raised. If `tunnel_endpoint` is not set, the tunnel
    API will route the service URL automatically.

    :param access_id: Aliyun Access ID
    :param secret_access_key: Aliyun Access Key
    :param project: default project name
    :param endpoint: Rest service URL
    :param tunnel_endpoint: Tunnel service URL
    :param logview_host: Logview host URL
    :param app_account: Application account, instance of `odps.accounts.AppAccount` used for dual authentication

    :Example:

    >>> odps = ODPS('**your access id**', '**your access key**', 'default_project')
    >>>
    >>> for table in odps.list_tables():
    >>>    # handle each table
    >>>
    >>> table = odps.get_table('dual')
    >>>
    >>> odps.exist_table('dual') is True
    >>>
    >>> odps.create_table('test_table', schema)
    >>>
    >>> odps.delete_table('test_table')
    """

    def __init__(
        self,
        access_id=None,
        secret_access_key=None,
        project=None,
        endpoint=None,
        schema=None,
        app_account=None,
        logview_host=None,
        tunnel_endpoint=None,
        region_name=None,
        quota_name=None,
        namespace=None,
        **kw
    ):
        """
        Create an ODPS entry.

        All arguments except ``app_account`` are passed through
        ``utils.strip_if_str`` before use. ``access_id`` may also be an
        account object (instance of ``accounts.BaseAccount``); in that case
        ``secret_access_key`` must not be supplied. Remaining keyword
        arguments are forwarded to :meth:`_init`.
        """
        # avoid polluted copy sources :(
        strip = utils.strip_if_str
        init_kw = dict(
            access_id=strip(access_id),
            secret_access_key=strip(secret_access_key),
            project=strip(project),
            endpoint=strip(endpoint),
            schema=strip(schema),
            logview_host=strip(logview_host),
            tunnel_endpoint=strip(tunnel_endpoint),
            region_name=strip(region_name),
            quota_name=strip(quota_name),
            namespace=strip(namespace),
        )

        # an account object given in place of the access id
        if isinstance(init_kw["access_id"], accounts.BaseAccount):
            assert (
                init_kw["secret_access_key"] is None
            ), "Cannot supply secret_access_key with an account"
            kw["account"] = init_kw["access_id"]
            init_kw["access_id"] = None

        init_kw["app_account"] = app_account
        init_kw.update(kw)
        self._init(**init_kw)
        clean_stored_objects(self)

    def _init(
        self,
        access_id=None,
        secret_access_key=None,
        project=None,
        endpoint=None,
        schema=None,
        region_name=None,
        namespace=None,
        **kw
    ):
        """
        Initialize all state of the entry; shared by ``__init__`` and ``__setstate__``.

        Resolution order of the main settings:

        * account: ``account`` keyword, then ``access_id``/``secret_access_key``,
          then ``options.account``, then ``accounts.from_environments()``
        * endpoint: argument, ``options.endpoint``, ``ODPS_ENDPOINT`` environment
          variable, then ``DEFAULT_ENDPOINT``
        * project: argument, ``options.default_project``, then ``ODPS_PROJECT_NAME``
        * namespace: argument, ``options.default_namespace``, then ``ODPS_NAMESPACE``

        Recognized extra keywords are ``account``, ``app_account``, ``quota_name``,
        ``rest_client_cls``, ``rest_client_kwargs``, ``tunnel_endpoint``,
        ``logview_host``, ``seahawks_url`` and ``overwrite_global``.

        :raise TypeError: if no account can be resolved, or if unknown keyword
            arguments are left over
        """
        # callbacks invoked by the schema / quota_name / tunnel_endpoint setters
        self._property_update_callbacks = set()

        account = kw.pop("account", None)
        self.app_account = kw.pop("app_account", None)

        if account is None:
            if access_id is not None:
                self.account = self._build_account(access_id, secret_access_key)
            elif options.account is not None:
                self.account = options.account
            else:
                self.account = accounts.from_environments()
                if self.account is None:
                    raise TypeError(
                        "`access_id` and `secret_access_key` should be provided."
                    )
        else:
            self.account = account
        self.endpoint = (
            endpoint
            or options.endpoint
            or os.getenv("ODPS_ENDPOINT")
            or DEFAULT_ENDPOINT
        )
        self.project = (
            project or options.default_project or os.getenv("ODPS_PROJECT_NAME")
        )
        # without an explicit region, derive it from the endpoint host
        self.region_name = region_name or self._get_region_from_endpoint(self.endpoint)
        self.namespace = (
            namespace or options.default_namespace or os.getenv("ODPS_NAMESPACE")
        )
        self._quota_name = kw.pop("quota_name", None)
        self._schema = schema

        rest_client_cls = kw.pop("rest_client_cls", None) or RestClient
        rest_client_kwargs = kw.pop("rest_client_kwargs", {})
        # NOTE(review): the REST client receives the raw ``project`` / ``schema``
        # arguments, not the resolved ``self.project`` -- confirm this is intended.
        self.rest = rest_client_cls(
            self.account,
            self.endpoint,
            project,
            schema,
            app_account=self.app_account,
            proxy=options.api_proxy,
            region_name=self.region_name,
            namespace=self.namespace,
            tag="ODPS",
            **rest_client_kwargs
        )

        self._tunnel_endpoint = (
            kw.pop("tunnel_endpoint", None)
            or options.tunnel.endpoint
            or os.getenv("ODPS_TUNNEL_ENDPOINT")
        )

        # get_logview_host() is only consulted when no explicit host is configured
        self._logview_host = (
            kw.pop("logview_host", None)
            or options.logview_host
            or os.getenv("ODPS_LOGVIEW_HOST")
            or self.get_logview_host()
        )
        # Job Insight host is set when the endpoint supports it or when legacy
        # Logview is explicitly disabled in options; otherwise it stays None
        self._job_insight_host = (
            JOB_INSIGHT_HOST_DEFAULT
            if utils.is_job_insight_available(self.endpoint)
            or options.use_legacy_logview is False
            else None
        )

        self._default_tenant = models.Tenant(client=self.rest)

        # the project collection only keeps a weak reference back to this entry
        self._projects = models.Projects(client=self.rest, _odps_ref=weakref.ref(self))
        if project:
            self._project = self.get_project()

        self._quotas = models.Quotas(client=self.rest)
        if self._quota_name:
            self._quota = self.get_quota()

        # a seahawks URL given here is also written to the global options
        self._seahawks_url = kw.pop("seahawks_url", None)
        if self._seahawks_url:
            options.seahawks_url = self._seahawks_url

        self._default_session = None
        self._default_session_name = ""

        # Make instance to global
        overwrite_global = kw.pop("overwrite_global", True)
        if overwrite_global and options.is_global_account_overwritable:
            self.to_global(overwritable=True)

        # everything recognized has been popped; leftovers are misspelled arguments
        if kw:
            raise TypeError(
                "Argument %s not acceptable, please check your spellings"
                % ", ".join(kw.keys()),
            )

    @staticmethod
    def _get_region_from_endpoint(endpoint):
        """
        Extract the region name from the host part of ``endpoint``.

        :param str endpoint: service URL
        :return: the region segment of the host, or ``DEFAULT_REGION_NAME``
            when the host does not match the known region pattern
        """
        host = urlparse(endpoint).hostname or ""
        found = _ENDPOINT_HOST_WITH_REGION_REGEX.search(host)
        return found.group(1) if found is not None else DEFAULT_REGION_NAME

    def __getstate__(self):
        """
        Build the state used to pickle this entry.

        The account object itself is stored only when the environment variable
        ``PYODPS_PICKLE_ACCOUNT`` is truthy. Otherwise the id / key pair is
        stored for ``accounts.AliyunAccount`` instances, and no credential at
        all for other account types.

        :return: keyword arguments accepted by :meth:`_init`
        :rtype: dict
        """
        state = {
            "project": self.project,
            "endpoint": self.endpoint,
            "tunnel_endpoint": self._tunnel_endpoint,
            "logview_host": self._logview_host,
            "schema": self.schema,
            "seahawks_url": self._seahawks_url,
        }
        pickle_account = utils.str_to_bool(
            os.environ.get("PYODPS_PICKLE_ACCOUNT") or "false"
        )
        if pickle_account:
            state["account"] = self.account
        elif isinstance(self.account, accounts.AliyunAccount):
            state["access_id"] = self.account.access_id
            state["secret_access_key"] = self.account.secret_access_key
        return state

    def __setstate__(self, state):
        """
        Restore the entry from a pickled ``state``.

        When the state carries a secret access key it is used directly, with
        the endpoint overridden by ``ODPS_ENDPOINT`` if that variable is set.
        Otherwise, if a bearer token account can be built from the
        environment, project and endpoint are taken from the environment and
        that account is used; failing that, the state is applied as is.
        """
        if "secret_access_key" in state:
            env_endpoint = os.environ.get("ODPS_ENDPOINT", None)
            if env_endpoint is not None:
                state["endpoint"] = env_endpoint
            self._init(**state)
            return

        token_account = accounts.BearerTokenAccount.from_environments()
        if token_account is None:
            self._init(**state)
            return

        # running with a bearer token: environment settings take precedence
        state["project"] = os.environ.get("ODPS_PROJECT_NAME")
        state["endpoint"] = (
            os.environ.get("ODPS_RUNTIME_ENDPOINT") or os.environ["ODPS_ENDPOINT"]
        )
        state.pop("access_id", None)
        state.pop("secret_access_key", None)
        state["account"] = token_account
        self._init(None, None, **state)

    def as_account(
        self,
        access_id=None,
        secret_access_key=None,
        account=None,
        app_account=None,
        namespace=None,
    ):
        """
        Create a new ODPS entry object that uses different account information.

        Project, endpoints, Logview host, schema and seahawks URL are copied
        from the current entry. The new entry never replaces the global one.

        :param access_id: Aliyun Access ID of the new account
        :param secret_access_key: Aliyun Access Key of the new account
        :param account: new account object, used when `access_id` and
            `secret_access_key` are not both supplied; defaults to the current account
        :param app_account: Application account, instance of `odps.accounts.AppAccount`
            used for dual authentication; defaults to the current one
        :param namespace: namespace passed to the new entry
        :return: the new ODPS entry
        """
        if not (access_id is None or secret_access_key is None):
            assert account is None
            account = accounts.AliyunAccount(access_id, secret_access_key)

        return ODPS(
            project=self.project,
            endpoint=self.endpoint,
            tunnel_endpoint=self._tunnel_endpoint,
            logview_host=self._logview_host,
            schema=self.schema,
            seahawks_url=self._seahawks_url,
            account=account or self.account,
            app_account=app_account or self.app_account,
            namespace=namespace,
            overwrite_global=False,
        )

    def __mars_tokenize__(self):
        return self.__getstate__()

    @classmethod
    def _from_account(
        cls, account, project, endpoint=DEFAULT_ENDPOINT, tunnel_endpoint=None, **kwargs
    ):
        return cls(
            None,
            None,
            project,
            endpoint=endpoint,
            tunnel_endpoint=tunnel_endpoint,
            account=account,
            **kwargs
        )

    def is_schema_namespace_enabled(self, settings=None):
        settings = settings or {}
        setting = str(
            settings.get("odps.namespace.schema")
            or (options.sql.settings or {}).get("odps.namespace.schema")
            or ("true" if options.enable_schema else None)
            or self.default_tenant.get_parameter("odps.namespace.schema")
            or "false"
        )
        return setting.lower() == "true"

    @property
    def region_id(self):
        return self.region_name

    @property
    def default_tenant(self):
        return self._default_tenant

    @property
    def projects(self):
        return self._projects

    @property
    def schema(self):
        """
        Get or set default schema name of the ODPS object
        """
        default_schema = "default" if self.is_schema_namespace_enabled() else None
        return self._schema or default_schema

    @schema.setter
    def schema(self, value):
        self._schema = value
        for cb in self._property_update_callbacks:
            cb(self)

    @property
    def quota_name(self):
        return self._quota_name or options.quota_name or os.getenv("QUOTA_NAME")

    @quota_name.setter
    def quota_name(self, value):
        self._quota_name = value
        for cb in self._property_update_callbacks:
            cb(self)

    @property
    def quotas(self):
        return self._quotas

    @property
    def tunnel_endpoint(self):
        """
        Get or set tunnel endpoint of the ODPS object
        """
        return self._tunnel_endpoint

    @tunnel_endpoint.setter
    def tunnel_endpoint(self, value):
        self._tunnel_endpoint = value
        for cb in self._property_update_callbacks:
            cb(self)

    def list_projects(
        self,
        owner=None,
        user=None,
        group=None,
        prefix=None,
        max_items=None,
        region_id=None,
        tenant_id=None,
    ):
        """
        List projects.

        :param owner: Aliyun account, the owner which listed projects belong to
        :param user: name of the user who has access to listed projects
        :param group: name of the group listed projects belong to
        :param prefix: prefix of names of listed projects
        :param max_items: the maximal size of result set
        :return: projects in this endpoint.
        :rtype: generator
        """
        return self.projects.iterate(
            owner=owner,
            user=user,
            group=group,
            max_items=max_items,
            name=prefix,
            region_id=region_id,
            tenant_id=tenant_id,
        )

    @property
    def logview_host(self):
        return self._logview_host

    @property
    def job_insight_host(self):
        return self._job_insight_host

    def get_quota(self, name=None, tenant_id=None):
        """
        Get quota by name

        :param str name: quota name, if not provided, will be the name in ODPS entry
        """
        if name is None:
            name = name or self.quota_name
        if name is None:
            raise TypeError("Need to provide quota name")
        return self._quotas.get(name, tenant_id=tenant_id)

    def exist_quota(self, name):
        """
        If quota name which provided exists or not.

        :param name: quota name
        :return: True if exists or False
        :rtype: bool
        """
        return name in self._quotas

    def list_quotas(self, region_id=None):
        """
        List quotas by region id

        :param str region_id: Region ID
        :return: quotas
        """
        return self._quotas.iterate(region_id=region_id)

    def get_project(self, name=None, default_schema=None):
        """
        Get project by given name.

        :param str name: project name, if not provided, will be the default project
        :param str default_schema: default schema name, if not provided, will be
            the schema specified in ODPS object
        :return: the right project
        :rtype: :class:`odps.models.Project`
        :raise: :class:`odps.errors.NoSuchObject` if not exists

        .. seealso:: :class:`odps.models.Project`
        """

        if name is None:
            name = self.project
        elif isinstance(name, models.Project):
            return name
        proj = self._projects[name]
        proj._tunnel_endpoint = self._tunnel_endpoint
        proj._logview_host = self._logview_host
        # use _schema to avoid requesting for tenant options
        proj._default_schema = default_schema or self._schema
        proj._quota_name = self._quota_name

        proj_ref = weakref.ref(proj)

        def project_update_callback(odps, update_schema=True):
            proj_obj = proj_ref()
            if proj_obj:
                if update_schema:
                    proj_obj._default_schema = odps.schema
                    proj_obj._quota_name = odps._quota_name
                proj_obj._tunnel_endpoint = odps.tunnel_endpoint
            else:
                self._property_update_callbacks.difference_update(
                    [project_update_callback]
                )

        # we need to update default schema value on the project
        self._property_update_callbacks.add(
            functools.partial(
                project_update_callback, update_schema=default_schema is None
            )
        )
        return proj

    def exist_project(self, name):
        """
        If project name which provided exists or not.

        :param name: project name
        :return: True if exists or False
        :rtype: bool
        """

        return name in self._projects

    def list_schemas(self, project=None, prefix=None, owner=None):
        """
        List all schemas of a project.

        :param project: project name, if not provided, will be the default project
        :param str prefix: the listed schemas start with this **prefix**
        :param str owner: Aliyun account, the owner which listed tables belong to
        :return: schemas
        """
        project = self.get_project(name=project)
        return project.schemas.iterate(name=prefix, owner=owner)

    def get_schema(self, name=None, project=None):
        """
        Get the schema by given name.

        :param name: schema name, if not provided, will be the default schema
        :param project: project name, if not provided, will be the default project
        :return: the Schema object
        """
        project = self.get_project(name=project)
        return project.schemas[name or self.schema]

    def exist_schema(self, name, project=None):
        """
        If schema name which provided exists or not.

        :param name: schema name
        :param project: project name, if not provided, will be the default project
        :return: True if exists or False
        :rtype: bool
        """
        project = self.get_project(name=project)
        return name in project.schemas

    @utils.with_wait_argument
    def create_schema(self, name, project=None, async_=False):
        """
        Create a schema with given name

        :param name: schema name
        :param project: project name, if not provided, will be the default project
        :param async_: if True, will run asynchronously
        :return: if async_ is True, return instance, otherwise return Schema object.
        """
        schemas = self.get_project(name=project).schemas
        return schemas.create(name, async_=async_)

    @utils.with_wait_argument
    def delete_schema(self, name, project=None, async_=False):
        """
        Delete the schema with given name

        :param name: schema name
        :param project: project name, if not provided, will be the default project
        :param async_: if True, will run asynchronously
        :type async_: bool
        :return: the result of the delete call on the project's schema collection
        """
        schemas = self.get_project(name=project).schemas
        return schemas.delete(name, async_=async_)

    def _get_project_or_schema(self, project=None, schema=None):
        if self.is_schema_namespace_enabled():
            schema = schema or "default"
        if schema is not None:
            return self.get_schema(schema, project=project)
        else:
            return self.get_project(project)

    def _split_object_dots(self, name):
        """
        Split a dotted object name into ``(project, schema, name)``.

        A single part is the object name alone. Two parts are read as
        ``schema.name`` when the schema namespace is enabled and as
        ``project.name`` otherwise. Any other number of parts must be exactly
        three. Missing components are returned as None, and the object name
        is passed through ``utils.strip_backquotes``.
        """
        parts = [piece.strip() for piece in utils.split_backquoted(name, ".")]
        count = len(parts)
        project = schema = None
        if count == 1:
            (name,) = parts
        elif count == 2:
            if self.is_schema_namespace_enabled():
                schema, name = parts
            else:
                project, name = parts
        else:
            project, schema, name = parts
        return project, schema, utils.strip_backquotes(name)

    def list_tables(
        self,
        project=None,
        prefix=None,
        owner=None,
        schema=None,
        type=None,
        extended=False,
    ):
        """
        List all tables of a project.
        If prefix is provided, the listed tables will all start with this prefix.
        If owner is provided, the listed tables will belong to such owner.

        :param str project: project name, if not provided, will be the default project
        :param str prefix: the listed tables start with this **prefix**
        :param str owner: Aliyun account, the owner which listed tables belong to
        :param str schema: schema name, if not provided, will be the default schema
        :param str type: type of the table
        :param bool extended: if True, load extended information for table
        :return: tables in this project, filtered by the optional prefix and owner.
        :rtype: generator
        """
        parent = self._get_project_or_schema(project, schema)
        return parent.tables.iterate(
            name=prefix, owner=owner, type=type, extended=extended
        )

    def get_table(self, name, project=None, schema=None):
        """
        Get table by given name.

        A dotted string name is split into project, schema and table name,
        which then replace the ``project`` and ``schema`` arguments.

        :param name: table name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: the right table
        :rtype: :class:`odps.models.Table`
        :raise: :class:`odps.errors.NoSuchObject` if not exists

        .. seealso:: :class:`odps.models.Table`
        """
        is_dotted = isinstance(name, six.string_types) and "." in name
        if is_dotted:
            project, schema, name = self._split_object_dots(name)

        tables = self._get_project_or_schema(project, schema).tables
        return tables[name]

    def exist_table(self, name, project=None, schema=None):
        """
        Check whether the table with given name exists.

        A dotted string name is split into project, schema and table name,
        which then replace the ``project`` and ``schema`` arguments.

        :param name: table name
        :param project: project name, if not provided, will be the default project
        :param schema: schema name, if not provided, will be the default schema
        :type schema: str
        :return: True if table exists or False
        :rtype: bool
        """
        is_dotted = isinstance(name, six.string_types) and "." in name
        if is_dotted:
            project, schema, name = self._split_object_dots(name)

        tables = self._get_project_or_schema(project, schema).tables
        return name in tables

    @utils.with_wait_argument
    def create_table(
        self,
        name,
        table_schema=None,
        project=None,
        schema=None,
        comment=None,
        if_not_exists=False,
        lifecycle=None,
        shard_num=None,
        hub_lifecycle=None,
        hints=None,
        transactional=False,
        primary_key=None,
        storage_tier=None,
        table_properties=None,
        async_=False,
        **kw
    ):
        """
        Create a table by given schema and other optional parameters.

        For backward compatibility, a table schema passed through ``schema``
        (a schema object, a tuple, or a string containing a space) is still
        accepted when ``table_schema`` is absent, with a deprecation warning.

        :param name: table name
        :param table_schema: table schema. Can be an instance
            of :class:`odps.models.TableSchema` or a string like 'col1 string, col2 bigint'
        :param project: project name, if not provided, will be the default project
        :param comment: table comment
        :param str schema: schema name, if not provided, will be the default schema
        :param bool if_not_exists: will not create if this table already exists, default False
        :param int lifecycle: table's lifecycle. If absent, `options.lifecycle` will be used.
        :param int shard_num: table's shard num
        :param int hub_lifecycle: hub lifecycle
        :param dict hints: hints for the task
        :param bool transactional: make table transactional
        :param list primary_key: primary key of the table, only for transactional tables
        :param str storage_tier: storage tier of the table
        :param dict table_properties: properties for table creation
        :param bool async_: if True, will run asynchronously
        :return: the created Table if not async else odps instance
        :rtype: :class:`odps.models.Table` or :class:`odps.models.Instance`
        :raise TypeError: if no table schema is supplied

        .. seealso:: :class:`odps.models.Table`, :class:`odps.models.TableSchema`
        """
        from .types import OdpsSchema

        if table_schema is None and schema:
            # legacy call style: the table schema arrives in ``schema``
            looks_like_table_schema = isinstance(schema, (OdpsSchema, tuple)) or (
                isinstance(schema, six.string_types) and " " in schema
            )
            if looks_like_table_schema:
                table_schema, schema = schema, None
                warnings.warn(
                    "`schema` is renamed as `table_schema` in `create_table`, "
                    "the original parameter now represents schema name. Please "
                    "change your code.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                utils.add_survey_call("ODPS.create_table(schema='schema_name')")

        if table_schema is None:
            raise TypeError("`table_schema` argument not filled")

        if isinstance(name, six.string_types) and "." in name:
            project, schema, name = self._split_object_dots(name)

        if lifecycle is None:
            lifecycle = options.lifecycle

        tables = self._get_project_or_schema(project, schema).tables
        return tables.create(
            name,
            table_schema,
            comment=comment,
            if_not_exists=if_not_exists,
            lifecycle=lifecycle,
            shard_num=shard_num,
            hub_lifecycle=hub_lifecycle,
            hints=hints,
            transactional=transactional,
            primary_key=primary_key,
            storage_tier=storage_tier,
            table_properties=table_properties,
            async_=async_,
            **kw
        )

    def _delete_table(
        self,
        name,
        project=None,
        if_exists=False,
        schema=None,
        hints=None,
        async_=False,
        table_type=None,
    ):
        """
        Shared implementation behind the table / view deletion methods.

        A dotted string name is split into project, schema and object name
        first; the deletion is then delegated to the ``tables`` collection of
        the resolved project or schema, with ``table_type`` passed along.
        """
        is_dotted = isinstance(name, six.string_types) and "." in name
        if is_dotted:
            project, schema, name = self._split_object_dots(name)

        tables = self._get_project_or_schema(project, schema).tables
        return tables.delete(
            name, if_exists=if_exists, hints=hints, async_=async_, table_type=table_type
        )

    @utils.with_wait_argument
    def delete_table(
        self, name, project=None, if_exists=False, schema=None, hints=None, async_=False
    ):
        """
        Delete the table with given name

        :param name: table name
        :param project: project name, if not provided, will be the default project
        :param bool if_exists:  will not raise errors when the table does not exist, default False
        :param str schema: schema name, if not provided, will be the default schema
        :param dict hints: hints for the task
        :param bool async_: if True, will run asynchronously
        :return: None if not async else odps instance
        """
        # delegate to the shared helper, marking the target as a managed table
        return self._delete_table(
            name,
            table_type="managed_table",
            project=project,
            schema=schema,
            if_exists=if_exists,
            hints=hints,
            async_=async_,
        )

    @utils.with_wait_argument
    def delete_view(
        self, name, project=None, if_exists=False, schema=None, hints=None, async_=False
    ):
        """
        Delete the view with given name

        :param name: view name
        :param project: project name, if not provided, will be the default project
        :param bool if_exists:  will not raise errors when the view does not exist, default False
        :param str schema: schema name, if not provided, will be the default schema
        :param dict hints: hints for the task
        :param bool async_: if True, will run asynchronously
        :return: None if not async else odps instance
        """
        # delegate to the shared helper, marking the target as a view
        return self._delete_table(
            name,
            table_type="virtual_view",
            project=project,
            schema=schema,
            if_exists=if_exists,
            hints=hints,
            async_=async_,
        )

    @utils.with_wait_argument
    def delete_materialized_view(
        self, name, project=None, if_exists=False, schema=None, hints=None, async_=False
    ):
        """
        Delete the materialized view with given name

        :param name: materialized view name
        :param project: project name, if not provided, will be the default project
        :param bool if_exists:  will not raise errors when the materialized view
            does not exist, default False
        :param str schema: schema name, if not provided, will be the default schema
        :param dict hints: hints for the task
        :param bool async_: if True, will run asynchronously
        :return: None if not async else odps instance
        """
        # delegate to the shared helper, marking the target as a materialized view
        return self._delete_table(
            name,
            table_type="materialized_view",
            project=project,
            schema=schema,
            if_exists=if_exists,
            hints=hints,
            async_=async_,
        )

    # Table I/O entry points: the functions defined on ``models.TableIOMethods``
    # are exposed as methods of this class through pass-through wrappers that
    # keep the original metadata and signature.
    read_table = _wrap_model_func(models.TableIOMethods.read_table)
    write_table = _wrap_model_func(models.TableIOMethods.write_table)
    write_sql_result_to_table = _wrap_model_func(
        models.TableIOMethods.write_sql_result_to_table
    )

    def list_resources(self, project=None, prefix=None, owner=None, schema=None):
        """
        List all resources of a project.

        :param project: project name, if not provided, will be the default project
        :param str prefix: the listed resources start with this **prefix**
        :param str owner: Aliyun account, the owner which listed tables belong to
        :param str schema: schema name, if not provided, will be the default schema
        :return: resources
        :rtype: generator
        """

        parent = self._get_project_or_schema(project, schema)
        for resource in parent.resources.iterate(name=prefix, owner=owner):
            yield resource

    def get_resource(self, name, project=None, schema=None):
        """
        Get a resource by given name

        :param name: resource name
        :param project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: the right resource
        :rtype: :class:`odps.models.Resource`
        :raise: :class:`odps.errors.NoSuchObject` if not exists

        .. seealso:: :class:`odps.models.Resource`
        """

        parent = self._get_project_or_schema(project, schema)
        return parent.resources[name]

    def exist_resource(self, name, project=None, schema=None):
        """
        Check whether a resource with the given name exists.

        :param name: resource name
        :param project: project name, if not provided, will be the default project
        :param schema: schema name, if not provided, will be the default schema
        :type schema: str
        :return: True if exists or False
        :rtype: bool
        """

        return name in self._get_project_or_schema(project, schema).resources

    def open_resource(
        self,
        name,
        project=None,
        mode="r+",
        encoding="utf-8",
        schema=None,
        type="file",
        stream=False,
        comment=None,
        temp=False,
    ):
        """
        Open a file resource as file-like object.
        This is an elegant and pythonic way to handle file resource.

        The argument ``mode`` stands for the open mode for this file resource.
        It can be binary mode if the 'b' is inside. For instance,
        'rb' means opening the resource as read binary mode
        while 'r+b' means opening the resource as read+write binary mode.
        This is most important when the file is actually binary such as tar or jpeg file,
        so be aware of opening this file as a correct mode.

        Basically, the text mode can be 'r', 'w', 'a', 'r+', 'w+', 'a+'
        just like the builtin python ``open`` method.

        * ``r`` means read only
        * ``w`` means write only, the file will be truncated when opening
        * ``a`` means append only
        * ``r+`` means read+write without constraint
        * ``w+`` will truncate first then opening into read+write
        * ``a+`` can read+write, however the written content can only be appended to the end

        When ``name`` is already a :class:`odps.models.FileResource`, it is opened
        directly and ``project``, ``schema``, ``type``, ``comment`` and ``temp``
        are not used.

        :param name: file resource or file resource name
        :type name: :class:`odps.models.FileResource` or str
        :param project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param str mode: the mode of opening file, described as above
        :param str encoding: utf-8 as default
        :param str type: resource type, can be "file", "archive", "jar" or "py"
        :param bool stream: if True, use stream to upload, False by default
        :param str comment: comment of the resource
        :param bool temp: forwarded to the resource lookup together with ``type``
            and ``comment`` (presumably marks the resource as temporary)
        :return: file-like object

        :Example:

        >>> with odps.open_resource('test_resource', mode='r') as fp:
        >>>     fp.read(1)  # read one unicode character
        >>>     fp.write('test')  # wrong, cannot write under read mode
        >>>
        >>> with odps.open_resource('test_resource', mode='wb') as fp:
        >>>     fp.readlines() # wrong, cannot read under write mode
        >>>     fp.write('hello world') # write bytes
        >>>
        >>> with odps.open_resource('test_resource') as fp: # default as read-write mode
        >>>     fp.seek(5)
        >>>     fp.truncate()
        >>>     fp.flush()
        """

        from .models import FileResource

        if isinstance(name, FileResource):
            # honor ``encoding`` and ``stream`` for resource objects too, exactly
            # as for resources looked up by name below
            return name.open(mode=mode, encoding=encoding, stream=stream)

        parent = self._get_project_or_schema(project, schema)
        resource = parent.resources.get_typed(
            name, type=type, comment=comment, temp=temp
        )
        return resource.open(mode=mode, encoding=encoding, stream=stream)

    def create_resource(self, name, type=None, project=None, schema=None, **kwargs):
        """
        Create a resource with the given name and type.

        Supported resource types are ``file``, ``jar``, ``py``, ``archive`` and ``table``.

        ``file``, ``jar``, ``py`` and ``archive`` are file resources. To create one of
        them, a file-like object must be supplied as an extra keyword argument.

        For a table resource, the table name and project name should be provided;
        the partition is optional.

        :param name: resource name
        :param type: resource type, now support ``file``, ``jar``, ``py``, ``archive``, ``table``
        :param project: project name, if not provided, will be the default project
        :param schema: schema name, if not provided, will be the default schema
        :type schema: str
        :param kwargs: optional arguments forwarded to the resource collection,
            see the examples below
        :return: resource depends on the type, if ``file`` will be :class:`odps.models.FileResource` and so on
        :rtype: :class:`odps.models.Resource`'s subclasses

        :Example:

        >>> from odps.models.resource import *
        >>>
        >>> res = odps.create_resource('test_file_resource', 'file', fileobj=open('/to/path/file'))
        >>> isinstance(res, FileResource)
        True
        >>> res = odps.create_resource('test_py_resource.py', 'py', fileobj=StringIO('import this'))
        >>> isinstance(res, PyResource)
        True
        >>> res = odps.create_resource('test_table_resource', 'table', table_name='test_table', partition='pt=test')
        >>> isinstance(res, TableResource)
        True

        .. seealso:: :class:`odps.models.FileResource`, :class:`odps.models.PyResource`,
                     :class:`odps.models.JarResource`, :class:`odps.models.ArchiveResource`,
                     :class:`odps.models.TableResource`
        """

        # a truthy ``typo`` keyword takes precedence over ``type``; it stays in
        # kwargs and is forwarded unchanged (presumably a legacy spelling)
        resource_type = kwargs.get("typo") or type
        container = self._get_project_or_schema(project, schema)
        return container.resources.create(name=name, type=resource_type, **kwargs)

    def delete_resource(self, name, project=None, schema=None):
        """
        Remove the resource with the given name.

        :param name: resource name
        :param project: project name, if not provided, will be the default project
        :param schema: schema name, if not provided, will be the default schema
        :type schema: str
        :return: None
        """

        resources = self._get_project_or_schema(project, schema).resources
        return resources.delete(name)

    def list_functions(self, project=None, prefix=None, owner=None, schema=None):
        """
        Iterate over the functions of a project or schema.

        :param str project: project name, if not provided, will be the default project
        :param str prefix: only functions whose names start with this **prefix** are listed
        :param str owner: Aliyun account, the owner which listed functions belong to
        :param str schema: schema name, if not provided, will be the default schema
        :return: functions
        :rtype: generator
        """

        container = self._get_project_or_schema(project, schema)
        function_iter = container.functions.iterate(name=prefix, owner=owner)
        for func in function_iter:
            yield func

    def get_function(self, name, project=None, schema=None):
        """
        Look up a function by its name.

        :param name: function name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: the function with the given name
        :raise: :class:`odps.errors.NoSuchObject` if not exists

        .. seealso:: :class:`odps.models.Function`
        """

        functions = self._get_project_or_schema(project, schema).functions
        return functions[name]

    def exist_function(self, name, project=None, schema=None):
        """
        Check whether a function with the given name exists.

        :param str name: function name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: True if the function exists or False
        :rtype: bool
        """

        return name in self._get_project_or_schema(project, schema).functions

    def create_function(self, name, project=None, schema=None, **kwargs):
        """
        Create a function with the given name.

        All extra keyword arguments, such as ``class_type`` and ``resources``,
        are forwarded to the function collection.

        :param name: function name
        :param project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param str class_type: main class
        :param list resources: the resources that function needs to use
        :return: the created function
        :rtype: :class:`odps.models.Function`

        :Example:

        >>> res = odps.get_resource('test_func.py')
        >>> func = odps.create_function('test_func', class_type='test_func.Test', resources=[res, ])

        .. seealso:: :class:`odps.models.Function`
        """

        functions = self._get_project_or_schema(project, schema).functions
        return functions.create(name=name, **kwargs)

    def delete_function(self, name, project=None, schema=None):
        """
        Remove the function with the given name.

        :param name: function name
        :param project: project name, if not provided, will be the default project
        :param schema: schema name, if not provided, will be the default schema
        :type schema: str
        :return: None
        """

        functions = self._get_project_or_schema(project, schema).functions
        return functions.delete(name)

    def list_instances(
        self,
        project=None,
        start_time=None,
        end_time=None,
        status=None,
        only_owner=None,
        quota_index=None,
        **kw
    ):
        """
        List instances of a project by given optional conditions
        including start time, end time, status and if only the owner.

        The deprecated keyword ``from_time`` is still accepted and, when given,
        overrides ``start_time``. Other extra keyword arguments are ignored.

        :param project: project name, if not provided, will be the default project
        :param start_time: the start time of filtered instances
        :type start_time: datetime, int or float
        :param end_time: the end time of filtered instances
        :type end_time: datetime, int or float
        :param status: including 'Running', 'Suspended', 'Terminated'
        :param only_owner: True will filter the instances created by current user
        :type only_owner: bool
        :param quota_index:
        :type quota_index: str
        :return: instances, as produced by the ``iterate`` method of the
            project's instance collection
        :rtype: iterable
        """
        if "from_time" in kw:
            start_time = kw["from_time"]
            # stacklevel=2 attributes the warning to the calling code, so that
            # default warning filters can show it where `from_time` is used
            warnings.warn(
                "The keyword argument `from_time` has been replaced by `start_time`.",
                DeprecationWarning,
                stacklevel=2,
            )

        project = self.get_project(name=project)
        return project.instances.iterate(
            start_time=start_time,
            end_time=end_time,
            status=status,
            only_owner=only_owner,
            quota_index=quota_index,
        )

    def list_instance_queueing_infos(
        self, project=None, status=None, only_owner=None, quota_index=None
    ):
        """
        List queueing information of instances in a project.

        :param project: project name, if not provided, will be the default project
        :param status: including 'Running', 'Suspended', 'Terminated'
        :param only_owner: True will filter the instances created by current user
        :type only_owner: bool
        :param quota_index:
        :type quota_index: str
        :return: instance queueing infos
        :rtype: list
        """

        filters = dict(status=status, only_owner=only_owner, quota_index=quota_index)
        queueing_infos = self.get_project(name=project).instance_queueing_infos
        return queueing_infos.iterate(**filters)

    def get_instance(self, id_, project=None, quota_name=None):
        """
        Look up an instance by its id.

        :param id_: instance id
        :param project: project name, if not provided, will be the default project
        :param str quota_name: quota name passed to the instance lookup; when empty,
            the ``quota_name`` of this entry object is used instead
        :return: the instance with the given id
        :rtype: :class:`odps.models.Instance`
        :raise: :class:`odps.errors.NoSuchObject` if not exists

        .. seealso:: :class:`odps.models.Instance`
        """

        instances = self.get_project(name=project).instances
        effective_quota = quota_name or self.quota_name
        return instances.get(id_, quota_name=effective_quota)

    def exist_instance(self, id_, project=None):
        """
        Check whether an instance with the given id exists.

        :param id_: instance id
        :param project: project name, if not provided, will be the default project
        :return: True if exists or False
        :rtype: bool
        """

        return id_ in self.get_project(name=project).instances

    def stop_instance(self, id_, project=None):
        """
        Stop the running instance identified by the given id.

        :param id_: instance id
        :param project: project name, if not provided, will be the default project
        :return: None
        """

        instance = self.get_project(name=project).instances[id_]
        instance.stop()

    # `stop_job` is bound to the same function as `stop_instance`, so callers
    # using either name reach the same implementation
    stop_job = stop_instance  # to keep compatible

    def execute_sql(
        self,
        sql,
        project=None,
        priority=None,
        running_cluster=None,
        hints=None,
        quota_name=None,
        unique_identifier_id=None,
        **kwargs
    ):
        """
        Submit a SQL statement via :meth:`run_sql` and, unless asked to run
        asynchronously, block until it finishes successfully.

        Passing ``async_=True`` (or the alternative keyword ``async``) returns the
        instance right after submission without waiting.

        :param str sql: SQL statement
        :param project: project name, if not provided, will be the default project
        :param int priority: instance priority, 9 as default
        :param str running_cluster: cluster to run this instance
        :param dict hints: settings for SQL, e.g. `odps.mapred.map.split.size`
        :param str quota_name: name of quota to use for SQL job
        :param str unique_identifier_id: unique instance ID
        :return: instance
        :rtype: :class:`odps.models.Instance`

        :Example:

        >>> instance = odps.execute_sql('select * from dual')
        >>> with instance.open_reader() as reader:
        >>>     for record in reader:  # iterate to handle result with schema
        >>>         # handle each record
        >>>
        >>> instance = odps.execute_sql('desc dual')
        >>> with instance.open_reader() as reader:
        >>>     print(reader.raw)  # without schema, just get the raw result

        .. seealso:: :class:`odps.models.Instance`
        """

        # both spellings are removed from kwargs before forwarding to run_sql;
        # ``async_`` wins when both are given
        fallback_async = kwargs.pop("async", False)
        run_async = kwargs.pop("async_", fallback_async)

        instance = self.run_sql(
            sql,
            project=project,
            priority=priority,
            running_cluster=running_cluster,
            unique_identifier_id=unique_identifier_id,
            hints=hints,
            quota_name=quota_name,
            **kwargs
        )
        if run_async:
            return instance
        instance.wait_for_success()
        return instance

    def run_sql(
        self,
        sql,
        project=None,
        priority=None,
        running_cluster=None,
        hints=None,
        aliases=None,
        default_schema=None,
        quota_name=None,
        unique_identifier_id=None,
        **kwargs
    ):
        """
        Run a given SQL statement asynchronously

        :param str sql: SQL statement
        :param str project: project name, if not provided, will be the default project
        :param int priority: instance priority, 9 as default
        :param str running_cluster: cluster to run this instance
        :param dict hints: settings for SQL, e.g. `odps.mapred.map.split.size`
        :param dict aliases:
        :param str default_schema: default schema of the statement, if not provided,
            will be the schema of this entry object
        :param str quota_name: name of quota to use for SQL job
        :param str unique_identifier_id: unique instance ID
        :param on_instance_create: optional keyword argument, passed as
            ``create_callback`` when the instance is created
        :param kwargs: remaining keyword arguments are passed to the constructor
            of :class:`odps.models.SQLTask`
        :return: instance
        :rtype: :class:`odps.models.Instance`
        :raise: :class:`odps.errors.ParseError` with the statement attached as
            ``statement`` when instance creation reports a parse error

        .. seealso:: :class:`odps.models.Instance`
        """
        on_instance_create = kwargs.pop("on_instance_create", None)
        sql = utils.to_text(sql)

        # For DROP / ALTER TABLE statements, remove the affected table from the
        # local table collection before submitting.
        # NOTE(review): presumably this evicts a cached table object -- confirm
        alter_table_match = _ALTER_TABLE_REGEX.match(sql)
        if alter_table_match:
            drop_table_name = alter_table_match.group("table_name")
            sql_project, sql_schema, sql_name = self._split_object_dots(drop_table_name)
            # parts missing from the table name fall back to the call arguments
            sql_project = sql_project or project
            sql_schema = sql_schema or default_schema
            del self._get_project_or_schema(sql_project, sql_schema).tables[sql_name]

        # Give MergeTask the chance to handle the statement first; when it
        # returns an instance, that instance is the result and no SQLTask is built.
        merge_instance = models.MergeTask.submit_alter_table_instance(
            self,
            sql,
            project=project,
            schema=default_schema,
            priority=priority,
            running_cluster=running_cluster,
            hints=hints,
            unique_identifier_id=unique_identifier_id,
            quota_name=quota_name,
            create_callback=on_instance_create,
        )
        if merge_instance is not None:
            return merge_instance

        # priority resolution: explicit argument, then options.priority,
        # then the options.get_priority callback if one is configured
        priority = priority if priority is not None else options.priority
        if priority is None and options.get_priority is not None:
            priority = options.get_priority(self)

        task = models.SQLTask(query=sql, **kwargs)
        task.update_sql_settings(hints)

        # schema-related settings are applied after user hints
        schema_hints = {}
        default_schema = default_schema or self.schema
        if self.is_schema_namespace_enabled(hints) or default_schema is not None:
            schema_hints = {
                "odps.sql.allow.namespace.schema": "true",
                "odps.namespace.schema": "true",
            }
        if default_schema is not None:
            schema_hints["odps.default.schema"] = default_schema
        task.update_sql_settings(schema_hints)

        # the quota given in the call takes precedence over the one of this object
        if quota_name or self.quota_name:
            quota_hints = {"odps.task.wlm.quota": quota_name or self.quota_name}
            task.update_sql_settings(quota_hints)

        if aliases:
            task.update_aliases(aliases)

        project = self.get_project(name=project)
        try:
            return project.instances.create(
                task=task,
                priority=priority,
                running_cluster=running_cluster,
                unique_identifier_id=unique_identifier_id,
                create_callback=on_instance_create,
            )
        except errors.ParseError as ex:
            # attach the offending statement before re-raising
            ex.statement = sql
            raise

    def execute_sql_cost(self, sql, project=None, hints=None, **kwargs):
        """
        Submit a SQL cost task for the given statement, wait until it succeeds
        and return the cost information reported by the instance.

        :param sql: SQL statement
        :type sql: str
        :param project: project name, if not provided, will be the default project
        :param hints: settings for SQL, e.g. `odps.mapred.map.split.size`
        :type hints: dict
        :param kwargs: extra keyword arguments passed to the cost task constructor
        :return: cost info of the statement

        :Example:

        >>> sql_cost = odps.execute_sql_cost('select * from dual')
        >>> sql_cost.udf_num
        0
        >>> sql_cost.complexity
        1.0
        >>> sql_cost.input_size
        100

        """
        cost_task = models.SQLCostTask(query=utils.to_text(sql), **kwargs)
        cost_task.update_sql_cost_settings(hints)

        instances = self.get_project(name=project).instances
        cost_instance = instances.create(task=cost_task)
        cost_instance.wait_for_success()
        return cost_instance.get_sql_task_cost()

    @staticmethod
    def _parse_partition_string(partition):
        # Rebuild a comma separated ``key=value`` partition spec so that every
        # value that does not already start with a quote is replaced by its repr().
        def _normalize(segment):
            pair = [piece.strip() for piece in utils.split_quoted(segment, "=")]
            if len(pair) != 2:
                raise ValueError("Partition representation malformed.")
            key, value = pair
            if not value.startswith(('"', "'")):
                value = repr(value)
            return "%s=%s" % (key, value)

        segments = utils.split_quoted(partition, ",")
        return ",".join([_normalize(segment) for segment in segments])

    def list_volumes(self, project=None, schema=None, owner=None):
        """
        List the volumes of a project or schema.

        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param str owner: Aliyun account
        :return: volumes
        :rtype: list
        """
        volumes = self._get_project_or_schema(project, schema).volumes
        return volumes.iterate(owner=owner)

    @utils.deprecated(
        "`create_volume` is deprecated. Use `create_parted_volume` instead."
    )
    def create_volume(self, name, project=None, **kwargs):
        """
        Deprecated. Delegates to :meth:`create_parted_volume`.

        :param str name: volume name
        :param str project: project name, if not provided, will be the default project
        :param kwargs: extra keyword arguments forwarded to :meth:`create_parted_volume`
        :return: whatever :meth:`create_parted_volume` returns
        """
        # hand the created volume back to the caller instead of dropping it
        return self.create_parted_volume(name, project=project, **kwargs)

    def create_parted_volume(self, name, project=None, schema=None, **kwargs):
        """
        Create an old-fashioned partitioned volume in a project.

        :param str name: volume name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param kwargs: extra keyword arguments forwarded to the volume collection
        :return: volume
        :rtype: :class:`odps.models.PartedVolume`

        .. seealso:: :class:`odps.models.PartedVolume`
        """
        volumes = self._get_project_or_schema(project, schema).volumes
        return volumes.create_parted(name=name, **kwargs)

    def create_fs_volume(self, name, project=None, schema=None, **kwargs):
        """
        Create a new-fashioned file system volume in a project.

        :param str name: volume name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param kwargs: extra keyword arguments forwarded to the volume collection
        :return: volume
        :rtype: :class:`odps.models.FSVolume`

        .. seealso:: :class:`odps.models.FSVolume`
        """
        volumes = self._get_project_or_schema(project, schema).volumes
        return volumes.create_fs(name=name, **kwargs)

    def create_external_volume(
        self,
        name,
        project=None,
        schema=None,
        location=None,
        rolearn=None,
        auto_create_dir=False,
        accelerate=False,
        **kwargs
    ):
        """
        Create a file system volume backed by external storage (for instance, OSS)
        in a project.

        :param str name: volume name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param str location: location of OSS dir, should be oss://endpoint/bucket/path
        :param str rolearn: role arn of the account hosting the OSS bucket
        :param bool auto_create_dir: if True, will create directory automatically
        :param bool accelerate: if True, will accelerate transfer of large volumes
        :param kwargs: extra keyword arguments forwarded to the volume collection
        :return: volume
        :rtype: :class:`odps.models.FSVolume`

        .. seealso:: :class:`odps.models.FSVolume`
        """
        volumes = self._get_project_or_schema(project, schema).volumes
        return volumes.create_external(
            name=name,
            location=location,
            rolearn=rolearn,
            auto_create_dir=auto_create_dir,
            accelerate=accelerate,
            **kwargs
        )

    def exist_volume(self, name, schema=None, project=None):
        """
        Check whether a volume with the given name exists.

        :param str name: volume name
        :param str schema: schema name, if not provided, will be the default schema
        :param str project: project name, if not provided, will be the default project
        :return: True if exists or False
        :rtype: bool
        """
        return name in self._get_project_or_schema(project, schema).volumes

    def get_volume(self, name, project=None, schema=None):
        """
        Look up a volume by its name.

        :param str name: volume name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: volume object. Return type depends on the type of the volume.
        :rtype: :class:`odps.models.Volume`
        """
        volumes = self._get_project_or_schema(project, schema).volumes
        return volumes[name]

    def delete_volume(
        self, name, project=None, schema=None, auto_remove_dir=False, recursive=False
    ):
        """
        Remove the volume with the given name.

        :param name: volume name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param bool auto_remove_dir: if True, directory created by external volume will be deleted
        :param bool recursive: if True, directory deletion should be recursive
        :return: None
        """
        volumes = self._get_project_or_schema(project, schema).volumes
        return volumes.delete(
            name, auto_remove_dir=auto_remove_dir, recursive=recursive
        )

    def list_volume_partitions(self, volume, project=None, schema=None):
        """
        List the partitions of a volume.

        :param str volume: volume name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: partitions
        :rtype: list
        """
        partitions = self.get_volume(volume, project, schema=schema).partitions
        return partitions.iterate()

    def get_volume_partition(self, volume, partition=None, project=None, schema=None):
        """
        Get a partition of a parted volume.

        When ``partition`` is omitted, ``volume`` must be a path of the form
        ``/volume_name/partition_name``.

        :param str volume: volume name, or partition path if ``partition`` is not given
        :param str partition: partition name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: partitions
        :rtype: :class:`odps.models.VolumePartition`
        """
        if partition is None:
            stripped = volume.lstrip("/")
            if not (volume.startswith("/") and "/" in stripped):
                raise ValueError(
                    "You should provide a partition name or use partition path instead."
                )
            volume, partition = stripped.split("/", 1)
        parted_volume = self.get_volume(volume, project, schema=schema)
        return parted_volume.partitions[partition]

    def exist_volume_partition(self, volume, partition=None, project=None, schema=None):
        """
        Check whether a partition exists in a volume.

        When ``partition`` is omitted, ``volume`` must be a path of the form
        ``/volume_name/partition_name``. A missing volume yields False.

        :param str volume: volume name, or partition path if ``partition`` is not given
        :param str partition: partition name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: True if the partition exists or False
        :rtype: bool
        """
        if partition is None:
            stripped = volume.lstrip("/")
            if not (volume.startswith("/") and "/" in stripped):
                raise ValueError(
                    "You should provide a partition name or use partition path instead."
                )
            volume, partition = stripped.split("/", 1)
        try:
            parted_volume = self.get_volume(volume, project, schema=schema)
        except errors.NoSuchObject:
            return False
        return partition in parted_volume.partitions

    def delete_volume_partition(
        self, volume, partition=None, project=None, schema=None
    ):
        """
        Delete a partition of a volume.

        When ``partition`` is omitted, ``volume`` must be a path of the form
        ``/volume_name/partition_name``.

        :param str volume: volume name, or partition path if ``partition`` is not given
        :param str partition: partition name
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        """
        if partition is None:
            stripped = volume.lstrip("/")
            if not (volume.startswith("/") and "/" in stripped):
                raise ValueError(
                    "You should provide a partition name or use partition path instead."
                )
            volume, partition = stripped.split("/", 1)
        parted_volume = self.get_volume(volume, project, schema=schema)
        return parted_volume.delete_partition(partition)

    def list_volume_files(self, volume, partition=None, project=None, schema=None):
        """
        List files in a volume. For partitioned volumes, files under the specified
        partition are returned. For file system volumes, objects under the specified
        path are returned.

        :param str volume: volume name, or a path starting with ``/`` when
            ``partition`` is not given
        :param str partition: partition name for partitioned volumes, and path for file system volumes.
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :return: files
        :rtype: list

        :Example:
        >>> # List files under a partition in a partitioned volume. Two calls are equivalent.
        >>> odps.list_volume_files('parted_volume', 'partition_name')
        >>> odps.list_volume_files('/parted_volume/partition_name')
        >>> # List files under a path in a file system volume. Two calls are equivalent.
        >>> odps.list_volume_files('fs_volume', 'dir1/dir2')
        >>> odps.list_volume_files('/fs_volume/dir1/dir2')
        """
        if partition is None:
            if not volume.startswith("/"):
                raise ValueError(
                    "You should provide a partition name or use partition / path instead."
                )
            # split "/volume/rest" into the volume name and the remainder, if any
            volume, slash, remainder = volume.lstrip("/").partition("/")
            if slash:
                partition = remainder

        volume_obj = self.get_volume(volume, project, schema=schema)
        if not isinstance(volume_obj, models.PartedVolume):
            return volume_obj[partition].objects.iterate()
        if not partition:
            raise ValueError("Malformed partition url.")
        return volume_obj.partitions[partition].files.iterate()

    def create_volume_directory(self, volume, path=None, project=None, schema=None):
        """
        Create a directory under a file system volume.

        When ``path`` is omitted, ``volume`` must start with ``/`` and is split
        into the volume name and the directory path.

        :param str volume: name of the volume.
        :param str path: path of the directory to be created.
        :param str project: project name, if not provided, will be the default project.
        :param str schema: schema name, if not provided, will be the default schema
        :return: directory object.
        :raise ValueError: if the path is invalid or the volume is a parted volume
        """
        if path is None:
            if not volume.startswith("/"):
                raise ValueError("You should provide a valid path.")
            volume, slash, remainder = volume.lstrip("/").partition("/")
            if slash:
                path = remainder

        fs_volume = self.get_volume(volume, project, schema=schema)
        if isinstance(fs_volume, models.PartedVolume):
            raise ValueError("Only supported under file system volumes.")
        return fs_volume.create_dir(path)

    def get_volume_file(self, volume, path=None, project=None, schema=None):
        """
        Get a file under a partition of a parted volume, or a file / directory object under a file system volume.

        When ``path`` is omitted, ``volume`` must start with ``/`` and is split
        into the volume name and the remaining path.

        :param str volume: name of the volume.
        :param str path: for parted volumes, ``partition_name/file_name``;
            for file system volumes, path of the file or directory inside the volume.
        :param str project: project name, if not provided, will be the default project.
        :param str schema: schema name, if not provided, will be the default schema
        :return: file object of a parted volume, or file / directory object
            of a file system volume.
        :raise ValueError: if ``volume`` is not a valid path when ``path`` is omitted,
            or the path for a parted volume is not in ``partition/file`` form
        """
        if path is None:
            if not volume.startswith("/"):
                raise ValueError("You should provide a valid path.")
            volume = volume.lstrip("/")
            if "/" in volume:
                volume, path = volume.split("/", 1)
        volume = self.get_volume(volume, project, schema=schema)
        if isinstance(volume, models.PartedVolume):
            # path is still None when only "/volume" was given; report it as a
            # malformed path instead of failing with a TypeError on `in`
            if not path or "/" not in path:
                raise ValueError("Partition/File format malformed.")
            part, file_name = path.split("/", 1)
            return volume.get_partition(part).files[file_name]
        else:
            return volume[path]

    def move_volume_file(
        self, old_path, new_path, replication=None, project=None, schema=None
    ):
        """
        Move a file / directory object under a file system volume to another location in the same volume.

        :param str old_path: old path of the volume file, in the form
            ``/volume_name/path``.
        :param str new_path: target path of the moved file. A path not starting
            with ``/`` is resolved against the parent directory of ``old_path``.
        :param int replication: file replication.
        :param str project: project name, if not provided, will be the default project.
        :param str schema: schema name, if not provided, will be the default schema
        :return: None
        :raise ValueError: if a path is malformed, the two paths are in different
            volumes, or the volume is a parted volume
        """
        # validate old_path before it is used to resolve a relative new_path
        if not old_path.startswith("/") or "/" not in old_path.lstrip("/"):
            raise ValueError("You should provide a valid path.")

        if not new_path.startswith("/"):
            # make relative path absolute
            old_root, _ = old_path.rsplit("/", 1)
            new_path = old_root + "/" + new_path

        old_volume, old_path = old_path.lstrip("/").split("/", 1)

        new_volume, slash, _ = new_path.lstrip("/").partition("/")
        if not slash:
            # the target consists of a volume name only
            raise ValueError("You should provide a valid path.")

        if old_volume != new_volume:
            raise ValueError("Moving between different volumes is not supported.")

        volume = self.get_volume(old_volume, project, schema=schema)
        if isinstance(volume, models.PartedVolume):
            raise ValueError("Only supported under file system volumes.")
        else:
            volume[old_path].move(new_path, replication=replication)

    def delete_volume_file(
        self, volume, path=None, recursive=False, project=None, schema=None
    ):
        """
        Delete a file / directory object under a file system volume.

        When ``path`` is omitted, ``volume`` is read as an absolute path of the
        form ``/volume_name/path``.

        :param str volume: name of the volume, or an absolute path starting with
            ``/`` when ``path`` is not provided.
        :param str path: path of the object to delete inside the volume.
        :param bool recursive: if True, recursively delete files
        :param str project: project name, if not provided, will be the default project.
        :param str schema: schema name, if not provided, will be the default schema
        :return: None
        :raise ValueError: if ``path`` is omitted and ``volume`` does not start
            with ``/``, or if the volume is a parted volume.
        """
        volume_name, inner_path = volume, path
        if inner_path is None:
            if not volume_name.startswith("/"):
                raise ValueError("You should provide a valid path.")
            # first component is the volume name, the rest (if any) the path
            volume_name, separator, remainder = volume_name.lstrip("/").partition("/")
            if separator:
                inner_path = remainder

        volume_obj = self.get_volume(volume_name, project, schema=schema)
        if isinstance(volume_obj, models.PartedVolume):
            raise ValueError("Only supported under file system volumes.")
        volume_obj[inner_path].delete(recursive=recursive)

    def open_volume_reader(
        self,
        volume,
        partition=None,
        file_name=None,
        project=None,
        schema=None,
        start=None,
        length=None,
        **kwargs
    ):
        """
        Open a volume file for read. A file-like object will be returned which can be used to read contents from
        volume files.

        When ``partition`` is omitted, ``volume`` is read as an absolute path
        ``/volume_name/.../file_name``: the last component becomes the file name
        and whatever lies between the volume name and the file name becomes the
        partition (or directory).

        :param str volume: name of the volume, or an absolute path starting with ``/``
        :param str partition: name of the partition
        :param str file_name: name of the file
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param start: start position
        :param length: length limit
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: CompressOption
        :raise ValueError: if neither a partition nor an absolute path is given,
            or if a parted volume is addressed without a partition.

        :Example:
        >>> with odps.open_volume_reader('parted_volume', 'partition', 'file') as reader:
        >>>     [print(line) for line in reader]
        """
        if partition is None:
            if not volume.startswith("/"):
                raise ValueError(
                    "You should provide a partition name or use partition / path instead."
                )
            volume, remainder = volume.lstrip("/").split("/", 1)
            # the trailing component is the file, anything before it the partition
            parent, separator, file_name = remainder.rpartition("/")
            partition = parent if separator else None

        volume_obj = self.get_volume(volume, project, schema=schema)
        if isinstance(volume_obj, models.PartedVolume):
            if not partition:
                raise ValueError("Malformed partition url.")
            container = volume_obj.partitions[partition]
        else:
            container = volume_obj[partition]
        return container.open_reader(file_name, start=start, length=length, **kwargs)

    def open_volume_writer(
        self, volume, partition=None, project=None, schema=None, **kwargs
    ):
        """
        Write data into a volume. This function behaves differently under different types of volumes.

        Under partitioned volumes, all files under a partition should be uploaded in one submission. The method
        returns a writer object with whose `open` method you can open a file inside the volume and write to it,
        or you can use `write` method to write to specific files.

        Under file system volumes, the method returns a file-like object.

        When ``partition`` is omitted, ``volume`` is read as an absolute path
        whose first component is the volume name and whose remainder is the
        partition (or path).

        :param str volume: name of the volume, or an absolute path starting with ``/``
        :param str partition: partition name for partitioned volumes, and path for file system volumes.
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :raise ValueError: if neither a partition nor an absolute path is given

        :Example:
        >>> # Writing to partitioned volumes
        >>> with odps.open_volume_writer('parted_volume', 'partition') as writer:
        >>>     # both write methods are acceptable
        >>>     writer.open('file1').write('some content')
        >>>     writer.write('file2', 'some content')
        >>> # Writing to file system volumes
        >>> with odps.open_volume_writer('/fs_volume/dir1/file_name') as writer:
        >>>     writer.write('some content')
        """
        if partition is None:
            if not volume.startswith("/"):
                raise ValueError(
                    "You should provide a partition name or use partition / path instead."
                )
            volume, partition = volume.lstrip("/").split("/", 1)

        volume_obj = self.get_volume(volume, project, schema=schema)
        if isinstance(volume_obj, models.PartedVolume):
            return volume_obj.partitions[partition].open_writer(**kwargs)

        # file system volume: last path component is the file, the rest its directory
        parent_dir, separator, file_name = partition.rpartition("/")
        if not separator:
            parent_dir = None
        return volume_obj[parent_dir].open_writer(file_name, **kwargs)

    def list_xflows(self, project=None, owner=None):
        """
        Iterate over the xflows of a project, optionally filtered by owner.

        :param str project: project name, if not provided, will be the default project
        :param str owner: Aliyun account owning the xflows
        :return: xflows
        """
        return self.get_project(name=project).xflows.iterate(owner=owner)

    def get_xflow(self, name, project=None):
        """
        Look up an xflow by name.

        :param name: xflow name
        :param project: project name, if not provided, will be the default project
        :return: xflow
        :rtype: :class:`odps.models.XFlow`
        :raise: :class:`odps.errors.NoSuchObject` if not exists

        .. seealso:: :class:`odps.models.XFlow`
        """
        xflows = self.get_project(name=project).xflows
        return xflows[name]

    def exist_xflow(self, name, project=None):
        """
        Check whether an xflow with the given name exists.

        :param name: xflow name
        :param project: project name, if not provided, will be the default project
        :return: True if exists or False
        :rtype: bool
        """
        xflows = self.get_project(name=project).xflows
        return name in xflows

    def run_xflow(
        self,
        xflow_name,
        xflow_project=None,
        parameters=None,
        project=None,
        hints=None,
        priority=None,
    ):
        """
        Run xflow by given name, xflow project, parameters asynchronously.

        :param xflow_name: XFlow name
        :type xflow_name: str
        :param xflow_project: the project XFlow deploys; the executing project
            is used when this is not provided
        :type xflow_project: str
        :param parameters: parameters
        :type parameters: dict
        :param project: project name, if not provided, will be the default project
        :param hints: execution hints
        :type hints: dict
        :param priority: instance priority, 9 as default
        :type priority: int
        :return: instance
        :rtype: :class:`odps.models.Instance`

        .. seealso:: :class:`odps.models.Instance`
        """
        run_project = self.get_project(name=project)
        deploy_project = xflow_project if xflow_project else run_project
        # the xflow collection is given the deploying project by name
        if isinstance(deploy_project, models.Project):
            deploy_project = deploy_project.name
        return run_project.xflows.run_xflow(
            xflow_name=xflow_name,
            xflow_project=deploy_project,
            project=run_project,
            parameters=parameters,
            hints=hints,
            priority=priority,
        )

    def execute_xflow(
        self,
        xflow_name,
        xflow_project=None,
        parameters=None,
        project=None,
        hints=None,
        priority=None,
    ):
        """
        Run xflow by given name, xflow project, parameters, block until xflow executed successfully.

        :param xflow_name: XFlow name
        :type xflow_name: str
        :param xflow_project: the project XFlow deploys
        :type xflow_project: str
        :param parameters: parameters
        :type parameters: dict
        :param project: project name, if not provided, will be the default project
        :param hints: execution hints
        :type hints: dict
        :param priority: instance priority, 9 as default
        :type priority: int
        :return: instance
        :rtype: :class:`odps.models.Instance`

        .. seealso:: :class:`odps.models.Instance`
        """
        run_options = dict(
            xflow_project=xflow_project,
            parameters=parameters,
            project=project,
            hints=hints,
            priority=priority,
        )
        instance = self.run_xflow(xflow_name, **run_options)
        # block until the submitted instance succeeds
        instance.wait_for_success()
        return instance

    def get_xflow_results(self, instance, project=None):
        """
        Fetch the results of an xflow instance.

        :param instance: instance of xflow; a value that is not an
            :class:`odps.models.Instance` is used as a key into the project's
            instance collection
        :type instance: :class:`odps.models.Instance`
        :param project: project name, if not provided, will be the default project
        :return: xflow result
        :rtype: dict
        """
        owner_project = self.get_project(name=project)

        from .models import Instance

        if isinstance(instance, Instance):
            xflow_instance = instance
        else:
            xflow_instance = owner_project.instances[instance]
        return owner_project.xflows.get_xflow_results(xflow_instance)

    def get_xflow_sub_instances(self, instance, project=None):
        """
        Get the sub instances of an xflow instance.

        :param instance: instance of xflow
        :type instance: :class:`odps.models.Instance`
        :param project: project name, if not provided, will be the default project
        :return: sub instances dictionary
        """
        xflows = self.get_project(name=project).xflows
        return xflows.get_xflow_sub_instances(instance)

    def iter_xflow_sub_instances(self, instance, interval=1, project=None, check=False):
        """
        Iterate over the sub instances of an xflow instance, waiting until the
        instance finishes.

        :param instance: instance of xflow
        :type instance: :class:`odps.models.Instance`
        :param interval: time interval to check
        :param project: project name, if not provided, will be the default project
        :param bool check: check if the instance is successful
        :return: generator of sub-instances
        """
        xflows = self.get_project(name=project).xflows
        return xflows.iter_xflow_sub_instances(instance, interval=interval, check=check)

    def delete_xflow(self, name, project=None):
        """
        Delete the xflow with the given name.

        :param name: xflow name
        :param project: project name, if not provided, will be the default project
        :return: whatever the project's xflow collection returns from the deletion
        """
        xflows = self.get_project(name=project).xflows
        return xflows.delete(name)

    def list_offline_models(self, project=None, prefix=None, owner=None):
        """
        Iterate over the offline models of a project, optionally filtered by
        name prefix and owner.

        :param project: project name, if not provided, will be the default project
        :param prefix: prefix of offline model's name
        :param owner: Aliyun account
        :return: offline models
        """
        offline_models = self.get_project(name=project).offline_models
        return offline_models.iterate(name=prefix, owner=owner)

    def get_offline_model(self, name, project=None):
        """
        Look up an offline model by name.

        :param name: offline model name
        :param project: project name, if not provided, will be the default project
        :return: offline model
        :rtype: :class:`odps.models.ml.OfflineModel`
        :raise: :class:`odps.errors.NoSuchObject` if not exists
        """
        offline_models = self.get_project(name=project).offline_models
        return offline_models[name]

    def exist_offline_model(self, name, project=None):
        """
        Check whether an offline model with the given name exists.

        :param name: offline model's name
        :param project: project name, if not provided, will be the default project
        :return: True if offline model exists else False
        :rtype: bool
        """
        offline_models = self.get_project(name=project).offline_models
        return name in offline_models

    @utils.with_wait_argument
    def copy_offline_model(
        self, name, new_name, project=None, new_project=None, async_=False
    ):
        """
        Copy an offline model into a new location.

        :param name: name of the model to copy
        :param new_name: name of the new model
        :param project: project of the source model, if not provided, will be the default project
        :param new_project: new project name. if absent, original project name will be used
        :param async_: if True, return the copy instance. otherwise return the newly-copied model
        """
        source_model = self.get_offline_model(name, project=project)
        return source_model.copy(new_name, new_project=new_project, async_=async_)

    def delete_offline_model(self, name, project=None, if_exists=False):
        """
        Delete the offline model with the given name.

        :param name: offline model's name
        :param project: project name, if not provided, will be the default project
        :param if_exists: when True, a missing offline model is ignored instead
            of raising :class:`odps.errors.NoSuchObject`; default False
        :return: result of the deletion, or None when a missing model was ignored
        """
        offline_models = self.get_project(name=project).offline_models
        try:
            return offline_models.delete(name)
        except errors.NoSuchObject:
            if if_exists:
                return None
            raise

    def get_logview_host(self):
        """
        Get logview host address.

        The host is requested from ``<endpoint>/logview/host``. If the request
        fails or yields an empty value, a default derived from
        ``LOGVIEW_HOST_DEFAULT`` and the endpoint is used instead. The result
        is cached per endpoint in a module-level dict.

        :return: logview host address
        """
        if self.endpoint in _logview_host_cache:
            return _logview_host_cache[self.endpoint]

        try:
            logview_host = utils.to_str(
                self.rest.get(self.endpoint + "/logview/host").content
            )
        except Exception:
            # best effort: any request / decoding failure falls back to the
            # default host, while KeyboardInterrupt and SystemExit propagate
            logview_host = None
        if not logview_host:
            logview_host = utils.get_default_logview_endpoint(
                LOGVIEW_HOST_DEFAULT, self.endpoint
            )
        _logview_host_cache[self.endpoint] = logview_host
        return logview_host

    def get_logview_address(
        self, instance_id, hours=None, project=None, use_legacy=None
    ):
        """
        Get the logview address of an instance.

        :param instance_id: instance id
        :param hours: passed to the instance's ``get_logview_address``; when not
            provided, ``options.logview_hours`` is used
        :param project: project name, if not provided, will be the default project
        :param use_legacy: passed through to the instance's ``get_logview_address``
        :return: logview address
        :rtype: str
        """
        effective_hours = hours if hours else options.logview_hours
        instance = self.get_instance(instance_id, project=project)
        return instance.get_logview_address(
            hours=effective_hours, use_legacy=use_legacy
        )

    def get_project_policy(self, project=None):
        """
        Read the policy of a project.

        :param project: project name, if not provided, will be the default project
        :return: JSON object
        """
        return self.get_project(name=project).policy

    def set_project_policy(self, policy, project=None):
        """
        Assign a new policy to a project.

        :param policy: policy to assign to the project
        :param project: project name, if not provided, will be the default project
        :return: None
        """
        target_project = self.get_project(name=project)
        setattr(target_project, "policy", policy)

    def create_role(self, name, project=None):
        """
        Create a role in a project.

        :param name: name of the role to create
        :param project: project name, if not provided, will be the default project
        :return: role object created
        """
        roles = self.get_project(name=project).roles
        return roles.create(name)

    def list_roles(self, project=None):
        """
        Get the role collection of a project.

        :param project: project name, if not provided, will be the default project
        :return: collection of role objects
        """
        return self.get_project(name=project).roles

    def exist_role(self, name, project=None):
        """
        Check whether a role exists in a project.

        :param name: name of the role
        :param project: project name, if not provided, will be the default project
        :return: result of the membership test against the project's roles
        """
        roles = self.get_project(name=project).roles
        return name in roles

    def delete_role(self, name, project=None):
        """
        Remove a role from a project.

        :param name: name of the role to delete
        :param project: project name, if not provided, will be the default project
        :return: None
        """
        roles = self.get_project(name=project).roles
        roles.delete(name)

    def get_role_policy(self, name, project=None):
        """
        Read the policy of a role.

        :param name: name of the role
        :param project: project name, if not provided, will be the default project
        :return: JSON object
        """
        role = self.get_project(name=project).roles[name]
        return role.policy

    def set_role_policy(self, name, policy, project=None):
        """
        Assign a new policy to a role.

        :param name: name of the role
        :param policy: policy string or JSON object
        :param project: project name, if not provided, will be the default project
        :return: None
        """
        role = self.get_project(name=project).roles[name]
        setattr(role, "policy", policy)

    def list_role_users(self, name, project=None):
        """
        Get the users that hold the given role.

        :param name: name of the role
        :param project: project name, if not provided, will be the default project
        :return: collection of User objects
        """
        role = self.get_project(name=project).roles[name]
        return role.users

    def create_user(self, name, project=None):
        """
        Add a user to a project.

        :param name: user name
        :param project: project name, if not provided, will be the default project
        :return: user created
        """
        users = self.get_project(name=project).users
        return users.create(name)

    def list_users(self, project=None):
        """
        Get the user collection of a project.

        :param project: project name, if not provided, will be the default project
        :return: collection of User objects
        """
        return self.get_project(name=project).users

    def exist_user(self, name, project=None):
        """
        Check whether a user exists in a project.

        :param name: user name
        :param project: project name, if not provided, will be the default project
        :return: result of the membership test against the project's users
        """
        users = self.get_project(name=project).users
        return name in users

    def delete_user(self, name, project=None):
        """
        Remove a user from a project.

        :param name: user name
        :param project: project name, if not provided, will be the default project
        :return: None
        """
        users = self.get_project(name=project).users
        users.delete(name)

    def list_user_roles(self, name, project=None):
        """
        Get the roles held by the given user.

        :param name: user name
        :param project: project name, if not provided, will be the default project
        :return: collection of Role object
        """
        user = self.get_project(name=project).users[name]
        return user.roles

    def get_security_options(self, project=None):
        """
        Get the security options object of a project.

        :param project: project name, if not provided, will be the default project
        :return: SecurityConfiguration object
        """
        return self.get_project(name=project).security_options

    def get_security_option(self, option_name, project=None):
        """
        Read a single security option of a project.

        :param option_name: name of the security option. Please refer to ODPS options for more details.
            The name is converted with ``utils.camel_to_underline`` before lookup.
        :param project: project name, if not provided, will be the default project
        :return: option value
        :raise ValueError: if the security options object has no such attribute
        """
        attr_name = utils.camel_to_underline(option_name)
        security_config = self.get_security_options(project=project)
        if hasattr(security_config, attr_name):
            return getattr(security_config, attr_name)
        raise ValueError("Option does not exists.")

    def set_security_option(self, option_name, value, project=None):
        """
        Change a single security option of a project and submit the update.

        :param option_name: name of the security option. Please refer to ODPS options for more details.
            The name is converted with ``utils.camel_to_underline`` before lookup.
        :param value: value of security option to be set.
        :param project: project name, if not provided, will be the default project.
        :raise ValueError: if the security options object has no such attribute
        """
        attr_name = utils.camel_to_underline(option_name)
        security_config = self.get_security_options(project=project)
        option_known = hasattr(security_config, attr_name)
        if not option_known:
            raise ValueError("Option does not exists.")
        setattr(security_config, attr_name, value)
        # send the modified configuration back
        security_config.update()

    def run_security_query(
        self, query, project=None, schema=None, token=None, hints=None, output_json=True
    ):
        """
        Run a security query to grant / revoke / query privileges. If the query is `install package`
        or `uninstall package`, return a waitable AuthQueryInstance object, otherwise returns
        the result string or json value.

        :param str query: query text
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param token: passed through to the project's ``run_security_query``
        :param hints: passed through to the project's ``run_security_query``
        :param bool output_json: parse json for the output
        :return: result string / json object
        """
        target_project = self.get_project(name=project)
        effective_schema = schema if schema else self.schema
        query_options = dict(
            schema=effective_schema, token=token, hints=hints, output_json=output_json
        )
        return target_project.run_security_query(query, **query_options)

    def execute_security_query(
        self, query, project=None, schema=None, token=None, hints=None, output_json=True
    ):
        """
        Execute a security query to grant / revoke / query privileges and returns
        the result string or json value.

        The query is submitted through :meth:`run_security_query`; when that
        yields a waitable AuthQueryInstance, this method waits for it to succeed.

        :param str query: query text
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param token: passed through to :meth:`run_security_query`
        :param hints: passed through to :meth:`run_security_query`
        :param bool output_json: parse json for the output
        :return: result string / json object
        """
        from .models import Project

        outcome = self.run_security_query(
            query,
            project=project,
            schema=schema,
            token=token,
            hints=hints,
            output_json=output_json,
        )
        if isinstance(outcome, Project.AuthQueryInstance):
            return outcome.wait_for_success()
        return outcome

    @classmethod
    def _build_account(cls, access_id, secret_access_key):
        """Create an ``accounts.AliyunAccount`` from an access id / secret key pair."""
        account = accounts.AliyunAccount(access_id, secret_access_key)
        return account

    def to_global(self, overwritable=False):
        """
        Copy the settings of this entry object into the global ``options``.

        The account, default project, default schema, endpoint, logview host,
        tunnel endpoint, app account, region name and default namespace are
        written.

        :param overwritable: value stored in
            ``options.is_global_account_overwritable``
        """
        options.is_global_account_overwritable = overwritable
        options.account = self.account
        options.default_project = self.project
        # use _schema to avoid requesting for tenant options
        options.default_schema = self._schema
        options.endpoint = self.endpoint
        options.logview_host = self.logview_host
        options.tunnel.endpoint = self._tunnel_endpoint
        options.app_account = self.app_account
        options.region_name = self.region_name
        options.default_namespace = self.namespace

    @classmethod
    def from_global(cls):
        """
        Build an entry object from the global ``options``.

        :return: object created via ``cls._from_account``, or None when the
            global account or default project is not set
        """
        if options.account is None or options.default_project is None:
            return None
        return cls._from_account(
            options.account,
            options.default_project,
            endpoint=options.endpoint,
            schema=options.default_schema,
            tunnel_endpoint=options.tunnel.endpoint,
            logview_host=options.logview_host,
            app_account=options.app_account,
            region_name=options.region_name,
            namespace=options.default_namespace,
        )

    @classmethod
    def from_environments(cls):
        """
        Build an entry object from environment variables.

        Reads ``ODPS_PROJECT_NAME``, ``ODPS_ENDPOINT``, ``ODPS_TUNNEL_ENDPOINT``
        and ``ODPS_NAMESPACE``; the account comes from
        ``accounts.from_environments()``.

        :return: the created object, or None when a ``KeyError`` is raised
            while collecting the settings (e.g. ``ODPS_ENDPOINT`` is not set)
        """
        try:
            env_settings = dict(
                project=os.getenv("ODPS_PROJECT_NAME"),
                # the endpoint is mandatory, a missing variable raises KeyError
                endpoint=os.environ["ODPS_ENDPOINT"],
                tunnel_endpoint=os.getenv("ODPS_TUNNEL_ENDPOINT"),
                namespace=os.getenv("ODPS_NAMESPACE"),
            )
            return cls(
                None, None, account=accounts.from_environments(), **env_settings
            )
        except KeyError:
            return None

    # Session-related functions defined on models.SessionMethods, exposed as
    # methods of this class. _wrap_model_func wraps each function and keeps
    # its signature.
    _attach_mcqa_session = _wrap_model_func(models.SessionMethods._attach_mcqa_session)
    attach_session = _wrap_model_func(models.SessionMethods.attach_session)
    _create_mcqa_session = _wrap_model_func(models.SessionMethods._create_mcqa_session)
    create_session = _wrap_model_func(models.SessionMethods.create_session)
    default_session = _wrap_model_func(models.SessionMethods.default_session)
    _get_default_mcqa_session = _wrap_model_func(
        models.SessionMethods._get_default_mcqa_session
    )
    run_sql_interactive_with_fallback = _wrap_model_func(
        models.SessionMethods.run_sql_interactive_with_fallback
    )
    run_sql_interactive = _wrap_model_func(models.SessionMethods.run_sql_interactive)
    execute_sql_interactive = _wrap_model_func(
        models.SessionMethods.execute_sql_interactive
    )

    # Functions defined on models.MergeTask, exposed the same way.
    run_merge_files = _wrap_model_func(models.MergeTask.run_merge_files)
    execute_merge_files = _wrap_model_func(models.MergeTask.execute_merge_files)
    run_archive_table = _wrap_model_func(models.MergeTask.run_archive_table)
    execute_archive_table = _wrap_model_func(models.MergeTask.execute_archive_table)
    run_freeze_command = _wrap_model_func(models.MergeTask.run_freeze_command)
    execute_freeze_command = _wrap_model_func(models.MergeTask.execute_freeze_command)


def _get_odps_from_model(self):
    """
    Walk up the ``parent`` chain of a model until a ``models.Project`` (or the
    end of the chain) is reached and return that project's ``odps`` attribute,
    or None when no project is found.
    """
    node = self
    while not (node is None or isinstance(node, models.Project)):
        node = node.parent
    if node:
        return node.odps
    return None


# expose the lookup as a read-only ``odps`` property on every RestModel, then
# drop the helper name from the module namespace
models.RestModel.odps = property(_get_odps_from_model)
del _get_odps_from_model

# Optional internal extensions: pull in everything exported by
# .internal.core when that package can be imported.
try:
    from .internal.core import *  # noqa: F401
except ImportError:  # pragma: no cover
    pass

# Optional Mars integration: when mars_extension can be imported, attach its
# helper functions to the ODPS class under the same names.
try:
    from . import mars_extension

    for _mars_attr in (
        "create_mars_cluster",
        "persist_mars_dataframe",
        "to_mars_dataframe",
        "run_script_in_mars",
        "run_mars_job",
        "list_mars_instances",
        "sql_to_mars_dataframe",
    ):
        setattr(ODPS, _mars_attr, getattr(mars_extension, _mars_attr))
except ImportError:
    pass


# Let environment variables override the default endpoint: ODPS_ENDPOINT is
# consulted first, then PYODPS_ENDPOINT, then the value defined earlier in
# this module.
DEFAULT_ENDPOINT = os.getenv(
    "ODPS_ENDPOINT", os.getenv("PYODPS_ENDPOINT", DEFAULT_ENDPOINT)
)
# remove the wrapping helper from the module namespace
del _wrap_model_func
