def _execute_queries()

in src/translation/dags/schema_dag.py [0:0]


def _execute_queries(ti, dag_run):
    config = dag_run.conf["config"]
    files = dag_run.conf["files"]

    local_files = [os.path.join(LOCAL_DATA_DIRECTORY, file) for file in files]
    unique_id = config[CUSTOM_RUN_ID_KEY]

    # Outcome handlers
    def success_handler(script):
        logging.info(f"file {script.filename} ran ok")

    def error_handler(script, exception):
        if exception.code in {404, 416} or exception.code in range(500, 600):
            script.mark_for_retry()
            action = "marked for retry"
        elif exception.code in {409}:
            script.mark_as_done()
            action = "marked as done"
        else:
            action = "marked as error"

        logging.error(
            f'error running {script.filename}, exception: {exception}.{" " + action if action else ""}'
        )

    # Execution
    start_time = datetime.datetime.utcnow()

    ran_scripts = ddl_utils.run_script_files(
        file_list=local_files,
        error_handler=error_handler,
        success_handler=success_handler,
        job_id_prefix=f"{unique_id}-",
    )

    # Metric collection
    def script_to_metric(script):
        return {
            "unique_id": config[CUSTOM_RUN_ID_KEY],
            "sql_file_name": script.filename[len(LOCAL_DATA_DIRECTORY) + 1 :],
            "job_id": script.get_job().job_id,
            "status": "success" if script.done() else "fail",
            "error_details": (
                str(script.get_job().exception()) if script.failed() else ""
            ),
            "execution_start_time": str(start_time),
            "gcs_source_path": config["migrationTask"]["translationConfigDetails"][
                "gcsSourcePath"
            ],
        }

    results = list(map(script_to_metric, ran_scripts))
    ti.xcom_push(key="results", value=results)

    aggregated_results = [
        {
            "unique_id": config[CUSTOM_RUN_ID_KEY],
            "total_files": len(ran_scripts),
            "successful_files": sum(map(ddl_utils.Script.done, ran_scripts)),
            "failed_files": sum(map(ddl_utils.Script.failed, ran_scripts)),
        }
    ]
    ti.xcom_push(key="aggregated_results", value=aggregated_results)