odps/errors.py (299 lines of code) (raw):
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import json
import logging
import operator
from datetime import datetime
from requests import ConnectTimeout as RequestsConnectTimeout
from . import utils
from .compat import ElementTree as ET
from .compat import ElementTreeParseError as ETParseError
from .compat import TimeoutError, reduce, six
logger = logging.getLogger(__name__)
class DatetimeOverflowError(OverflowError):
    """OverflowError subclass; the name suggests out-of-range datetime values.

    Not raised anywhere in this module.
    """
    pass
class DependencyNotInstalledError(Exception):
    """Plain Exception subclass; presumably signals a missing optional dependency.

    Not raised anywhere in this module.
    """
    pass
class InteractiveError(Exception):
    """Plain Exception subclass; not raised anywhere in this module."""
    pass
def parse_response(resp, endpoint=None, tag=None):
    """Parse an error response and return the matching exception object.

    The exception is returned, not raised. The body is first parsed as XML
    with ``Code``, ``Message``, ``RequestId`` and ``HostId`` children. When it
    is not well-formed XML it is parsed as JSON instead, taking the request id
    from the ``x-odps-request-id`` header before the JSON ``RequestId`` field.
    The exception class is the class in this module named like the error
    code, falling back to ``ODPSError``.

    When the body cannot be interpreted at all, a generic exception is built
    from the HTTP status: ``NoSuchObject`` for 404, ``Unauthorized`` for 401,
    ``BadGatewayError`` for a 502 containing the bad gateway page text, and
    otherwise ``ODPSError`` carrying the raw body (or the status code when the
    body is empty).

    :param resp: response exposing ``content``, ``text``, ``headers`` and
        ``status_code``
    :param endpoint: endpoint recorded on the returned exception
    :param tag: tag recorded on the returned exception
    :return: an exception instance describing the response
    """
    try:
        try:
            content = resp.content
            root = ET.fromstring(content)
            code = root.find("./Code").text
            msg = root.find("./Message").text
            request_id = root.find("./RequestId").text
            host_id = root.find("./HostId").text
        except ETParseError:
            request_id = resp.headers.get("x-odps-request-id", None)
            if len(resp.content) > 0:
                obj = json.loads(resp.text)
                msg = obj["Message"]
                code = obj.get("Code")
                host_id = obj.get("HostId")
                if request_id is None:
                    request_id = obj.get("RequestId")
            else:
                # Nothing to parse: let the status-code fallback handle it.
                raise

        clz = globals().get(code, ODPSError)
        # The code comes from the server and is looked up among ALL module
        # globals; make sure it did not resolve to a module, function or
        # constant, which would discard the parsed message when called.
        if not (isinstance(clz, type) and issubclass(clz, BaseODPSError)):
            clz = ODPSError
        return clz(
            msg,
            request_id=request_id,
            code=code,
            host_id=host_id,
            endpoint=endpoint,
            tag=tag,
            response_headers=resp.headers,
        )
    except Exception:
        # Error occurred during parsing the response. We ignore it and build
        # a generic exception from the HTTP status below. Only ``Exception``
        # is caught so KeyboardInterrupt / SystemExit still propagate.
        logger.debug(utils.stringify_expt())

    status = resp.status_code
    if status == 404:
        msg = "Not found error reported by server."
        if endpoint:
            msg += " Endpoint %s might be malfunctioning." % endpoint
        return NoSuchObject(msg, endpoint=endpoint, tag=tag)
    if status == 401:
        return Unauthorized("Unauthorized.", endpoint=endpoint, tag=tag)

    raw = resp.content
    # Decode leniently: an undecodable body must not turn error reporting
    # itself into a UnicodeDecodeError.
    text = raw.decode("utf-8", "replace") if six.PY3 else raw
    if not text:
        return ODPSError(str(status), endpoint=endpoint, tag=tag)
    if status == 502 and _nginx_bad_gateway_message in text:
        return BadGatewayError(text, code=str(status), endpoint=endpoint, tag=tag)
    return ODPSError(text, code=str(status), endpoint=endpoint, tag=tag)
