in src/translation/dvt/main.py [0:0]
def run():
    """Build and execute a DVT validation command for the current Flask request.

    The request content is obtained via ``_get_request_content(flask.request)``
    and must provide the keys ``"config"`` and ``"validation_params_from_gcs"``,
    plus ``"table"`` when ``config["type"]`` is ``"ddl"``, ``"data"`` or
    ``"dml"``, or ``"sql_file"`` for any other type.

    Returns:
        A ``(response, response_code)`` tuple:

        * the command builder's error text with ``INTERNAL_SERVER_ERROR`` when
          the builder reports a non-empty error string (nothing is executed);
        * the decoded command output with ``OK`` when the command exits
          successfully;
        * the decoded command output with ``INTERNAL_SERVER_ERROR`` when the
          command raises ``CalledProcessError``.

    The status code and the response are also printed to stdout.
    """
    # Defaults: anything short of a successful command run reports a server error.
    response = "Empty Response"
    response_code = HTTPStatus.INTERNAL_SERVER_ERROR

    payload = _get_request_content(flask.request)
    dvt_config = payload["config"]
    gcs_params = payload["validation_params_from_gcs"]

    # Table-based validation types use one command builder; every other type
    # is handled as a SQL-file validation.
    if dvt_config["type"] in ["ddl", "data", "dml"]:
        target_table = payload["table"]
        dvt_cmd, cmd_errors = get_dvt_cmd_ddl_validation(
            dvt_config, target_table, gcs_params
        )
    else:
        sql_path = payload["sql_file"]
        dvt_cmd, cmd_errors = get_dvt_cmd_sql_validation(
            dvt_config, sql_path, gcs_params
        )

    if cmd_errors != "":
        # The builder reported a problem: return its message with the default
        # error status instead of running anything.
        response = cmd_errors
    else:
        # NOTE(review): dvt_cmd is handed to a shell as one string; confirm
        # the command builders quote any request-derived values.
        shell_argv = [*split("/bin/sh -c"), dvt_cmd]
        try:
            # stderr is merged into stdout so error text ends up in the response.
            raw_output = check_output(shell_argv, stderr=STDOUT)
        except CalledProcessError as err:
            print("Encountered error in DVT validation!")
            response = err.output.decode()
            response_code = HTTPStatus.INTERNAL_SERVER_ERROR
        else:
            print("DVT validation successful!")
            response = raw_output.decode()
            response_code = HTTPStatus.OK

    print(f"Response code sent back to DAG: {response_code}")
    print("Sent following response back to DAG")
    print(response)
    return response, response_code