def _invoke_cloud_run()

in src/translation/dags/validation_crun_dag.py [0:0]


def _invoke_cloud_run(request_json):
    url = get_cloud_run_url(f"edw-dvt-tool-{CUSTOMER_NAME}", project_id)
    headers = {"Authorization": "Bearer " + get_token()}
    config = request_json["config"]
    validation_params_from_gcs = request_json["validation_params_from_gcs"]
    validation_type = config["type"]
    if validation_type in ["ddl", "data", "dml"]:
        table = request_json["table"]
        request_content = {
            "config": config,
            "table": table,
            "validation_params_from_gcs": validation_params_from_gcs,
        }
        print(f"Running validation for table: {table}")
    elif validation_type == "sql":
        sql_file = request_json["sql_file"]
        request_content = {
            "config": config,
            "sql_file": sql_file,
            "validation_params_from_gcs": validation_params_from_gcs,
        }
        print(f"Running validation for sql file: {sql_file}")
    else:
        print(f"Unknown validation type: {validation_type}")
        raise AirflowFailException("DVT CloudRun execution failed!")

    res = requests.post(url, headers=headers, json=request_content)
    print(f"Status Code received from DVT CloudRun: {res.status_code}")
    print("Received following response from DVT CloudRun")
    print(res.content.decode())
    if res.status_code == HTTPStatus.OK:
        print("Validation CloudRun execution successful")
    else:
        print(
            f"Validation CloudRun execution failed with status code: {res.status_code}"
        )
        raise AirflowFailException("DVT CloudRun execution failed!")