def throw_if_parsable(resp, endpoint=None, tag=None):
    """Convert the response into an exception and raise it.

    The exception object is produced by ``parse_response``; this function
    never returns normally.
    """
    error = parse_response(resp, endpoint, tag)
    raise error
# Full ODPS error codes found in instance error text, mapped to the NAME of
# an exception class in this module. parse_instance_error() resolves the name
# through globals() and falls back to ODPSError when no such class exists.
# NOTE(review): no class named InternalConnectionError is visible in this
# file, so "ODPS-0430055" appears to resolve to plain ODPSError - confirm.
_CODE_MAPPING = {
    "ODPS-0010000": "InternalServerError",
    "ODPS-0110141": "DataVersionError",
    "ODPS-0123055": "ScriptError",
    "ODPS-0130131": "NoSuchTable",
    "ODPS-0130013": "NoPermission",
    "ODPS-0130161": "ParseError",
    "ODPS-0430055": "InternalConnectionError",
}
# 8-character error-code prefixes mapped to SQA exception class names.
# parse_instance_error() compares the first 8 characters of codes longer than
# 8 characters against these keys.
_SQA_CODE_MAPPING = {
    "ODPS-180": "SQAGenericError",
    "ODPS-181": "SQARetryError",
    "ODPS-182": "SQAAccessDenied",
    "ODPS-183": "SQAResourceNotEnough",
    "ODPS-184": "SQAServiceUnavailable",
    "ODPS-185": "SQAUnsupportedFeature",
    "ODPS-186": "SQAQueryTimedout",
}
# Substring parse_response() looks for (case-sensitively) in the body of an
# HTTP 502 response; a match yields BadGatewayError instead of ODPSError.
_nginx_bad_gateway_message = "the page you are looking for is currently unavailable"
def parse_instance_error(msg):
    """Build an exception object from an instance error message.

    The message is first treated as an XML document with ``Code`` and
    ``Message`` children plus optional ``RequestId`` / ``HostId`` children.
    If that fails, the plain text is split on ``" - "`` and ``":"`` and the
    first piece starting with ``ODPS-`` is used as the error code, which is
    mapped to a class through ``_CODE_MAPPING`` or, by its first 8
    characters, ``_SQA_CODE_MAPPING``.

    :param msg: raw error message, XML or plain text
    :return: an exception instance (``ODPSError`` when nothing more specific
        matches); the exception is returned, not raised
    """
    raw_msg = msg
    try:
        root = ET.fromstring(msg)
        code = root.find("./Code").text
        xml_msg = root.find("./Message").text
        # Compare with None explicitly: an Element without children is falsy,
        # so a plain truth test would discard every leaf node's text.
        request_id_node = root.find("./RequestId")
        request_id = request_id_node.text if request_id_node is not None else None
        host_id_node = root.find("./HostId")
        host_id = host_id_node.text if host_id_node is not None else None

        clz = globals().get(code, ODPSError)
        # The code is looked up among all module globals; only accept real
        # error classes.
        if not (isinstance(clz, type) and issubclass(clz, BaseODPSError)):
            clz = ODPSError
        return clz(xml_msg, request_id=request_id, code=code, host_id=host_id)
    except Exception:
        # Not a usable XML error document: fall through to text parsing.
        pass

    msg = utils.to_str(raw_msg)
    msg_parts = [
        piece.strip() for part in msg.split(" - ") for piece in part.split(":")
    ]
    msg_code = next((p for p in msg_parts if p.startswith("ODPS-")), None)

    if msg_code is None:
        cls = ODPSError
    elif msg_code in _CODE_MAPPING:
        cls = globals().get(_CODE_MAPPING[msg_code], ODPSError)
    elif len(msg_code) > 8 and msg_code[:8] in _SQA_CODE_MAPPING:
        # sometimes SQA will report nested odps errors.
        # return the outer error type instead of the inner one.
        cls = globals().get(_SQA_CODE_MAPPING[msg_code[:8]], ODPSError)
    else:
        cls = ODPSError
    return cls(msg, code=msg_code)
