in src/translation/dags/validation_dag.py [0:0]
def _save_dvt_aggregated_results(ti):
    """Aggregate the DVT results of one run and store the summary row.

    Pulls the run config from XCom (key ``config``, task ``get_config``),
    counts the failed and successful rows in ``dmt_logs.dmt_dvt_results``
    that carry the run's ``unique_id`` as a label value, and inserts a single
    summary row into ``DVT_AGGREGATED_RESULTS_TABLE_ID``.

    Args:
        ti: Task instance object exposing ``xcom_pull``.

    Raises:
        KeyError: If the config lacks ``unique_id``, ``type`` or
            ``validation_config.validation_type``.
        ValueError: If ``config["type"]`` is not one of ``ddl``, ``data``,
            ``dml`` or ``sql``.
    """
    config = ti.xcom_pull(key="config", task_ids="get_config")
    unique_id = config["unique_id"]
    op_type = config["type"]

    # Resolve the validation type before querying BigQuery so that an
    # unsupported operation type fails fast with a clear message instead of
    # an UnboundLocalError after the queries have already been paid for.
    if op_type in ("ddl", "data", "dml"):
        validation_type = config["validation_config"]["validation_type"]
    elif op_type == "sql":
        validation_type = (
            "custom query " + config["validation_config"]["validation_type"]
        )
    else:
        raise ValueError(
            f"Unsupported operation type {op_type!r}; "
            "expected one of 'ddl', 'data', 'dml' or 'sql'."
        )

    failed_validations_count = _count_dvt_validations(unique_id, "fail")
    successful_validations_count = _count_dvt_validations(unique_id, "success")

    aggregated_results = [
        {
            "unique_id": unique_id,
            "validation_type": validation_type,
            "total_validations": (
                failed_validations_count + successful_validations_count
            ),
            "successful_validations": successful_validations_count,
            "failed_validations": failed_validations_count,
        }
    ]

    # The summary always holds exactly one row, so the former
    # "aggregated_results == []" branch was unreachable and has been dropped.
    insert_errors = client.insert_rows_json(
        DVT_AGGREGATED_RESULTS_TABLE_ID, aggregated_results
    )
    # insert_rows_json reports per-row failures through its return value
    # rather than by raising; surface them instead of discarding them.
    if insert_errors:
        logging.error(
            "Failed to insert DVT aggregated results for %s: %s",
            unique_id,
            insert_errors,
        )


def _count_dvt_validations(unique_id, status):
    """Return the number of DVT result rows for ``unique_id`` with ``status``.

    Args:
        unique_id: Label value identifying the run in ``dmt_dvt_results``.
        status: Value matched against ``validation_status`` (the caller uses
            ``"fail"`` and ``"success"``).

    Returns:
        The ``COUNT(*)`` value reported by the query.
    """
    # NOTE(review): unique_id is interpolated directly into the SQL text, as
    # it was originally. If it can ever come from an untrusted source, switch
    # to BigQuery query parameters.
    count_query = (
        "SELECT COUNT(*) AS validation_count "
        f"FROM `{project_id}.dmt_logs.dmt_dvt_results` "
        "CROSS JOIN UNNEST(labels) AS a "
        f'WHERE a.value = "{unique_id}" AND validation_status = "{status}"'
    )
    query_job = client.query(count_query)  # Make an API request
    # An aggregate without GROUP BY always yields exactly one row.
    return next(iter(query_job))["validation_count"]