#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import base64
import functools
import json
import logging
import sys
import threading
import time
import warnings
from collections import OrderedDict
from datetime import datetime

import requests

from .. import compat, errors, options, readers, serializers, utils
from ..accounts import BearerTokenAccount
from ..compat import Enum, six
from ..lib.monotonic import monotonic
from ..lib.tblib import pickling_support
from ..utils import to_str
from .core import JSONRemoteModel, LazyLoad, XMLRemoteModel
from .job import Job
from .readers import TunnelArrowReader, TunnelRecordReader
from .tasks import SQLTask
from .worker import LOG_TYPES_MAPPING, WorkerDetail2

logger = logging.getLogger(__name__)
pickling_support.install()


_RESULT_LIMIT_HELPER_MSG = (
    "See https://pyodps.readthedocs.io/zh_CN/latest/base-sql.html#read-sql-exec-result "
    "for more information about limits on instance results."
)
_STATUS_QUERY_TIMEOUT = 5 * 60  # timeout when getting status


def _with_status_api_lock(func):
    @six.wraps(func)
    def wrapped(self, *args, **kw):
        with self._status_api_lock:
            return func(self, *args, **kw)

    return wrapped


class SpawnedInstanceReaderMixin(object):
    @property
    def schema(self):
        return self._schema

    @staticmethod
    def _read_instance_split(
        conn,
        download_id,
        start,
        count,
        idx,
        rest_client=None,
        project=None,
        instance_id=None,
        tunnel_endpoint=None,
        columns=None,
        arrow=False,
    ):
        # read part data
        from ..tunnel import InstanceTunnel

        try:
            instance_tunnel = InstanceTunnel(
                client=rest_client, project=project, endpoint=tunnel_endpoint
            )
            session = utils.call_with_retry(
                instance_tunnel.create_download_session,
                instance=instance_id,
                download_id=download_id,
            )

            def _data_to_pandas():
                if not arrow:
                    with session.open_record_reader(
                        start, count, columns=columns
                    ) as reader:
                        return reader.to_pandas()
                else:
                    with session.open_arrow_reader(
                        start, count, columns=columns
                    ) as reader:
                        return reader.to_pandas()

            data = utils.call_with_retry(_data_to_pandas)
            conn.send((idx, data, True))
        except:
            try:
                conn.send((idx, sys.exc_info(), False))
            except:
                logger.exception("Failed to write in process %d", idx)
                raise

    def _get_process_split_reader(self, columns=None, append_partitions=None):  # noqa
        rest_client = self._parent._client
        project = self._parent.project.name
        tunnel_endpoint = self._parent.project._tunnel_endpoint
        instance_id = self._parent.id

        return functools.partial(
            self._read_instance_split,
            rest_client=rest_client,
            project=project,
            instance_id=instance_id,
            arrow=isinstance(self, TunnelArrowReader),
            tunnel_endpoint=tunnel_endpoint,
            columns=columns or self._column_names,
        )


class InstanceRecordReader(SpawnedInstanceReaderMixin, TunnelRecordReader):
    def __init__(self, instance, download_session, columns=None):
        super(InstanceRecordReader, self).__init__(
            instance, download_session, columns=columns
        )
        self._schema = download_session.schema


class InstanceArrowReader(SpawnedInstanceReaderMixin, TunnelArrowReader):
    def __init__(self, instance, download_session, columns=None):
        super(InstanceArrowReader, self).__init__(
            instance, download_session, columns=columns
        )
        self._schema = download_session.schema


class Instance(LazyLoad):
    """
    An Instance represents a run of an ODPS job: when a task is submitted, it executes as an instance.

    ``status`` reflects the current state of an instance.
    The ``is_terminated`` method indicates whether the instance has finished.
    The ``is_successful`` method indicates whether the instance ran successfully.
    The ``wait_for_success`` method blocks the calling process until the instance has finished.

    For a SQL instance, ``open_reader`` can be used to read the results.

    :Example:

    >>> instance = odps.execute_sql('select * from dual')  # this SQL returns structured data
    >>> with instance.open_reader() as reader:
    >>>     # handle the record
    >>>
    >>> instance = odps.execute_sql('desc dual')  # this SQL does not return structured data
    >>> with instance.open_reader() as reader:
    >>>    print(reader.raw)  # just return the raw result
    """

    __slots__ = (
        "_task_results",
        "_is_sync",
        "_id_thread_local",
        "_status_api_lock",
        "_logview_address",
        "_logview_address_time",
        "_last_progress_value",
        "_last_progress_time",
        "_logview_logged",
        "_job_source",
    )

    _download_id = utils.thread_local_attribute("_id_thread_local", lambda: None)

    def __init__(self, **kwargs):
        if "task_results" in kwargs:
            kwargs["_task_results"] = kwargs.pop("task_results")
        super(Instance, self).__init__(**kwargs)

        try:
            del self._id_thread_local
        except AttributeError:
            pass

        if self._task_results is not None and len(self._task_results) > 0:
            self._is_sync = True
            self._status = Instance.Status.TERMINATED
        else:
            self._is_sync = False

        self._status_api_lock = threading.RLock()

        self._logview_address = None
        self._logview_address_time = None
        self._last_progress_value = None
        self._last_progress_time = None
        self._logview_logged = False
        self._job_source = None

    @property
    def id(self):
        return self.name

    class Status(Enum):
        RUNNING = "Running"
        SUSPENDED = "Suspended"
        TERMINATED = "Terminated"

    class InstanceStatus(XMLRemoteModel):
        _root = "Instance"

        status = serializers.XMLNodeField("Status")

    class InstanceResult(XMLRemoteModel):
        class TaskResult(XMLRemoteModel):
            class Result(XMLRemoteModel):
                transform = serializers.XMLNodeAttributeField(attr="Transform")
                format = serializers.XMLNodeAttributeField(attr="Format")
                text = serializers.XMLNodeField(".", default="")

                def __str__(self):
                    if six.PY2:
                        text = utils.to_binary(self.text)
                    else:
                        text = self.text
                    if self.transform is not None and self.transform == "Base64":
                        try:
                            return utils.to_str(base64.b64decode(text))
                        except TypeError:
                            return text
                    return text

                def __bytes__(self):
                    text = utils.to_binary(self.text)
                    if self.transform is not None and self.transform == "Base64":
                        try:
                            return utils.to_binary(base64.b64decode(text))
                        except TypeError:
                            return text
                    return text

            type = serializers.XMLNodeAttributeField(attr="Type")
            name = serializers.XMLNodeField("Name")
            result = serializers.XMLNodeReferenceField(Result, "Result")

        task_results = serializers.XMLNodesReferencesField(TaskResult, "Tasks", "Task")

    class Task(XMLRemoteModel):
        """
        Task stands for each task inside an instance.

        It has a name, a task type, start and end times, and a running status.
        """

        name = serializers.XMLNodeField("Name")
        type = serializers.XMLNodeAttributeField(attr="Type")
        start_time = serializers.XMLNodeField(
            "StartTime", parse_callback=utils.parse_rfc822
        )
        end_time = serializers.XMLNodeField(
            "EndTime", parse_callback=utils.parse_rfc822
        )
        status = serializers.XMLNodeField(
            "Status", parse_callback=lambda s: Instance.Task.TaskStatus(s.upper())
        )
        histories = serializers.XMLNodesReferencesField(
            "Instance.Task", "Histories", "History"
        )

        class TaskStatus(Enum):
            WAITING = "WAITING"
            RUNNING = "RUNNING"
            SUCCESS = "SUCCESS"
            FAILED = "FAILED"
            SUSPENDED = "SUSPENDED"
            CANCELLED = "CANCELLED"

        class TaskProgress(XMLRemoteModel):
            """
            TaskProgress represents the progress of a task.

            A single TaskProgress may consist of several stages.

            :Example:

            >>> progress = instance.get_task_progress('task_name')
            >>> progress.get_stage_progress_formatted_string()
            2015-11-19 16:39:07 M1_Stg1_job0:0/0/1[0%]	R2_1_Stg1_job0:0/0/1[0%]
            """

            class StageProgress(XMLRemoteModel):
                name = serializers.XMLNodeAttributeField(attr="ID")
                backup_workers = serializers.XMLNodeField(
                    "BackupWorkers", parse_callback=int
                )
                terminated_workers = serializers.XMLNodeField(
                    "TerminatedWorkers", parse_callback=int
                )
                running_workers = serializers.XMLNodeField(
                    "RunningWorkers", parse_callback=int
                )
                total_workers = serializers.XMLNodeField(
                    "TotalWorkers", parse_callback=int
                )
                input_records = serializers.XMLNodeField(
                    "InputRecords", parse_callback=int
                )
                output_records = serializers.XMLNodeField(
                    "OutputRecords", parse_callback=int
                )
                finished_percentage = serializers.XMLNodeField(
                    "FinishedPercentage", parse_callback=int
                )

            stages = serializers.XMLNodesReferencesField(StageProgress, "Stage")

            def get_stage_progress_formatted_string(self):
                buf = six.StringIO()

                buf.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                buf.write(" ")

                for stage in self.stages:
                    buf.write(
                        "{0}:{1}/{2}/{3}{4}[{5}%]\t".format(
                            stage.name,
                            stage.running_workers,
                            stage.terminated_workers,
                            stage.total_workers,
                            "(+%s backups)" % stage.backup_workers
                            if stage.backup_workers > 0
                            else "",
                            stage.finished_percentage,
                        )
                    )

                return buf.getvalue()

    class TaskInfo(serializers.XMLSerializableModel):
        _root = "Instance"
        __slots__ = "key", "value"

        key = serializers.XMLNodeField("Key")
        value = serializers.XMLNodeField("Value")

    class TaskCost(object):
        __slots__ = "cpu_cost", "memory_cost", "input_size"

        def __init__(self, cpu_cost=None, memory_cost=None, input_size=None):
            self.cpu_cost = cpu_cost
            self.memory_cost = memory_cost
            self.input_size = input_size

    class SQLCost(object):
        __slots__ = "udf_num", "complexity", "input_size"

        def __init__(self, udf_num=None, complexity=None, input_size=None):
            self.udf_num = udf_num
            self.complexity = complexity
            self.input_size = input_size

    class DownloadSessionCreationError(errors.InternalServerError):
        pass

    class TaskSummary(dict):
        def __init__(self, *args, **kwargs):
            super(Instance.TaskSummary, self).__init__(*args, **kwargs)
            self.summary_text, self.json_summary = None, None

    class AnonymousSubmitInstance(XMLRemoteModel):
        _root = "Instance"
        job = serializers.XMLNodeReferenceField(Job, "Job")

    class InstanceQueueingInfo(JSONRemoteModel):
        __slots__ = ("_instance",)

        class Status(Enum):
            RUNNING = "Running"
            SUSPENDED = "Suspended"
            TERMINATED = "Terminated"
            UNKNOWN = "Unknown"

        _properties = serializers.JSONRawField()  # hold the raw dict
        instance_id = serializers.JSONNodeField("instanceId")
        priority = serializers.JSONNodeField("instancePriority")
        progress = serializers.JSONNodeField("instanceProcess")
        job_name = serializers.JSONNodeField("jobName")
        project = serializers.JSONNodeField("projectName")
        skynet_id = serializers.JSONNodeField("skynetId")
        start_time = serializers.JSONNodeField(
            "startTime", parse_callback=utils.strptime_with_tz
        )
        task_type = serializers.JSONNodeField("taskType")
        task_name = serializers.JSONNodeField("taskName")
        user_account = serializers.JSONNodeField("userAccount")
        status = serializers.JSONNodeField("status", parse_callback=Status)

        @property
        def instance(self):
            if hasattr(self, "_instance") and self._instance:
                return self._instance

            from .projects import Projects

            self._instance = Projects(client=self._client)[self.project].instances[
                self.instance_id
            ]
            return self._instance

        def __getattr__(self, item):
            item = utils.underline_to_camel(item)

            if item in self._properties:
                return self._properties[item]

            return super(Instance.InstanceQueueingInfo, self).__getattr__(item)

    name = serializers.XMLNodeField("Name")
    owner = serializers.XMLNodeField("Owner")
    start_time = serializers.XMLNodeField(
        "StartTime", parse_callback=utils.parse_rfc822
    )
    end_time = serializers.XMLNodeField("EndTime", parse_callback=utils.parse_rfc822)
    _status = serializers.XMLNodeField(
        "Status", parse_callback=lambda s: Instance.Status(s)
    )
    _tasks = serializers.XMLNodesReferencesField(Task, "Tasks", "Task")

    def reload(self, blocking=False):
        actions = []
        if blocking:
            actions.append("instancestatus")
        resp = self._client.get(self.resource(), actions=actions)

        self.owner = resp.headers.get("x-odps-owner")
        self.start_time = utils.parse_rfc822(resp.headers.get("x-odps-start-time"))
        end_time_header = "x-odps-end-time"
        if (
            end_time_header in resp.headers
            and len(resp.headers[end_time_header].strip()) > 0
        ):
            self.end_time = utils.parse_rfc822(resp.headers.get(end_time_header))

        self.parse(self._client, resp, obj=self)
        # remember not to set `_loaded = True`

    def stop(self):
        """
        Stop this instance.

        :return: None
        """

        instance_status = Instance.InstanceStatus(status="Terminated")
        xml_content = instance_status.serialize()

        headers = {"Content-Type": "application/xml"}
        self._client.put(self.resource(), xml_content, headers=headers)

    @staticmethod
    def _call_with_retry(func, retry=False, retry_timeout=None):
        retry_kw = {
            "retry_times": options.retry_times if retry else 0,
            "exc_type": (errors.InternalServerError, errors.RequestTimeTooSkewed),
        }
        if retry and retry_timeout is not None:
            # use retry timeout instead of retry count
            retry_kw.update({"retry_times": None, "retry_timeout": retry_timeout})
        return utils.call_with_retry(func, **retry_kw)

    @_with_status_api_lock
    def get_task_results_without_format(self, timeout=None, retry=True):
        if self._is_sync:
            return self._task_results

        def _get_resp():
            return self._client.get(self.resource(), action="result", timeout=timeout)

        resp = self._call_with_retry(_get_resp, retry=retry, retry_timeout=timeout)
        instance_result = Instance.InstanceResult.parse(self._client, resp)
        return OrderedDict([(r.name, r.result) for r in instance_result.task_results])

    @_with_status_api_lock
    def get_task_results(self, timeout=None, retry=True):
        """
        Get all the task results.

        :return: a dict whose keys are task names and values are task results as strings
        :rtype: dict
        """

        results = self.get_task_results_without_format(timeout=timeout, retry=retry)
        if options.tunnel.string_as_binary:
            return OrderedDict(
                [(k, bytes(result)) for k, result in six.iteritems(results)]
            )
        else:
            return OrderedDict(
                [(k, str(result)) for k, result in six.iteritems(results)]
            )

    def _get_default_task_name(self):
        job = self._get_job()
        if len(job.tasks) != 1:
            msg = "No tasks" if len(job.tasks) == 0 else "Multiple tasks"
            raise errors.ODPSError("%s in instance." % msg)
        return job.tasks[0].name

    @_with_status_api_lock
    def get_task_result(self, task_name=None, timeout=None, retry=True):
        """
        Get a single task result.

        :param task_name: task name
        :return: task result
        :rtype: str
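
        A minimal usage sketch, assuming ``odps`` is an initialized ODPS entry object:

        :Example:

        >>> instance = odps.execute_sql('select count(*) from dual')
        >>> result = instance.get_task_result(instance.get_task_names()[0])
        >>> print(result)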
        """
        task_name = task_name or self._get_default_task_name()
        return self.get_task_results(timeout=timeout, retry=retry).get(task_name)

    @_with_status_api_lock
    def get_task_summary(self, task_name=None):
        """
        Get a task's summary, mostly used for MapReduce.

        :param task_name: task name
        :return: summary as a dict parsed from JSON
        :rtype: dict
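
        A usage sketch; the summary is only available for certain task types
        and ``None`` may be returned otherwise:

        :Example:

        >>> summary = instance.get_task_summary(instance.get_task_names()[0])
        >>> if summary is not None:
        >>>     print(summary.summary_text)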
        """
        task_name = task_name or self._get_default_task_name()
        params = {"taskname": task_name}
        resp = self._client.get(
            self.resource(), action="instancesummary", params=params
        )

        map_reduce = resp.json().get("Instance")
        if map_reduce:
            json_summary = map_reduce.get("JsonSummary")
            if json_summary:
                summary = Instance.TaskSummary(json.loads(json_summary))
                summary.summary_text = map_reduce.get("Summary")
                summary.json_summary = json_summary

                return summary

    @_with_status_api_lock
    def get_task_statuses(self, retry=True, timeout=None):
        """
        Get all tasks' statuses

        :return: a dict whose keys are task names and values are :class:`odps.models.Instance.Task` objects
        :rtype: dict
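
        A usage sketch iterating over the returned mapping:

        :Example:

        >>> for name, task in instance.get_task_statuses().items():
        >>>     print(name, task.status)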
        """

        def _get_resp():
            return self._client.get(self.resource(), action="taskstatus")

        resp = self._call_with_retry(_get_resp, retry=retry, retry_timeout=timeout)
        self.parse(self._client, resp, obj=self)
        return dict([(task.name, task) for task in self._tasks])

    @_with_status_api_lock
    def get_task_names(self, retry=True, timeout=None):
        """
        Get names of all tasks

        :return: task names
        :rtype: list
        """

        return compat.lkeys(self.get_task_statuses(retry=retry, timeout=timeout))

    def get_task_cost(self, task_name=None):
        """
        Get task cost

        :param task_name: name of the task
        :return: task cost
        :rtype: Instance.TaskCost

        :Example:

        >>> cost = instance.get_task_cost(instance.get_task_names()[0])
        >>> cost.cpu_cost
        200
        >>> cost.memory_cost
        4096
        >>> cost.input_size
        0
        """
        task_name = task_name or self._get_default_task_name()
        summary = self.get_task_summary(task_name)
        if summary is None:
            return None

        if "Cost" in summary:
            task_cost = summary["Cost"]

            cpu_cost = task_cost.get("CPU")
            memory = task_cost.get("Memory")
            input_size = task_cost.get("Input")

            return Instance.TaskCost(cpu_cost, memory, input_size)

    def _raise_empty_task_info(self, resp):
        raise errors.EmptyTaskInfoError(
            "Empty response. Task server maybe dead.",
            code=resp.status_code,
            instance_id=self.id,
            endpoint=self._client.endpoint,
            request_id=resp.headers.get("x-odps-request-id"),
            tag="ODPS",
        )

    def get_task_info(self, task_name, key, raise_empty=False):
        """
        Get task related information.

        :param task_name: name of the task
        :param key: key of the information item
        :param raise_empty: if True, an error is raised when the response is empty
        :return: a string of the task information
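
        A usage sketch; ``some_key`` is a hypothetical key here, the keys actually
        available depend on the task type:

        :Example:

        >>> task_name = instance.get_task_names()[0]
        >>> info_value = instance.get_task_info(task_name, 'some_key')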
        """
        actions = ["info"]
        params = OrderedDict([("taskname", task_name), ("key", key)])

        resp = self._client.get(self.resource(), actions=actions, params=params)
        resp_data = resp.content.decode()
        if raise_empty and not resp_data:
            self._raise_empty_task_info(resp)
        return resp_data

    def put_task_info(
        self, task_name, key, value, check_location=False, raise_empty=False
    ):
        """
        Put information into a task.

        :param task_name: name of the task
        :param key: key of the information item
        :param value: value of the information item
        :param check_location: if True, an error is raised when the Location header is missing
        :param raise_empty: if True, an error is raised when the response is empty
        """
        actions = ["info"]
        params = {"taskname": task_name}
        headers = {"Content-Type": "application/xml"}
        body = self.TaskInfo(key=key, value=value).serialize()

        resp = self._client.put(
            self.resource(), actions=actions, params=params, headers=headers, data=body
        )

        location = resp.headers.get("Location")
        if check_location and (location is None or len(location) == 0):
            raise errors.ODPSError("Invalid response, Location header required.")
        resp_data = resp.content.decode()
        if raise_empty and not resp_data:
            self._raise_empty_task_info(resp)
        return resp_data

    def get_task_quota(self, task_name=None):
        """
        Get queueing info of the task.
        Note that the interval between two calls should be larger than 30 seconds; otherwise an empty dict is returned.

        :param task_name: name of the task
        :return: quota info in dict format
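
        A usage sketch, assuming ``instance`` refers to a running instance:

        :Example:

        >>> quota_info = instance.get_task_quota(instance.get_task_names()[0])
        >>> print(quota_info)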
        """
        task_name = task_name or self._get_default_task_name()
        actions = ["instancequota"]
        params = {"taskname": task_name}
        resp = self._client.get(self.resource(), actions=actions, params=params)
        return json.loads(resp.text)

    def get_sql_task_cost(self):
        """
        Get cost information of the SQL cost task, including input data size,
        number of UDFs and complexity of the SQL task.

        NOTE: do not call this method directly, as it cannot be applied to
        instances returned from ordinary SQL execution. Use ``o.execute_sql_cost`` instead.

        :return: cost info in dict format
        """
        resp = self.get_task_result(self.get_task_names()[0])
        cost = json.loads(resp)
        sql_cost = cost["Cost"]["SQL"]

        udf_num = sql_cost.get("UDF")
        complexity = sql_cost.get("Complexity")
        input_size = sql_cost.get("Input")
        return Instance.SQLCost(udf_num, complexity, input_size)

    def _get_status(self, blocking=False):
        if self._status != Instance.Status.TERMINATED:
            self.reload(blocking)
        return self._status

    @property
    @_with_status_api_lock
    def status(self):
        return self._get_status()

    def is_terminated(self, retry=True, blocking=False, retry_timeout=None):
        """
        Check whether this instance has finished.

        :return: True if finished else False
        :rtype: bool
        """
        return self._call_with_retry(
            lambda: self._get_status(blocking) == Instance.Status.TERMINATED,
            retry=retry,
            retry_timeout=retry_timeout,
        )

    def is_running(self, retry=True, blocking=False, retry_timeout=None):
        """
        Check whether this instance is still running.

        :return: True if still running else False
        :rtype: bool
        """
        return self._call_with_retry(
            lambda: self._get_status(blocking) == Instance.Status.RUNNING,
            retry=retry,
            retry_timeout=retry_timeout,
        )

    def is_successful(self, retry=True, retry_timeout=None):
        """
        Check whether the instance ran successfully.

        :return: True if successful else False
        :rtype: bool
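
        A usage sketch combining termination and success checks:

        :Example:

        >>> if instance.is_terminated() and not instance.is_successful():
        >>>     print('instance %s failed' % instance.id)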
        """

        if not self.is_terminated(retry=retry):
            return False

        def _get_successful():
            statuses = self.get_task_statuses()
            return all(
                task.status == Instance.Task.TaskStatus.SUCCESS
                for task in statuses.values()
            )

        return self._call_with_retry(
            _get_successful, retry=retry, retry_timeout=retry_timeout
        )

    @property
    def is_sync(self):
        return self._is_sync

    def get_all_task_progresses(self):
        return {
            task_name: self.get_task_progress(task_name)
            for task_name in self.get_task_names()
        }

    def _dump_instance_progress(self, start_time, check_time, final=False):
        if logger.getEffectiveLevel() > logging.INFO:
            return

        prog_time_interval = options.progress_time_interval
        prog_percentage_gap = options.progress_percentage_gap
        logview_latency = min(options.logview_latency, prog_time_interval)
        try:
            task_progresses = self.get_all_task_progresses()
            total_progress = sum(
                stage.finished_percentage
                for progress in task_progresses.values()
                for stage in progress.stages
            )

            if not self._logview_logged and check_time - start_time >= logview_latency:
                self._logview_logged = True
                logger.info(
                    "Instance ID: %s\n  Log view: %s",
                    self.id,
                    self.get_logview_address(),
                )

            # the final log needs to be output once the progress is updated and the
            #  logview address has been printed
            need_final_log = (
                final
                and self._logview_logged
                and self._last_progress_value < total_progress
            )
            # intermediate logs need to be output once the current progress exceeds a
            #  certain gap or a certain time has elapsed
            need_intermediate_log = check_time - start_time >= prog_time_interval and (
                total_progress - self._last_progress_value >= prog_percentage_gap
                or check_time - self._last_progress_time >= prog_time_interval
            )
            if need_final_log or need_intermediate_log:
                output_parts = [str(self.id)] + [
                    progress.get_stage_progress_formatted_string()
                    for progress in task_progresses.values()
                ]
                if len(output_parts) > 1:
                    logger.info(" ".join(output_parts))
                self._last_progress_value = total_progress
                self._last_progress_time = check_time
        except:  # pragma: no cover
            # make sure progress display does not affect execution
            pass

    def wait_for_completion(
        self, interval=1, timeout=None, max_interval=None, blocking=True
    ):
        """
        Wait for the instance to complete, regardless of its result.

        :param interval: time interval, in seconds, between checks
        :param max_interval: if specified, the check interval will be doubled
            on each attempt until max_interval is reached.
        :param timeout: maximum time, in seconds, to wait before a
            :class:`odps.errors.WaitTimeoutError` is raised
        :param blocking: whether to block waiting at server side. Note that this option does
            not affect client behavior.
        :return: None
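
        A usage sketch, assuming ``odps`` is an initialized ODPS entry object;
        unlike :meth:`wait_for_success`, no error is raised when tasks fail:

        :Example:

        >>> instance = odps.run_sql('select * from dual')  # submitted asynchronously
        >>> instance.wait_for_completion(interval=2, max_interval=30)
        >>> print(instance.is_successful())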
        """

        start_time = check_time = self._last_progress_time = monotonic()
        self._last_progress_value = 0
        while not self.is_terminated(
            retry=True, blocking=blocking, retry_timeout=_STATUS_QUERY_TIMEOUT
        ):
            try:
                sleep_interval_left = interval - (monotonic() - check_time)
                if sleep_interval_left > 0:
                    time.sleep(sleep_interval_left)
                check_time = monotonic()
                if max_interval is not None:
                    interval = min(interval * 2, max_interval)
                if timeout is not None and check_time - start_time > timeout:
                    raise errors.WaitTimeoutError(
                        "Wait completion of instance %s timed out" % self.id,
                        instance_id=self.id,
                    )

                self._dump_instance_progress(start_time, check_time)
            except KeyboardInterrupt:
                break
        # dump final progress
        self._dump_instance_progress(start_time, check_time, final=True)

    def wait_for_success(
        self, interval=1, timeout=None, max_interval=None, blocking=True
    ):
        """
        Wait for the instance to complete, and check whether the instance succeeded.

        :param interval: time interval, in seconds, between checks
        :param max_interval: if specified, the check interval will be doubled
            on each attempt until max_interval is reached.
        :param timeout: maximum time, in seconds, to wait before a
            :class:`odps.errors.WaitTimeoutError` is raised
        :param blocking: whether to block waiting at server side. Note that this option does
            not affect client behavior.
        :return: None
        :raise: :class:`odps.errors.ODPSError` if the instance failed
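
        A usage sketch, assuming ``odps`` is an initialized ODPS entry object:

        :Example:

        >>> from odps import errors
        >>> instance = odps.run_sql('select * from dual')  # submitted asynchronously
        >>> try:
        >>>     instance.wait_for_success(interval=2, max_interval=30)
        >>> except errors.ODPSError as ex:
        >>>     print('instance failed: %s' % ex)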
        """

        self.wait_for_completion(
            interval=interval,
            max_interval=max_interval,
            timeout=timeout,
            blocking=blocking,
        )

        if not self.is_successful(retry=True):
            for task_name, task in six.iteritems(self.get_task_statuses()):
                exc = None
                if task.status == Instance.Task.TaskStatus.FAILED:
                    exc = errors.parse_instance_error(self.get_task_result(task_name))
                elif task.status != Instance.Task.TaskStatus.SUCCESS:
                    exc = errors.ODPSError(
                        "%s, status=%s" % (task_name, task.status.value)
                    )
                if exc:
                    exc.instance_id = self.id
                    raise exc

    @_with_status_api_lock
    def get_task_progress(self, task_name=None):
        """
        Get task's current progress

        :param task_name: task_name
        :return: the task's progress
        :rtype: :class:`odps.models.Instance.Task.TaskProgress`
        """
        task_name = task_name or self._get_default_task_name()
        params = {"instanceprogress": task_name, "taskname": task_name}

        resp = self._client.get(self.resource(), params=params)
        return Instance.Task.TaskProgress.parse(self._client, resp)

    @_with_status_api_lock
    def get_task_detail(self, task_name=None):
        """
        Get task's detail

        :param task_name: task name
        :return: the task's detail
        :rtype: list or dict according to the JSON
        """

        def _get_detail():
            from ..compat import json  # fix object_pairs_hook parameter for Py2.6

            params = {"taskname": task_name}
            resp = self._client.get(
                self.resource(), action="instancedetail", params=params
            )
            res = resp.content.decode() if six.PY3 else resp.content
            try:
                return json.loads(res, object_pairs_hook=OrderedDict)
            except ValueError:
                return res

        task_name = task_name or self._get_default_task_name()
        result = _get_detail()
        if not result:
            # todo: this is a workaround for the bug that get_task_detail returns nothing.
            return self.get_task_detail2(task_name)
        else:
            return result

    @_with_status_api_lock
    def get_task_detail2(self, task_name=None, **kw):
        """
        Get task's detail v2

        :param task_name: task name
        :return: the task's detail
        :rtype: list or dict according to the JSON
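
        A usage sketch; the returned object mirrors the service-side JSON:

        :Example:

        >>> detail = instance.get_task_detail2(instance.get_task_names()[0])
        >>> print(detail)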
        """
        task_name = task_name or self._get_default_task_name()
        params = {"taskname": task_name}
        if "subquery_id" in kw:
            params["subquery_id"] = str(kw.pop("subquery_id"))

        resp = self._client.get(self.resource(), action="detail", params=params)
        res = resp.content.decode() if six.PY3 else resp.content
        try:
            return json.loads(res, object_pairs_hook=OrderedDict)
        except ValueError:
            return res

    @_with_status_api_lock
    def get_task_workers(self, task_name=None, json_obj=None):
        """
        Get workers from the task.

        :param task_name: task name
        :param json_obj: json object parsed from get_task_detail2
        :return: list of workers

        .. seealso:: :class:`odps.models.Worker`
        """
        if json_obj is None:
            task_name = task_name or self._get_default_task_name()

        if json_obj is None:
            json_obj = self.get_task_detail2(task_name)
        return WorkerDetail2.extract_from_json(
            json_obj, client=self._client, parent=self
        )

    @_with_status_api_lock
    def get_worker_log(self, log_id, log_type, size=0):
        """
        Get logs from worker.

        :param log_id: id of log, can be retrieved from details.
        :param log_type: type of logs. Possible log types include {log_types}
        :param size: length of the log to retrieve
        :return: log content
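
        A usage sketch; ``log_id`` here is a placeholder that should be taken
        from the instance details, and ``stderr`` is assumed to be a supported log type:

        :Example:

        >>> content = instance.get_worker_log(log_id, 'stderr', size=1024)
        >>> print(content)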
        """
        params = OrderedDict([("log", ""), ("id", log_id)])
        if log_type is not None:
            log_type = log_type.lower()
            if log_type not in LOG_TYPES_MAPPING:
                raise ValueError(
                    "log_type should choose a value in "
                    + " ".join(six.iterkeys(LOG_TYPES_MAPPING))
                )
            params["logtype"] = LOG_TYPES_MAPPING[log_type]
        if size > 0:
            params["size"] = str(size)
        resp = self._client.get(self.resource(), params=params)
        return resp.text

    get_worker_log.__doc__ = get_worker_log.__doc__.format(
        log_types=", ".join(sorted(six.iterkeys(LOG_TYPES_MAPPING)))
    )

    @_with_status_api_lock
    def get_logview_address(self, hours=None, use_legacy=None):
        """
        Get the Logview address of the instance.

        :param hours: number of hours the generated Logview token remains valid
        :return: logview address
        :rtype: str
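
        A usage sketch; the returned address can be opened in a browser to inspect the job:

        :Example:

        >>> print(instance.get_logview_address(hours=24))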
        """
        if use_legacy is None:
            use_legacy = options.use_legacy_logview
        if use_legacy is None and self.project.odps.job_insight_host is not None:
            use_legacy = False
        if (
            self.project.odps.job_insight_host is None
            or self.project.odps.region_name is None
        ):
            use_legacy = True
        if use_legacy is False:
            return self._get_job_insight_address()
        return self._get_legacy_logview_address(hours=hours)

    def _get_job_insight_address(self):
        return (
            "%(job_insight_host)s/%(region_id)s/job-insights?h=%(endpoint)s"
            "&p=%(project_name)s&i=%(instance_id)s"
        ) % dict(
            job_insight_host=self.project.odps.job_insight_host,
            region_id=self.project.odps.region_name,
            endpoint=self._client.endpoint,
            project_name=self.project.name,
            instance_id=self.id,
        )

    def _get_legacy_logview_address(self, hours=None):
        if (
            self._logview_address is not None
            and monotonic() - self._logview_address_time < 600
        ):
            return self._logview_address

        project = self.project
        if isinstance(project.odps.account, BearerTokenAccount):
            token = to_str(project.odps.account.token)
        else:
            hours = hours or options.logview_hours
            policy = {
                "Statement": [
                    {
                        "Action": ["odps:Read"],
                        "Effect": "Allow",
                        "Resource": "acs:odps:*:projects/%s/instances/%s"
                        % (project.name, self.id),
                    }
                ],
                "Version": "1",
            }
            token = self.project.generate_auth_token(policy, "bearer", hours)

        link = (
            "%(logview_host)s/logview/?h=%(endpoint)s&p=%(project_name)s"
            "&i=%(instance_id)s&token=%(token)s"
        ) % dict(
            logview_host=self.project.odps.logview_host,
            endpoint=self._client.endpoint,
            project_name=project.name,
            instance_id=self.id,
            token=token,
        )
        self._logview_address = link
        self._logview_address_time = monotonic()
        return link

    def __str__(self):
        return self.id

    def _get_job(self):
        if not self._job_source:
            url = self.resource()
            resp = self._client.get(url, action="source")

            self._job_source = Job.parse(self._client, resp, parent=self)
        return self._job_source

    def get_tasks(self):
        return self.tasks

    @property
    def tasks(self):
        job = self._get_job()
        return job.tasks

    @property
    def priority(self):
        job = self._get_job()
        return job.priority

    def _get_queueing_info(self, **kw):
        params = {}
        if "subquery_id" in kw:
            params["subquery_id"] = str(kw.pop("subquery_id"))

        url = self.resource()
        resp = self._client.get(url, action="cached", params=params)
        return (
            Instance.InstanceQueueingInfo.parse(
                self._client, resp, parent=self.project.instance_queueing_infos
            ),
            resp,
        )

    def get_queueing_info(self):
        info, _ = self._get_queueing_info()
        return info

    def get_sql_query(self):
        task = [t for t in self.tasks if isinstance(t, SQLTask)]
        if not task:
            raise errors.ODPSError("Instance %s does not contain a SQLTask.", self.id)
        if len(task) > 1:  # pragma: no cover
            raise errors.ODPSError("Multiple SQLTasks exist in instance %s.", self.id)
        return task[0].query

    def get_unique_identifier_id(self):
        job = self._get_job()
        return job.unique_identifier_id

    def _check_get_task_name(self, task_type, task_name=None, err_head=None):
        if not self.is_successful(retry=True):
            raise errors.ODPSError(
                "%s, instance(%s) may fail or has not finished yet"
                % (err_head, self.id)
            )

        task_type = task_type.lower()
        filtered_tasks = {
            name: task
            for name, task in six.iteritems(self.get_task_statuses())
            if task.type.lower() == task_type
        }
        if len(filtered_tasks) > 1:
            if task_name is None:
                raise errors.ODPSError(
                    "%s, job has more than one %s tasks, please specify one"
                    % (err_head, task_type)
                )
            elif task_name not in filtered_tasks:
                raise errors.ODPSError(
                    "%s, unknown task name: %s" % (err_head, task_name)
                )
            return task_name
        elif len(filtered_tasks) == 1:
            return list(filtered_tasks)[0]
        else:
            raise errors.ODPSError("%s, job has no %s task" % (err_head, task_type))

    def _create_instance_tunnel(self, endpoint=None, quota_name=None):
        from ..tunnel import InstanceTunnel

        return InstanceTunnel(
            client=self._client,
            project=self.project,
            quota_name=quota_name,
            endpoint=endpoint or self.project._tunnel_endpoint,
        )

    @utils.survey
    def _open_result_reader(self, schema=None, task_name=None, timeout=None, **kw):
        task_name = self._check_get_task_name(
            "sql", task_name=task_name, err_head="Cannot open reader"
        )
        result = self.get_task_result(task_name, timeout=timeout)
        reader = readers.CsvRecordReader(schema, result, **kw)
        if options.result_reader_create_callback:
            options.result_reader_create_callback(reader)
        return reader

    def _open_tunnel_reader(self, **kw):
        from ..tunnel.instancetunnel import InstanceDownloadSession

        reopen = kw.pop("reopen", False)
        endpoint = kw.pop("endpoint", None)
        quota_name = kw.pop("quota_name", None)
        arrow = kw.pop("arrow", False)
        columns = kw.pop("columns", None)

        tunnel = self._create_instance_tunnel(endpoint=endpoint, quota_name=quota_name)
        download_id = self._download_id if not reopen else None

        try:
            download_session = tunnel.create_download_session(
                instance=self, download_id=download_id, **kw
            )
            if (
                download_id
                and download_session.status != InstanceDownloadSession.Status.Normal
            ):
                download_session = tunnel.create_download_session(instance=self, **kw)
        except errors.InternalServerError:
            e, tb = sys.exc_info()[1:]
            e.__class__ = Instance.DownloadSessionCreationError
            six.reraise(Instance.DownloadSessionCreationError, e, tb)

        self._download_id = download_session.id

        if arrow:
            return InstanceArrowReader(self, download_session, columns=columns)
        else:
            return InstanceRecordReader(self, download_session, columns=columns)

    def open_reader(self, *args, **kwargs):
        """
        Open the reader to read records from the result of the instance. If `tunnel` is `True`,
        the instance tunnel will be used. Otherwise the conventional routine will be used. If the
        instance tunnel is not available and `tunnel` is not specified, the method will fall back
        to the conventional routine.
        Note that the number of records returned is limited unless the instance tunnel is used and
        `limit=False` is specified (or `options.tunnel.limit_instance_tunnel` is disabled); otherwise
        the number of records returned is always limited.

        :param tunnel: if true, use instance tunnel to read from the instance.
                       if false, use conventional routine.
                       if absent, `options.tunnel.use_instance_tunnel` will be used and automatic fallback
                       is enabled.
        :param bool limit: if True, enable the limitation
        :param bool reopen: by default the reader reuses the last download session; if reopen is True, a new one is opened.
        :param endpoint: the tunnel service URL
        :param compress_option: compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :param compress_algo: compression algorithm, takes effect when ``compress_option`` is not provided,
                              can be ``zlib`` or ``snappy``
        :param compress_level: used for ``zlib``, takes effect when ``compress_option`` is not provided
        :param compress_strategy: used for ``zlib``, takes effect when ``compress_option`` is not provided
        :return: the reader; its ``count`` attribute is the full record count and ``status`` is the tunnel status

        :Example:

        >>> with instance.open_reader() as reader:
        >>>     count = reader.count  # total number of records in the result
        >>>     for record in reader[0: count]:
        >>>         # read all data; in practice, better to read in several smaller ranges
        """
        use_tunnel = kwargs.get("use_tunnel", kwargs.get("tunnel"))
        auto_fallback_result = use_tunnel is None

        timeout = kwargs.pop("timeout", None)
        if use_tunnel is None:
            use_tunnel = options.tunnel.use_instance_tunnel
            if use_tunnel:
                timeout = (
                    timeout
                    if timeout is not None
                    else options.tunnel.legacy_fallback_timeout
                )
        kwargs["timeout"] = timeout

        result_fallback_errors = (
            errors.InvalidProjectTable,
            errors.InvalidArgument,
            errors.NoSuchProject,
        )
        if use_tunnel:
            # for compatibility
            if "limit_enabled" in kwargs:
                kwargs["limit"] = kwargs["limit_enabled"]
                del kwargs["limit_enabled"]

            if "limit" not in kwargs:
                kwargs["limit"] = options.tunnel.limit_instance_tunnel

            auto_fallback_protection = False
            if kwargs["limit"] is None:
                kwargs["limit"] = False
                auto_fallback_protection = True

            try:
                return self._open_tunnel_reader(**kwargs)
            except result_fallback_errors:
                # service version too low to support instance tunnel.
                if not auto_fallback_result:
                    raise
                if not kwargs.get("limit"):
                    warnings.warn(
                        "Instance tunnel not supported, will fallback to "
                        "restricted approach. 10000 records will be limited. "
                        + _RESULT_LIMIT_HELPER_MSG
                    )
            except requests.Timeout:
                # tunnel creation timed out, which might be caused by too many files
                # on the service.
                if not auto_fallback_result:
                    raise
                if not kwargs.get("limit"):
                    warnings.warn(
                        "Instance tunnel timed out, will fallback to restricted approach. "
                        "10000 records will be limited. You may try merging small files "
                        "on your source table. " + _RESULT_LIMIT_HELPER_MSG
                    )
            except (
                Instance.DownloadSessionCreationError,
                errors.InstanceTypeNotSupported,
            ):
                # this is for DDL sql instances such as `show partitions` which raises
                # InternalServerError when creating download sessions.
                if not auto_fallback_result:
                    raise
            except errors.NoPermission as exc:
                # project is protected or data permission is configured
                if not auto_fallback_protection:
                    raise
                if not kwargs.get("limit"):
                    warnings.warn(
                        "Project or data under protection, 10000 records will be limited. "
                        "Raw error message:\n"
                        + str(exc)
                        + "\n"
                        + _RESULT_LIMIT_HELPER_MSG
                    )
                    kwargs["limit"] = True
                    return self.open_reader(*args, **kwargs)

        return self._open_result_reader(*args, **kwargs)

    def _iter_reader_with_pandas(self, iter_func, **kw):
        try:
            with self.open_reader(**kw) as reader:
                for batch in iter_func(reader):
                    yield batch
        except (errors.ChecksumError, errors.MethodNotAllowed):
            # arrow tunnel not implemented or not supported
            kw.pop("arrow", None)
            with self.open_reader(**kw) as reader:
                for batch in iter_func(reader):
                    yield batch

    def to_pandas(
        self,
        columns=None,
        limit=None,
        start=None,
        count=None,
        n_process=1,
        quota_name=None,
        tags=None,
        **kwargs
    ):
        """
        Read instance result data into a pandas DataFrame. The `limit` argument follows
        the definition of the `open_reader` API.

        :param list columns: columns to read
        :param bool limit: if True, enable the limitation
        :param int start: start row index from 0
        :param int count: data count to read
        :param int n_process: number of processes to accelerate reading
        :param str quota_name: name of tunnel quota to use
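
        A usage sketch, assuming ``odps`` is an initialized ODPS entry object
        and pandas is installed:

        :Example:

        >>> instance = odps.execute_sql('select * from dual')
        >>> df = instance.to_pandas(limit=False)  # read the full result via instance tunnel
        >>> print(df.head())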
        """
        try:
            import pyarrow as pa
        except ImportError:
            pa = None

        kw = dict(
            limit=limit,
            columns=columns,
            arrow=pa is not None,
            quota_name=quota_name,
            tags=tags,
            **kwargs
        )
        if limit is None:
            kw.pop("limit")

        def _it(reader):
            yield reader.to_pandas(start=start, count=count, n_process=n_process)

        return next(self._iter_reader_with_pandas(_it, **kw))

    def iter_pandas(
        self,
        columns=None,
        limit=None,
        batch_size=None,
        start=None,
        count=None,
        quota_name=None,
        tags=None,
        **kwargs
    ):
        """
        Iterate over instance result data in blocks as pandas DataFrames. The `limit`
        argument follows the definition of the `open_reader` API.

        :param list columns: columns to read
        :param bool limit: if True, enable the limitation
        :param int batch_size: size of DataFrame batch to read
        :param int start: start row index from 0
        :param int count: data count to read
        :param str quota_name: name of tunnel quota to use
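
        A usage sketch, assuming ``odps`` is an initialized ODPS entry object
        and pandas is installed:

        :Example:

        >>> instance = odps.execute_sql('select * from dual')
        >>> for batch in instance.iter_pandas(batch_size=10000):
        >>>     print(len(batch))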
        """
        try:
            import pyarrow as pa
        except ImportError:
            pa = None

        batch_size = batch_size or options.tunnel.read_row_batch_size
        kw = dict(
            limit=limit,
            columns=columns,
            arrow=pa is not None,
            quota_name=quota_name,
            tags=tags,
            **kwargs
        )
        if limit is None:
            kw.pop("limit")

        def _it(reader):
            for batch in reader.iter_pandas(
                batch_size, start=start, count=count, columns=columns
            ):
                yield batch

        for batch in self._iter_reader_with_pandas(_it, **kw):
            yield batch