class BaseODPSError(Exception):
    """Base class of ODPS error

    :param msg: error message; stored as ``args[0]``
    :param request_id: request id reported with the error
    :param code: error code
    :param host_id: host id reported with the error
    :param instance_id: id of the related instance
    :param endpoint: endpoint the request was sent to
    :param tag: tag attached to the error
    :param response_headers: headers of the response the error came from
    """

    def __init__(
        self,
        msg,
        request_id=None,
        code=None,
        host_id=None,
        instance_id=None,
        endpoint=None,
        tag=None,
        response_headers=None,
    ):
        super(BaseODPSError, self).__init__(msg)
        self.request_id = request_id
        self.instance_id = instance_id
        self.code = code
        self.host_id = host_id
        self.endpoint = endpoint
        self.tag = tag
        # Keep the headers instead of silently discarding the argument.
        self.response_headers = response_headers

    def __str__(self):
        """Render a head line of the set fields, a newline, then the message.

        With no code, request id, instance id, tag or endpoint set, only the
        message is returned. The result is always a string, even when the
        message is ``None``, a number or bytes.
        """
        message = self.args[0] if self.args else ""
        if isinstance(message, bytes) and not isinstance(message, str):
            # Python 3 bytes payload: show the text rather than its repr.
            message = message.decode("utf-8", "replace")

        head_parts = []
        if self.code:
            head_parts.append("%s:" % self.code)
        if self.request_id:
            head_parts.append("RequestId: %s" % self.request_id)
        if self.instance_id:
            head_parts.append("InstanceId: %s" % self.instance_id)
        if self.tag:
            head_parts.append("Tag: %s" % self.tag)
        if self.endpoint:
            head_parts.append("Endpoint: %s" % self.endpoint)

        if head_parts:
            return "%s\n%s" % (" ".join(head_parts), message)
        # __str__ must return a string; format instead of returning the raw
        # object so non-string messages do not raise TypeError.
        return "%s" % (message,)

    @classmethod
    def parse(cls, resp):
        """Return the exception object ``parse_response`` builds for *resp*."""
        return parse_response(resp)
class ODPSError(BaseODPSError, RuntimeError):
    """General ODPS error, also a RuntimeError.

    Used by parse_response() and parse_instance_error() as the fallback when
    no more specific class matches the error code.
    """
    pass
class ODPSClientError(ODPSError):
    """ODPSError subclass; presumably for client-side failures.

    Not raised anywhere in this module.
    """
    pass
class ConnectTimeout(ODPSError, TimeoutError, RequestsConnectTimeout):
    """ODPS error that is also a TimeoutError and a requests ConnectTimeout.

    Handlers written for any of the three base types catch it.
    """
    pass
class DataHealthManagerError(ODPSError):
    """ODPSError subclass; not raised anywhere in this module."""
    pass
class ServerDefinedException(ODPSError):
    """Base class for exception types named after server-reported error codes."""
    pass
