def _save_dvt_aggregated_results()

in src/translation/dags/validation_crun_dag.py [0:0]


def _save_dvt_aggregated_results(**kwargs):
    config = ast.literal_eval(str(kwargs["dag_run"].conf["config"]))
    unique_id = config["unique_id"]
    failed_validations_query = f"""
        SELECT COUNT(*) as failed_count FROM `{project_id}.dmt_logs.dmt_dvt_results` CROSS JOIN UNNEST(labels) AS a where a.value="{unique_id}" and validation_status="fail";
    """
    query_job = client.query(failed_validations_query)  # Make an API request
    failed_validations_count = [row["failed_count"] for row in query_job][0]
    successful_validations_query = f"""
        SELECT COUNT(*) as successful_count FROM `{project_id}.dmt_logs.dmt_dvt_results` CROSS JOIN UNNEST(labels) AS a where a.value="{unique_id}" and validation_status="success";
    """
    query_job = client.query(successful_validations_query)  # Make an API request
    successful_validations_count = [row["successful_count"] for row in query_job][0]
    total_validations_count = failed_validations_count + successful_validations_count
    op_type = config["type"]
    if op_type in ["ddl", "data", "dml"]:
        validation_type = config["validation_config"]["validation_type"]
    elif op_type == "sql":
        validation_type = (
            "custom query " + config["validation_config"]["validation_type"]
        )
    aggregated_results = [
        {
            "unique_id": unique_id,
            "validation_type": validation_type,
            "total_validations": total_validations_count,
            "successful_validations": successful_validations_count,
            "failed_validations": failed_validations_count,
        }
    ]
    if aggregated_results == []:
        logging.info("DVT Aggregate Stats are empty.")
    else:
        client.insert_rows_json(DVT_AGGREGATED_RESULTS_TABLE_ID, aggregated_results)