in elasticapm/contrib/django/management/commands/elasticapm.py [0:0]
def handle_test(self, command, **options):
    """Send a test error to APM Server and report the outcome.

    A client is built in synchronous mode, a ``TestException`` is raised
    and captured through it, and the client is closed. While that happens,
    every record emitted on the ``"elasticapm.transport"`` logger is
    collected; if none were collected the attempt is reported as a success,
    otherwise each collected message is written out as an error.

    Args:
        command: Not used by this method.
        **options: Command options. ``service_name`` and ``secret_token``
            are forwarded to the client configuration when truthy.
    """

    class LogCaptureHandler(logging.Handler):
        """Logging handler that stores every record it is handed."""

        def __init__(self, level=logging.NOTSET) -> None:
            self.logs = []
            super().__init__(level)

        def handle(self, record) -> None:
            # Only collect the record; nothing is formatted or emitted.
            self.logs.append(record)

    handler = LogCaptureHandler()
    logger = logging.getLogger("elasticapm.transport")
    logger.addHandler(handler)
    try:
        # can't be async for testing
        config = {"async_mode": False}
        for key in ("service_name", "secret_token"):
            if options.get(key):
                config[key] = options[key]
        client = DjangoClient(**config)
        client.error_logger = ColoredLogger(self.stderr)
        client.logger = ColoredLogger(self.stderr)
        self.write(
            "Trying to send a test error to APM Server using these settings:\n\n"
            "SERVICE_NAME:\t%s\n"
            "SECRET_TOKEN:\t%s\n"
            "SERVER:\t\t%s\n\n" % (client.config.service_name, client.config.secret_token, client.config.server_url)
        )
        try:
            raise TestException("Hi there!")
        except TestException:
            client.capture_exception()
        client.close()
    finally:
        # The logger is process-wide: detach the capture handler so that
        # repeated invocations do not accumulate handlers (and the records
        # they retain), even when sending raised.
        logger.removeHandler(handler)

    if not handler.logs:
        self.write(
            "Success! We tracked the error successfully! \n"
            "You should see it in the APM app in Kibana momentarily. \n"
            'Look for "TestException: Hi there!" in the Errors tab of the %s app' % client.config.service_name
        )
    else:
        self.write("Oops. That didn't work. The following error occured: \n\n", red)
        for entry in handler.logs:
            self.write(entry.getMessage(), red)