# src/translation/dags/dml_validation_dag.py
def _save_dry_run_result(ti, **kwargs):
    """Persist the dry-run DML validation results pulled from the dry_run task.

    Pulls the ``dry_run_results`` XCom payload produced by the ``dry_run``
    task. If present, it stores the combined result query via a
    ``BigQueryInsertJobOperator`` and, on success, streams the aggregated
    per-file statistics into ``DML_VALIDATION_AGGREGATED_RESULTS_TABLE_ID``.
    Best-effort: BigQuery failures are logged, not re-raised, so the DAG run
    is not failed by a storage error.

    Args:
        ti: Airflow task instance, used for ``xcom_pull``.
        **kwargs: Airflow context, forwarded to the operator's ``execute``.
    """
    logging.info("In save dry run results")
    results = ti.xcom_pull(task_ids="dry_run", key="dry_run_results")
    logging.info(f"Dry run execution results {results is None}")
    if results is None:
        # Nothing was produced upstream; record it and bail out.
        logging.error(
            "None of the SQL file processed for dml validation, "
            "hence nothing to log"
        )
    else:
        try:
            bigquery_execute_multi_query = BigQueryInsertJobOperator(
                task_id="store_dry_run_result",
                dag=dag,
                configuration={
                    "query": {"query": results["queryString"], "useLegacySql": False}
                },
            )
            bigquery_execute_multi_query.execute(context=kwargs)
        except Exception:
            # logging.exception records the traceback at ERROR level, so a
            # failed insert is visible instead of hidden at INFO.
            logging.exception("Something went wrong in task _save_dry_run_result")
        else:
            logging.info("Dry run result stored successfully")
            aggregated_results = results["aggregated_results"]
            if not aggregated_results:
                logging.info("DML Aggregation Stats are empty.")
            else:
                # insert_rows_json returns a list of per-row errors
                # (empty on full success) — log it for diagnosis.
                aggregated_insert = bq_client.insert_rows_json(
                    DML_VALIDATION_AGGREGATED_RESULTS_TABLE_ID, aggregated_results
                )
                logging.info(
                    "Dry run aggregated result stored successfully "
                    f":: {aggregated_insert}"
                )
    logging.info("Save dry run result execution completed")