def open_reader()

in odps/models/instance.py [0:0]


    def open_reader(self, *args, **kwargs):
        """
        Open a reader over the records produced by this instance.

        Two routes are available: the instance tunnel (``_open_tunnel_reader``) and
        the conventional result reader (``_open_result_reader``). When ``tunnel`` is
        given explicitly, only that route is used and tunnel errors are raised to the
        caller. When it is absent, ``options.tunnel.use_instance_tunnel`` selects the
        route, and a tunnel that is unsupported, times out or cannot create a download
        session makes the method fall back to the conventional reader. The warnings
        emitted on fallback state that the conventional route limits the result to
        10000 records.

        :param tunnel: if true, use instance tunnel to read from the instance.
                       if false, use conventional routine.
                       if absent, `options.tunnel.use_instance_tunnel` will be used and automatic
                       fallback is enabled.
        :param use_tunnel: alias of ``tunnel``; when present it is consulted instead of ``tunnel``.
        :param bool limit: if True, enable the limitation under instance tunnel mode. When absent,
                           `options.tunnel.limit_instance_tunnel` is used; if that yields ``None``,
                           the tunnel is first tried without the limitation and, on
                           ``errors.NoPermission``, retried once with ``limit=True`` after a warning.
        :param bool limit_enabled: legacy name of ``limit``, kept for compatibility.
        :param timeout: forwarded to the underlying reader. When the tunnel route was selected
                        from options rather than requested explicitly and no timeout is given,
                        `options.tunnel.legacy_fallback_timeout` is used.
        :param bool reopen: the reader will reuse last one, reopen is true means open a new reader.
        :param endpoint: the tunnel service URL
        :param compress_option: compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :param compress_algo: compression algorithm, work when ``compress_option`` is not provided,
                              can be ``zlib``, ``snappy``
        :param compress_level: used for ``zlib``, work when ``compress_option`` is not provided
        :param compress_strategy: used for ``zlib``, work when ``compress_option`` is not provided
        :return: reader, ``count`` means the full size, ``status`` means the tunnel status

        :Example:

        >>> with instance.open_reader() as reader:
        >>>     count = reader.count  # full size of the result
        >>>     for record in reader[0:count]:
        >>>         # reads everything in one go; splitting into several reads is usually better
        >>>         print(record)
        """
        requested = kwargs.get("use_tunnel", kwargs.get("tunnel"))
        # An explicit choice by the caller disables falling back to the
        # conventional reader when the tunnel fails.
        explicit_choice = requested is not None

        timeout = kwargs.pop("timeout", None)
        if explicit_choice:
            via_tunnel = requested
        else:
            via_tunnel = options.tunnel.use_instance_tunnel
            if via_tunnel and timeout is None:
                timeout = options.tunnel.legacy_fallback_timeout
        # always forwarded, even when it is None
        kwargs["timeout"] = timeout

        unsupported_errors = (
            errors.InvalidProjectTable,
            errors.InvalidArgument,
            errors.NoSuchProject,
        )
        if not via_tunnel:
            return self._open_result_reader(*args, **kwargs)

        # for compatibility: `limit_enabled` is the former name of `limit`
        if "limit_enabled" in kwargs:
            kwargs["limit"] = kwargs.pop("limit_enabled")

        if "limit" not in kwargs:
            kwargs["limit"] = options.tunnel.limit_instance_tunnel

        # An undecided limit (None) means: try unlimited first, and retry with
        # the limit switched on if the service denies permission.
        retry_when_protected = kwargs["limit"] is None
        if retry_when_protected:
            kwargs["limit"] = False

        try:
            return self._open_tunnel_reader(**kwargs)
        except unsupported_errors:
            # service version too low to support instance tunnel.
            if explicit_choice:
                raise
            if not kwargs.get("limit"):
                message = (
                    "Instance tunnel not supported, will fallback to "
                    "restricted approach. 10000 records will be limited. "
                    + _RESULT_LIMIT_HELPER_MSG
                )
                warnings.warn(message)
        except requests.Timeout:
            # tunnel creation timed out, which might be caused by too many files
            # on the service.
            if explicit_choice:
                raise
            if not kwargs.get("limit"):
                message = (
                    "Instance tunnel timed out, will fallback to restricted approach. "
                    "10000 records will be limited. You may try merging small files "
                    "on your source table. " + _RESULT_LIMIT_HELPER_MSG
                )
                warnings.warn(message)
        except (
            Instance.DownloadSessionCreationError,
            errors.InstanceTypeNotSupported,
        ):
            # this is for DDL sql instances such as `show partitions` which raises
            # InternalServerError when creating download sessions.
            if explicit_choice:
                raise
        except errors.NoPermission as exc:
            # project is protected or data permission is configured
            if not retry_when_protected:
                raise
            if not kwargs.get("limit"):
                message = (
                    "Project or data under protection, 10000 records will be limited. "
                    "Raw error message:\n"
                    + str(exc)
                    + "\n"
                    + _RESULT_LIMIT_HELPER_MSG
                )
                warnings.warn(message)
                # second attempt through the same entry point, now with the limit on
                kwargs["limit"] = True
                return self.open_reader(*args, **kwargs)

        # reached only after a tolerated tunnel failure
        return self._open_result_reader(*args, **kwargs)