def _save_dry_run_result()

in src/translation/dags/dml_validation_dag.py [0:0]


def _save_dry_run_result(ti, **kwargs):
    logging.info("In save dry run results")
    results = ti.xcom_pull(task_ids="dry_run", key="dry_run_results")
    logging.info(f"Dry run execution results {results is None}")
    if results is None:
        logging.error(
            "None of the SQL file processed for dml validation\
                hence nothing to log"
        )
    else:
        try:
            bigquery_execute_multi_query = BigQueryInsertJobOperator(
                task_id="store_dry_run_result",
                dag=dag,
                configuration={
                    "query": {"query": results["queryString"], "useLegacySql": False}
                },
            )
            bigquery_execute_multi_query.execute(context=kwargs)

        except Exception as e:
            logging.info("Something went wrong in task _save_dry_run_result")
            logging.info(f"Exception is :: {str(e)}")
        else:
            logging.info("Dry run result stored successfully")

        aggregated_results = results["aggregated_results"]
        if aggregated_results == []:
            logging.info("DML Aggregation Stats are empty.")
        else:
            aggregated_insert = bq_client.insert_rows_json(
                DML_VALIDATION_AGGREGATED_RESULTS_TABLE_ID, aggregated_results
            )

        logging.info(
            f"Dry run aggregated result stored successfully\
                :: {aggregated_insert}"
        )

    logging.info("Save dry run result execution completed")