in elasticapm/contrib/sanic/__init__.py [0:0]
def _setup_exception_manager(self) -> None:
"""
Setup global exception handler where all unhandled exception can be caught and tracked to APM server
:return:
"""
# noinspection PyUnusedLocal
async def _handler(request: Request, exception: BaseException) -> None:
if not self._client:
return
self._client.capture_exception(
exc_info=sys.exc_info(),
context={
"request": await get_request_info(
config=self._client.config, request=request, event_type=constants.ERROR
),
},
handled=True,
)
self._setup_transaction_name(request=request)
set_transaction_result(result="HTTP 5xx", override=False)
set_transaction_outcome(outcome=constants.OUTCOME.FAILURE, override=False)
elastic_context(data={"status_code": 500}, key="response")
self._client.end_transaction()
if not isinstance(self._app.error_handler, ElasticAPMPatchedErrorHandler):
patched_client = ElasticAPMPatchedErrorHandler(current_handler=self._app.error_handler)
patched_client.setup_apm_handler(apm_handler=_handler)
self._app.error_handler = patched_client
else:
self._app.error_handler.setup_apm_handler(apm_handler=_handler)
try:
from elasticapm.contrib.celery import register_exception_tracking
register_exception_tracking(client=self._client)
except ImportError:
self._logger.debug(
"Failed to setup Exception tracking. "
"Please install requirements for elasticapm.contrib.celery if exception tracking is required"
)
pass