in api-connector/src/handlers/cloud_task.py [0:0]
def execute(request: Any) -> Tuple[Any, int]:
logger.debug("Cloud Task request received.")
workflow_id = Utils.get_property(request, "workflow_id")
logger.debug("Processing authentication type...")
auth = Utils.get_property(request, "auth")
credentials = None
auth_type = ""
secret_data = None
if auth:
try:
secret_name = Utils.get_property(auth, "secret_name", required=True)
secret_data = json.loads(Secrets.get_value(secret_name))
logger.info(f"Obtained secret authentication data for '{secret_name}'")
except Exception as e:
return f"Could not retrieve secret: {e}", 500
try:
auth_type = Utils.get_property(auth, "type", required=True)
match (AuthType[auth_type]):
case AuthType.CLIENT_CREDENTIALS:
logger.debug("Requesting credentials from server...")
auth_server = Utils.get_property(
auth, "auth_server", required=True
)
client_id = Utils.get_property(auth, "client_id", required=True)
client_secret = Utils.get_property(
secret_data, "client_secret", required=True
)
token_res = requests.post(
url=auth_server,
data={"grant_type": "client_credentials"},
allow_redirects=False,
auth=(client_id, client_secret),
)
if token_res.status_code != 200:
msg = f"Failed to obtain token from the OAuth 2.0 server: {token_res.text}"
logger.error(msg)
return msg, 500
access_token = Utils.get_property(
token_res.json(), "access_token", required=True
)
credentials = access_token
logger.debug("Success.")
case AuthType.HTTP_BASIC:
logger.debug(
"Accessing secret properties (username, password)..."
)
credentials = (
Utils.get_property(secret_data, "username", required=True),
Utils.get_property(secret_data, "password", required=True),
)
logger.debug("Success.")
except KeyError as ke:
msg = f"Authentication type of '{auth_type}' is not supported."
logger.error(msg)
return msg, 422
logger.info(
f"Credential processing complete. Will use authentication type '{auth_type}'."
)
request_config = Utils.get_property(request, "request_config", required=True)
uri = Utils.get_property(request_config, "uri", required=True)
method = Utils.get_property(request_config, "method") or "POST"
timeout = Utils.get_property(request_config, "timeout") or 10
body = Utils.get_property(request, "body")
headers = Utils.get_property(request, "headers")
query_string = Utils.get_property(request, "query_string")
result_table = Utils.get_property(request, "result_table").replace(
"tbl_process_log", "tbl_result"
)
log_table = result_table.replace("tbl_result", "tbl_process_log")
rpl = result_table.replace("tbl_process_log", "*").replace("tbl_result", "*")
logger.info(f"Results will be sent to '{rpl}'.")
res = CloudTaskRequest.api_call(
uri,
AuthType[auth_type],
credentials,
query_string,
body,
timeout,
headers,
method,
)
# Persist response on BigQuery
Utils.save_bigquery(
result_table,
{
"request": {
"uri": uri,
"method": method,
"auth_type": auth_type,
"query_string": query_string,
"body": json.dumps(body),
},
"request_time": f"{datetime.now().isoformat()}",
"elapsed_time": res.elapsed.total_seconds(),
"response": {
"status_code": res.status_code,
"headers": json.dumps(dict(res.headers)),
"body": res.text,
},
},
)
# Send response to Pub/Sub, if configured
if config.PUBSUB_TOPICS:
topics = json.loads(config.PUBSUB_TOPICS)
topics = [[v for k, v in t.items() if k == workflow_id] for t in topics]
topic = next((x for xs in topics for x in xs), False)
if topic:
publisher = pubsub_v1.PublisherClient()
logger.debug(
f"Publishing response data from '{workflow_id}' to '{topic}'..."
)
publisher.publish(str(topic), res.text.encode("utf-8"))
logger.debug("Done.")
# Log results/status
if res.status_code == 200:
log_info = {"status_code": res.status_code}
else:
log_info = {"status_code": res.status_code, "error_message": res.text}
Utils.save_bigquery(
log_table,
{
"query_string": query_string,
"body": json.dumps(body),
"result": json.dumps(log_info),
"exec_time": f"{datetime.now().isoformat()}",
},
)
# logger.debug(f"Got response: {res_json}")
ctype_header = res.headers.get("Content-Type")
res_out = ""
if ctype_header and ctype_header.startswith("application/json"):
res_out = json.dumps(res.json())
else:
res_out = res.text
logger.debug(res_out)
return res_out, 202