in pyiceberg/catalog/rest/__init__.py [0:0]
def _handle_non_200_response(self, exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None:
exception: Type[Exception]
if exc.response is None:
raise ValueError("Did not receive a response")
code = exc.response.status_code
if code in error_handler:
exception = error_handler[code]
elif code == 400:
exception = BadRequestError
elif code == 401:
exception = UnauthorizedError
elif code == 403:
exception = ForbiddenError
elif code == 422:
exception = RESTError
elif code == 419:
exception = AuthorizationExpiredError
elif code == 501:
exception = NotImplementedError
elif code == 503:
exception = ServiceUnavailableError
elif 500 <= code < 600:
exception = ServerError
else:
exception = RESTError
try:
if exception == OAuthError:
# The OAuthErrorResponse has a different format
error = OAuthErrorResponse.model_validate_json(exc.response.text)
response = str(error.error)
if description := error.error_description:
response += f": {description}"
if uri := error.error_uri:
response += f" ({uri})"
else:
error = ErrorResponse.model_validate_json(exc.response.text).error
response = f"{error.type}: {error.message}"
except JSONDecodeError:
# In the case we don't have a proper response
response = f"RESTError {exc.response.status_code}: Could not decode json payload: {exc.response.text}"
except ValidationError as e:
# In the case we don't have a proper response
errs = ", ".join(err["msg"] for err in e.errors())
response = (
f"RESTError {exc.response.status_code}: Received unexpected JSON Payload: {exc.response.text}, errors: {errs}"
)
raise exception(response) from exc