in src/translation/dags/validation_crun_dag.py [0:0]
def _invoke_cloud_run(request_json):
url = get_cloud_run_url(f"edw-dvt-tool-{CUSTOMER_NAME}", project_id)
headers = {"Authorization": "Bearer " + get_token()}
config = request_json["config"]
validation_params_from_gcs = request_json["validation_params_from_gcs"]
validation_type = config["type"]
if validation_type in ["ddl", "data", "dml"]:
table = request_json["table"]
request_content = {
"config": config,
"table": table,
"validation_params_from_gcs": validation_params_from_gcs,
}
print(f"Running validation for table: {table}")
elif validation_type == "sql":
sql_file = request_json["sql_file"]
request_content = {
"config": config,
"sql_file": sql_file,
"validation_params_from_gcs": validation_params_from_gcs,
}
print(f"Running validation for sql file: {sql_file}")
else:
print(f"Unknown validation type: {validation_type}")
raise AirflowFailException("DVT CloudRun execution failed!")
res = requests.post(url, headers=headers, json=request_content)
print(f"Status Code received from DVT CloudRun: {res.status_code}")
print("Received following response from DVT CloudRun")
print(res.content.decode())
if res.status_code == HTTPStatus.OK:
print("Validation CloudRun execution successful")
else:
print(
f"Validation CloudRun execution failed with status code: {res.status_code}"
)
raise AirflowFailException("DVT CloudRun execution failed!")