aws_advanced_python_wrapper/utils/pg_exception_handler.py (77 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional
from psycopg.errors import (ConnectionTimeout,
InvalidAuthorizationSpecification, InvalidPassword,
OperationalError)
from aws_advanced_python_wrapper.errors import QueryTimeoutError
from aws_advanced_python_wrapper.exception_handling import ExceptionHandler
class PgExceptionHandler(ExceptionHandler):
_PASSWORD_AUTHENTICATION_FAILED_MSG = "password authentication failed"
_PAM_AUTHENTICATION_FAILED_MSG = "PAM authentication failed"
_CONNECTION_FAILED = "connection failed"
_CONSUMING_INPUT_FAILED = "consuming input failed"
_NETWORK_ERRORS: List[str]
_ACCESS_ERRORS: List[str]
def is_network_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
if isinstance(error, QueryTimeoutError) or isinstance(error, ConnectionTimeout):
return True
if sql_state is None:
try:
error_sql_state = getattr(error, "sqlstate")
if error_sql_state is not None:
sql_state = error_sql_state
except AttributeError:
# getattr may throw an AttributeError if the error does not have a `sqlstate` attribute
pass
if sql_state is not None and sql_state in self._NETWORK_ERRORS:
return True
if isinstance(error, OperationalError):
if len(error.args) == 0:
return False
# Check the error message if this is a generic error
error_msg: str = error.args[0]
return self._CONNECTION_FAILED in error_msg or self._CONSUMING_INPUT_FAILED in error_msg
return False
def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
if error:
if isinstance(error, InvalidAuthorizationSpecification) or isinstance(error, InvalidPassword):
return True
if sql_state is None and hasattr(error, "sqlstate") and error.sqlstate is not None:
sql_state = error.sqlstate
if sql_state is not None and sql_state in self._ACCESS_ERRORS:
return True
if isinstance(error, OperationalError):
if len(error.args) == 0:
return False
# Check the error message if this is a generic error
error_msg: str = error.args[0]
if self._PASSWORD_AUTHENTICATION_FAILED_MSG in error_msg \
or self._PAM_AUTHENTICATION_FAILED_MSG in error_msg:
return True
return False
class SingleAzPgExceptionHandler(PgExceptionHandler):
_NETWORK_ERRORS: List[str] = [
"53", # insufficient resources
"57P01", # admin shutdown
"57P02", # crash shutdown
"57P03", # cannot connect now
"58", # system error(backend)
"08", # connection error
"99", # unexpected error
"F0", # configuration file error(backend)
"XX" # internal error(backend)
]
_ACCESS_ERRORS: List[str] = [
"28000", # PAM authentication errors
"28P01"
]
class MultiAzPgExceptionHandler(PgExceptionHandler):
_NETWORK_ERRORS: List[str] = [
"28000", # access denied during reboot, this should be considered a temporary failure
"53", # insufficient resources
"57P01", # admin shutdown
"57P02", # crash shutdown
"57P03", # cannot connect now
"58", # system error(backend)
"08", # connection error
"99", # unexpected error
"F0", # configuration file error(backend)
"XX" # internal error(backend)
]
_ACCESS_ERRORS: List[str] = ["28P01"]