in src/translation/dags/translation_utils/ddl_extraction_utils/build_teradata_ddl_extraction_group.py [0:0]
def build_teradata_ddl_extraction_group(dag: DAG) -> TaskGroup:
    """Build the ``teradata_extraction_taskgroup`` task group on *dag*.

    The group contains seven tasks wired in this order::

        check_teradata_jdbc_jar_present
            >> download_script_files
            >> prepare_arguments
            >> run_dwh_tool
            >> store_ddl
            >> [remove_metadata_folder, invoke_batch_sql_translation]

    ``run_dwh_tool`` and ``invoke_batch_sql_translation`` both read XCom
    values (keys ``arg_string`` and ``config``) pushed by
    ``prepare_arguments``.

    Args:
        dag: DAG that owns the task group and every task created here.

    Returns:
        The populated task group.
    """
    # Bind the group to ``dag`` explicitly, exactly like every operator
    # below, so the builder does not rely on being called inside an active
    # ``with DAG(...)`` block.
    teradata_extraction_taskgroup = TaskGroup(
        group_id="teradata_extraction_taskgroup",
        dag=dag,
    )

    check_teradata_jdbc_jar_present = PythonOperator(
        task_id="check_teradata_jdbc_jar_present",
        python_callable=_check_teradata_jdbc_jar_present,
        task_group=teradata_extraction_taskgroup,
        dag=dag,
    )

    download_script_files = PythonOperator(
        task_id="download_script_files",
        python_callable=_download_files,
        task_group=teradata_extraction_taskgroup,
        dag=dag,
    )

    prepare_arguments = PythonOperator(
        task_id="prepare_arguments",
        python_callable=_prepare_arguments,
        task_group=teradata_extraction_taskgroup,
        dag=dag,
    )

    run_dwh_tool = BashOperator(
        task_id="run_dwh_tool",
        # NOTE(review): ``$arg_string`` is unquoted, presumably so the shell
        # word-splits it into separate script arguments -- confirm before
        # quoting it.
        bash_command=f"bash {BASH_FILE_LOCAL_PATH} $arg_string",
        # The templated XCom value is passed to the script through the
        # ``arg_string`` environment variable rather than being spliced
        # into the command text.
        env={
            "arg_string": "{{ ti.xcom_pull(task_ids='teradata_extraction_taskgroup.prepare_arguments', key='arg_string') }}"
        },
        task_group=teradata_extraction_taskgroup,
        dag=dag,
    )

    store_ddl = PythonOperator(
        task_id="store_ddl",
        python_callable=_store_ddl,
        task_group=teradata_extraction_taskgroup,
        dag=dag,
    )

    invoke_translation_dag = TriggerDagRunOperator(
        task_id="invoke_batch_sql_translation",
        trigger_dag_id="batch_sql_translation",
        # Forward the ``config`` XCom from ``prepare_arguments`` to the
        # triggered ``batch_sql_translation`` DAG run.
        conf={
            "config": "{{ ti.xcom_pull(task_ids='teradata_extraction_taskgroup.prepare_arguments', key='config') }}",
        },
        task_group=teradata_extraction_taskgroup,
        dag=dag,
    )

    remove_metadata_folder = PythonOperator(
        task_id="remove_metadata_folder",
        python_callable=_remove_metadata_folder,
        task_group=teradata_extraction_taskgroup,
        dag=dag,
    )

    # Linear pipeline up to ``store_ddl``; the two final tasks both depend
    # on ``store_ddl`` and have no dependency on each other.
    (
        check_teradata_jdbc_jar_present
        >> download_script_files
        >> prepare_arguments
        >> run_dwh_tool
        >> store_ddl
        >> [remove_metadata_folder, invoke_translation_dag]
    )

    return teradata_extraction_taskgroup