# A long list of server defined exceptions
class MethodNotAllowed(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``MethodNotAllowed``."""
    pass
class NoSuchObject(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``NoSuchObject``.

    parse_response() also returns it for HTTP 404 responses whose body cannot
    be parsed.
    """
    pass
class NoSuchProject(NoSuchObject):
    """NoSuchObject variant used when a parsed error code equals ``NoSuchProject``."""
    pass
class NoSuchPartition(NoSuchObject):
    """NoSuchObject variant used when a parsed error code equals ``NoSuchPartition``."""
    pass
class NoSuchPath(NoSuchObject):
    """NoSuchObject variant used when a parsed error code equals ``NoSuchPath``."""
    pass
class NoSuchTable(NoSuchObject):
    """NoSuchObject variant used when a parsed error code equals ``NoSuchTable``.

    Also mapped from instance error code ``ODPS-0130131`` in ``_CODE_MAPPING``.
    """
    pass
class NoSuchVolume(NoSuchObject):
    """NoSuchObject variant used when a parsed error code equals ``NoSuchVolume``."""
    pass
class InvalidArgument(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``InvalidArgument``."""
    pass
class AuthenticationRequestExpired(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``AuthenticationRequestExpired``."""
    pass
class AuthorizationRequired(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``AuthorizationRequired``."""
    pass
class Unauthorized(AuthorizationRequired):
    """AuthorizationRequired variant used when a parsed error code equals ``Unauthorized``.

    parse_response() also returns it for HTTP 401 responses whose body cannot
    be parsed.
    """
    pass
class SignatureNotMatch(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``SignatureNotMatch``."""
    pass
class SchemaParseError(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``SchemaParseError``."""
    pass
class InvalidStateSetting(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``InvalidStateSetting``."""
    pass
class InvalidProjectTable(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``InvalidProjectTable``."""
    pass
class NoPermission(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``NoPermission``.

    Also mapped from instance error code ``ODPS-0130013`` in ``_CODE_MAPPING``.
    """
    pass
class InternalServerError(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``InternalServerError``.

    Also mapped from instance error code ``ODPS-0010000`` in ``_CODE_MAPPING``.
    """
    pass
class ReadMetaError(InternalServerError):
    """InternalServerError variant used when a parsed error code equals ``ReadMetaError``."""
    pass
class ServiceUnavailable(InternalServerError):
    """InternalServerError variant used when a parsed error code equals ``ServiceUnavailable``."""
    pass
class ScriptError(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``ScriptError``.

    Also mapped from instance error code ``ODPS-0123055`` in ``_CODE_MAPPING``.
    """
    pass
class ParseError(ServerDefinedException):
    """Server-defined error that can show the offending SQL statement.

    Instance error code ``ODPS-0130161`` maps to this class in
    ``_CODE_MAPPING``. ``statement`` starts as ``None``; callers may set it
    afterwards to have it included in the string form.
    """

    def __init__(self, *args, **kw):
        super(ParseError, self).__init__(*args, **kw)
        self.statement = None

    def __str__(self):
        """Return the base rendering with a ``SQL Statement:`` row inserted.

        The statement row goes right after the first line of the base
        rendering. When the base rendering has only one line, the statement
        row is appended after it.
        """
        message = super(ParseError, self).__str__()
        if self.statement is None:
            return message
        # partition() tolerates a rendering without any newline, where
        # unpacking the result of split("\n", 1) raises ValueError.
        first_row, sep, rests = message.partition("\n")
        rows = [first_row, "SQL Statement: " + self.statement]
        if sep:
            rows.append(rests)
        return "\n".join(rows)
class DataVersionError(InternalServerError):
    """InternalServerError variant used when a parsed error code equals ``DataVersionError``.

    Also mapped from instance error code ``ODPS-0110141`` in ``_CODE_MAPPING``.
    """
    pass
class BadGatewayError(InternalServerError):
    """InternalServerError variant for bad gateway responses.

    parse_response() returns it for an unparsable HTTP 502 response whose
    body contains ``_nginx_bad_gateway_message``.
    """
    pass
class InstanceTypeNotSupported(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``InstanceTypeNotSupported``."""
    pass
class InvalidParameter(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``InvalidParameter``."""
    pass
class StreamSessionNotFound(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``StreamSessionNotFound``."""
    pass
class UpsertSessionNotFound(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``UpsertSessionNotFound``."""
    pass
class OverwriteModeNotAllowed(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``OverwriteModeNotAllowed``."""
    pass
class TableModified(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``TableModified``."""
    pass
class SchemaModified(ServerDefinedException):
    """Server-defined error that exposes the latest schema version.

    ``latest_schema_version`` is read from the
    ``odps-tunnel-latest-schema-version`` entry of the ``response_headers``
    keyword argument, and is ``None`` when that entry is absent. Headers
    passed positionally are not inspected.
    """

    def __init__(self, *args, **kw):
        super(SchemaModified, self).__init__(*args, **kw)
        headers = kw.get("response_headers")
        if not headers:
            # Missing or empty headers: look up in an empty mapping instead.
            headers = dict()
        version = headers.get("odps-tunnel-latest-schema-version")
        self.latest_schema_version = version
class NoSuchSchema(ServerDefinedException):
    """Server-defined error used when a parsed error code equals ``NoSuchSchema``."""
    pass
class RequestTimeTooSkewed(ServerDefinedException):
    """Server-defined error whose message carries time-skew details.

    The message is parsed as a comma-separated list of ``key: value`` pairs.
    ``max_interval_date`` (int), ``expire_date`` and ``now_date`` (datetime)
    are extracted from it. If any part of the parsing fails, all three
    attributes are ``None``.
    """

    def __init__(self, msg, *args, **kwargs):
        super(RequestTimeTooSkewed, self).__init__(msg, *args, **kwargs)
        try:
            parts = msg.split(",")
            kv_dict = dict(tuple(s.strip() for s in p.split(":", 1)) for p in parts)
            max_interval_date = int(kv_dict["max_interval_date"])
            expire_date = self._parse_error_date(kv_dict["expire_date"])
            now_date = self._parse_error_date(kv_dict["now_date"])
        except Exception:
            # Best-effort extraction: an unexpected message format must not
            # break construction of the exception itself. Only ``Exception``
            # is caught so KeyboardInterrupt / SystemExit still propagate.
            max_interval_date = expire_date = now_date = None
        self.max_interval_date = max_interval_date
        self.expire_date = expire_date
        self.now_date = now_date

    @staticmethod
    def _parse_error_date(date_str):
        """Convert a ``%Y-%m-%dT%H:%M:%S.%fZ`` string into a naive local datetime.

        The parsed fields are interpreted as UTC (``calendar.timegm``) and
        converted with ``datetime.fromtimestamp``. The microseconds dropped
        by the time tuple are put back afterwards.
        """
        date_obj = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        micros = date_obj.microsecond
        return datetime.fromtimestamp(calendar.timegm(date_obj.timetuple())).replace(
            microsecond=micros
        )
# Handle a typo in the error code found in ODPS error messages: expose the
# class under the misspelled name too, so a lookup by code name still finds it.
RequestTimeTooSkewd = RequestTimeTooSkewed
class NotSupportedError(ODPSError):
    """ODPSError subclass; presumably for unsupported operations.

    Not raised anywhere in this module.
    """
    pass
class WaitTimeoutError(ODPSError, TimeoutError):
    """ODPS error that is also a TimeoutError; not raised in this module."""
    pass
class SecurityQueryError(ODPSError):
    """ODPSError subclass; not raised anywhere in this module."""
    pass
class OSSSignUrlError(ODPSError):
    """ODPS error built from either a message string or another exception.

    ``oss_exception`` holds the original object when *err* is not a string,
    and ``None`` otherwise. The message is *err* itself for strings and
    ``str(err)`` for anything else.
    """

    def __init__(self, err):
        is_text = isinstance(err, six.string_types)
        message = err if is_text else str(err)
        super(OSSSignUrlError, self).__init__(message)
        self.oss_exception = None if is_text else err
class SQAError(ODPSError):
    """Base class of the SQA error types named in ``_SQA_CODE_MAPPING``."""
    pass
class SQAGenericError(SQAError):
    """SQA error mapped from the code prefix ``ODPS-180`` by parse_instance_error()."""
    pass
# If this error is raised, you may retry your request.
class SQARetryError(SQAError):
    """SQA error mapped from the code prefix ``ODPS-181`` by parse_instance_error()."""
    pass
class SQAAccessDenied(SQAError):
    """SQA error mapped from the code prefix ``ODPS-182`` by parse_instance_error()."""
    pass
class SQAResourceNotEnough(SQAError):
    """SQA error mapped from the code prefix ``ODPS-183`` by parse_instance_error()."""
    pass
class SQAServiceUnavailable(SQAError):
    """SQA error mapped from the code prefix ``ODPS-184`` by parse_instance_error()."""
    pass
class SQAUnsupportedFeature(SQAError):
    """SQA error mapped from the code prefix ``ODPS-185`` by parse_instance_error()."""
    pass
class SQAQueryTimedout(SQAError):
    """SQA error mapped from the code prefix ``ODPS-186`` by parse_instance_error()."""
    pass
class EmptyTaskInfoError(ODPSError):
    """ODPSError subclass; not raised anywhere in this module."""
    pass
class ChecksumError(ODPSError, IOError):
    """ODPS error that is also an IOError; not raised in this module."""
    pass