in wadebug/wa_actions/implementations/check_webhook.py [0:0]
def _run(cls, config, *args, **kwargs):
    """Check that the configured webhook is HTTPS and answers a test POST.

    The webhook URL is fetched through ``WABizAPI``; a test POST is then
    issued from inside the first running wacore container so the check
    exercises the same network path the container itself would use.

    Args:
        cls: The action class; passed through to every result constructor.
        config: Mapping whose ``"webapp"`` entry is expanded as keyword
            arguments to ``WABizAPI``.
        *args: Unused.
        **kwargs: Unused.

    Returns:
        ``results.OK(cls)`` when every check passes, otherwise the result
        object describing the first failed check.

    Raises:
        WABizGeneralError: Re-raised unchanged so the app-wide handler
            deals with it.
    """
    api = WABizAPI(**config.get("webapp"))
    try:
        webhook_url = api.get_webhook_url()
        if not webhook_url:
            return _result_webhook_not_set(cls)
        # URI schemes are case-insensitive, so compare in lowercase, and
        # include "://" so that e.g. "httpsfoo://host" is not accepted.
        if not webhook_url.lower().startswith("https://"):
            return _result_webhook_not_https(cls)
    except (WABizAccessError, WABizAuthError, WABizNetworkError, ValueError) as e:
        return _result_get_webhook_error(cls, e)
    # explicitly catching a possible exception
    except WABizGeneralError:
        # WABizGeneralError is likely not a user error,
        # should be handled by app-wide catch
        raise

    wacore_containers = docker_utils.get_running_wacore_containers()
    if not wacore_containers:
        return results.Problem(
            cls,
            "Webhook check failed",
            "There is no wacore container running",
            "Please check results from other actions to diagnose",
        )
    container = wacore_containers[0]

    cert_str = api.get_webhook_cert()
    dest_cert = None
    if cert_str:
        # NOTE(review): NamedTemporaryFile opens in binary mode by default,
        # so this assumes get_webhook_cert() returns bytes -- confirm.
        with tempfile.NamedTemporaryFile() as cert_file:
            cert_file.write(cert_str)
            # Seeking on the buffered file also writes out pending data, so
            # the on-disk file is complete before it is copied by name.
            cert_file.seek(0)
            docker_utils.put_archive_to_container(
                container, cert_file.name, DEST_PATH
            )
            # Path of the uploaded certificate inside the container.
            dest_cert = os.path.join(DEST_PATH, os.path.basename(cert_file.name))

    result, response_time = curl_utils.https_post_request_from_container(
        container, webhook_url, TEST_POST_DATA_STRING, REQ_TIMEOUT, dest_cert
    )

    if result == CURLTestResult.CONNECTION_ERROR:
        return _result_webhook_could_not_connect(cls)
    if result == CURLTestResult.SSL_CERT_UNKNOWN:
        return _result_webhook_no_cert_uploaded(cls)
    if result == CURLTestResult.CONNECTION_TIMEOUT:
        return _result_webhook_did_not_respond(cls)
    if result == CURLTestResult.HTTP_STATUS_NOT_OK:
        return _result_webhook_did_not_return_ok(cls)
    # Only reached when none of the failure results above matched.
    if response_time > ACCEPTABLE_RESPONSE_TIME:
        return _result_webhook_slow_response(cls, response_time)
    return results.OK(cls)