in odps/models/instance.py [0:0]
def open_reader(self, *args, **kwargs):
    """
    Open a reader to read records from the result of the instance. If `tunnel` is `True`,
    instance tunnel will be used. Otherwise conventional routine will be used. If instance tunnel
    is not available and `tunnel` is not specified, the method will fall back to the
    conventional routine.

    Note that the number of records returned is limited unless instance tunnel is used
    with limitation disabled, i.e. `options.tunnel.limit_instance_tunnel` is set to
    `False` or `limit=False` is passed explicitly. The conventional (fallback) routine
    always limits the number of records returned (10000 records).

    :param tunnel: if true, use instance tunnel to read from the instance.
                   if false, use conventional routine.
                   if absent, `options.tunnel.use_instance_tunnel` will be used and automatic fallback
                   is enabled.
    :param bool limit: if True, enable the limitation
    :param bool reopen: the reader will reuse last one, reopen is true means open a new reader.
    :param endpoint: the tunnel service URL
    :param compress_option: compression algorithm, level and strategy
    :type compress_option: :class:`odps.tunnel.CompressOption`
    :param compress_algo: compression algorithm, work when ``compress_option`` is not provided,
                          can be ``zlib``, ``snappy``
    :param compress_level: used for ``zlib``, work when ``compress_option`` is not provided
    :param compress_strategy: used for ``zlib``, work when ``compress_option`` is not provided
    :return: reader, ``count`` means the full size, ``status`` means the tunnel status

    :Example:

    >>> with instance.open_reader() as reader:
    >>>     count = reader.count  # How many records of a table or its partition
    >>>     for record in reader[0: count]:
    >>>         # read all data, actually better to split into reading for many times
    """
    # "use_tunnel" and "tunnel" are accepted as aliases for the same switch.
    use_tunnel = kwargs.get("use_tunnel", kwargs.get("tunnel"))
    # Automatic fallback to the conventional reader is only permitted when the
    # caller did NOT explicitly request (or reject) the instance tunnel.
    auto_fallback_result = use_tunnel is None
    timeout = kwargs.pop("timeout", None)
    if use_tunnel is None:
        use_tunnel = options.tunnel.use_instance_tunnel
        if use_tunnel:
            # When the tunnel is chosen implicitly, cap session creation time so
            # a slow/unsupported service can still fall back in reasonable time.
            timeout = (
                timeout
                if timeout is not None
                else options.tunnel.legacy_fallback_timeout
            )
    kwargs["timeout"] = timeout

    # Errors that indicate the service cannot serve this request via instance
    # tunnel at all (e.g. service version too low) — candidates for fallback.
    result_fallback_errors = (
        errors.InvalidProjectTable,
        errors.InvalidArgument,
        errors.NoSuchProject,
    )
    if use_tunnel:
        # for compatibility: "limit_enabled" is the legacy spelling of "limit"
        if "limit_enabled" in kwargs:
            kwargs["limit"] = kwargs["limit_enabled"]
            del kwargs["limit_enabled"]

        if "limit" not in kwargs:
            kwargs["limit"] = options.tunnel.limit_instance_tunnel

        # When neither the caller nor options decided on "limit", retry in
        # limited mode if the unrestricted read is rejected for permissions.
        auto_fallback_protection = False
        if kwargs["limit"] is None:
            kwargs["limit"] = False
            auto_fallback_protection = True

        try:
            return self._open_tunnel_reader(**kwargs)
        except result_fallback_errors:
            # service version too low to support instance tunnel.
            if not auto_fallback_result:
                raise
            if not kwargs.get("limit"):
                warnings.warn(
                    "Instance tunnel not supported, will fallback to "
                    "restricted approach. 10000 records will be limited. "
                    + _RESULT_LIMIT_HELPER_MSG
                )
        except requests.Timeout:
            # tunnel creation timed out, which might be caused by too many files
            # on the service.
            if not auto_fallback_result:
                raise
            if not kwargs.get("limit"):
                warnings.warn(
                    "Instance tunnel timed out, will fallback to restricted approach. "
                    "10000 records will be limited. You may try merging small files "
                    "on your source table. " + _RESULT_LIMIT_HELPER_MSG
                )
        except (
            Instance.DownloadSessionCreationError,
            errors.InstanceTypeNotSupported,
        ):
            # this is for DDL sql instances such as `show partitions` which raises
            # InternalServerError when creating download sessions.
            if not auto_fallback_result:
                raise
        except errors.NoPermission as exc:
            # project is protected or data permission is configured
            if not auto_fallback_protection:
                raise
            if not kwargs.get("limit"):
                warnings.warn(
                    "Project or data under protection, 10000 records will be limited. "
                    "Raw error message:\n"
                    + str(exc)
                    + "\n"
                    + _RESULT_LIMIT_HELPER_MSG
                )
                # Retry once via the tunnel in limited mode before giving up.
                kwargs["limit"] = True
                return self.open_reader(*args, **kwargs)

    # Conventional (restricted) routine: either the tunnel was disabled or one
    # of the fallback-eligible errors above was swallowed.
    return self._open_result_reader(*args, **kwargs)