in src/translation/dags/schema_dag.py [0:0]
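# Standard-library imports used directly by this excerpt; LOCAL_DATA_DIRECTORY,
# CUSTOM_RUN_ID_KEY and ddl_utils are assumed to be defined elsewhere in the module.
import datetime
import logging
import os
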
def _execute_queries(ti, dag_run):
    config = dag_run.conf["config"]
    files = dag_run.conf["files"]
    local_files = [os.path.join(LOCAL_DATA_DIRECTORY, file) for file in files]
    unique_id = config[CUSTOM_RUN_ID_KEY]

    # Outcome handlers
    def success_handler(script):
        logging.info(f"file {script.filename} ran successfully")
    def error_handler(script, exception):
        if exception.code in {404, 416} or 500 <= exception.code < 600:
            script.mark_for_retry()
            action = "marked for retry"
        elif exception.code == 409:
            script.mark_as_done()
            action = "marked as done"
        else:
            action = "marked as error"
        logging.error(
            f"error running {script.filename}, exception: {exception}. {action}"
        )

    # Execution
    start_time = datetime.datetime.utcnow()
    ran_scripts = ddl_utils.run_script_files(
        file_list=local_files,
        error_handler=error_handler,
        success_handler=success_handler,
        job_id_prefix=f"{unique_id}-",
    )
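
    # run_script_files returns Script objects carrying their final state and
    # the underlying BigQuery job; the metrics below read both.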
    # Metric collection
    def script_to_metric(script):
        return {
            "unique_id": unique_id,
            "sql_file_name": script.filename[len(LOCAL_DATA_DIRECTORY) + 1 :],
            "job_id": script.get_job().job_id,
            "status": "success" if script.done() else "fail",
            "error_details": (
                str(script.get_job().exception()) if script.failed() else ""
            ),
            "execution_start_time": str(start_time),
            "gcs_source_path": config["migrationTask"]["translationConfigDetails"][
                "gcsSourcePath"
            ],
        }
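
    # Push per-file metrics to XCom for downstream consumers; sql_file_name
    # is the script path relative to LOCAL_DATA_DIRECTORY.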
    results = list(map(script_to_metric, ran_scripts))
    ti.xcom_push(key="results", value=results)
    aggregated_results = [
        {
            "unique_id": unique_id,
            "total_files": len(ran_scripts),
            "successful_files": sum(map(ddl_utils.Script.done, ran_scripts)),
            "failed_files": sum(map(ddl_utils.Script.failed, ran_scripts)),
        }
    ]
    ti.xcom_push(key="aggregated_results", value=aggregated_results)