in src/translation/dags/validation_crun_dag.py [0:0]
def _count_dvt_results(unique_id, status, alias):
    """Count the DVT result rows labelled with *unique_id* that have *status*.

    Args:
        unique_id: Label value identifying the run whose results are counted.
        status: Value matched against the ``validation_status`` column.
        alias: Column alias used for the count in the query result.

    Returns:
        The count read from the first row of the query result.
    """
    # NOTE(review): unique_id originates from the DAG run conf and is
    # interpolated directly into the SQL text. Consider BigQuery query
    # parameters to rule out injection.
    query = f"""
    SELECT COUNT(*) as {alias} FROM `{project_id}.dmt_logs.dmt_dvt_results` CROSS JOIN UNNEST(labels) AS a where a.value="{unique_id}" and validation_status="{status}";
    """
    query_job = client.query(query)  # Make an API request
    # The aggregate query returns a single row; read the count from it.
    return next(iter(query_job))[alias]


def _save_dvt_aggregated_results(**kwargs):
    """Aggregate DVT results for one run and store the summary row.

    Reads the run configuration from ``kwargs["dag_run"].conf["config"]``,
    counts the failed and successful validations recorded for the run's
    ``unique_id``, and inserts a single summary row into
    ``DVT_AGGREGATED_RESULTS_TABLE_ID``.

    Args:
        **kwargs: Task context; must contain ``dag_run`` whose ``conf`` holds
            a ``config`` entry with ``unique_id``, ``type`` and
            ``validation_config["validation_type"]``.

    Raises:
        ValueError: If ``config["type"]`` is not one of ``ddl``, ``data``,
            ``dml`` or ``sql``.
    """
    config = ast.literal_eval(str(kwargs["dag_run"].conf["config"]))
    unique_id = config["unique_id"]

    # Resolve the validation type first so an unsupported operation type
    # fails with a clear error before any query is issued (previously this
    # surfaced later as an UnboundLocalError).
    op_type = config["type"]
    if op_type in ("ddl", "data", "dml"):
        validation_type = config["validation_config"]["validation_type"]
    elif op_type == "sql":
        validation_type = (
            "custom query " + config["validation_config"]["validation_type"]
        )
    else:
        raise ValueError(
            f"Unsupported operation type {op_type!r}; "
            "expected one of 'ddl', 'data', 'dml', 'sql'."
        )

    failed_validations_count = _count_dvt_results(
        unique_id, "fail", "failed_count"
    )
    successful_validations_count = _count_dvt_results(
        unique_id, "success", "successful_count"
    )
    total_validations_count = (
        failed_validations_count + successful_validations_count
    )

    aggregated_results = [
        {
            "unique_id": unique_id,
            "validation_type": validation_type,
            "total_validations": total_validations_count,
            "successful_validations": successful_validations_count,
            "failed_validations": failed_validations_count,
        }
    ]

    # The summary list always holds exactly one row, so the former
    # `aggregated_results == []` check could never fire. Report the empty
    # case based on the counts instead; the row is still written as before.
    if total_validations_count == 0:
        logging.info("DVT Aggregate Stats are empty.")

    # insert_rows_json reports per-row failures through its return value
    # rather than by raising, so surface them instead of dropping them.
    insert_errors = client.insert_rows_json(
        DVT_AGGREGATED_RESULTS_TABLE_ID, aggregated_results
    )
    if insert_errors:
        logging.error(
            "Failed to insert DVT aggregated results for %s: %s",
            unique_id,
            insert_errors,
        )