in src/translation/dags/translation_utils/ddl_extraction_utils/build_hive_ddl_extraction_group.py [0:0]
def build_hive_ddl_extraction_group(dag: DAG) -> TaskGroup:
    """Build the task group that extracts Hive DDLs and triggers the batch translator DAG.

    Task flow inside the group::

        set_required_vars >> copy_global_config_file >> branch_task
            >> [load_hive_ddl_metadata, extract_hive_ddls]
            >> invoke_batch_translator_dag_task

    ``branch_task`` is a ``BranchPythonOperator`` driven by ``_next_task``. Which
    of the two downstream tasks it selects is decided there (not visible here).
    The trigger task uses ``trigger_rule="one_success"``, so it runs once any of
    its upstream tasks has succeeded.

    Args:
        dag: DAG that the group and all of its tasks are attached to. It is passed
            explicitly, so this builder does not have to be called inside a
            ``with DAG(...)`` block.

    Returns:
        The populated ``TaskGroup`` with group id ``hive_extraction_taskgroup``.
    """
    # Pass the DAG explicitly. Without it, TaskGroup falls back to the ambient DAG
    # context and raises when this builder is called outside ``with DAG(...)``,
    # even though every operator below already receives ``dag=dag``.
    hive_extraction_taskgroup = TaskGroup(group_id="hive_extraction_taskgroup", dag=dag)

    # Read once: the same bucket hosts the global type-convert config and the
    # PySpark extraction script. Variable.get runs at DAG-parse time; the " "
    # default keeps parsing alive if the Variable is missing.
    hive_config_bucket = Variable.get("hive_config_bucket_id", default_var=" ")

    set_required_variables = PythonOperator(
        task_id="set_required_vars",
        python_callable=_set_required_vars,
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Jinja template that pulls the "next_dag_data" XCom from set_required_vars.
    # The task id comes from the operator itself (it already includes the group
    # prefix), so it cannot drift from the group/task ids defined above.
    next_dag_data = (
        "{{ ti.xcom_pull(key='next_dag_data', task_ids='"
        + set_required_variables.task_id
        + "') }}"
    )

    copy_global_config_file = GCSToGCSOperator(
        task_id="copy_global_config_file",
        source_bucket=hive_config_bucket,
        source_object="scripts/translation/hive/global_typeconvert.config.yaml",
        destination_bucket=Variable.get("input_ddl_bucket", default_var=""),
        destination_object=Variable.get("input_ddl_path", default_var=""),
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    branch_task = BranchPythonOperator(
        task_id="branch_task",
        python_callable=_next_task,
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Appends one tab-delimited CSV (header row skipped) to the DDL metadata table.
    # The bucket, file name and destination table are all resolved from XCom at run time.
    load_hive_ddl_metadata = GCSToBigQueryOperator(
        task_id="load_hive_ddl_metadata",
        bucket="{{ ti.xcom_pull(key='gcs_temp_bucket')}}",
        source_objects="files/{{ti.xcom_pull(key='hive_db')}}.csv",
        destination_project_dataset_table="{{ti.xcom_pull(key='hive_ddl_tbl')}}",
        source_format="CSV",
        field_delimiter="\t",
        skip_leading_rows=1,
        write_disposition="WRITE_APPEND",
        schema_fields=[
            {"name": "run_id", "type": "INTEGER"},
            {"name": "start_time", "type": "TIMESTAMP"},
            {"name": "database", "type": "STRING"},
            {"name": "table", "type": "STRING"},
            {"name": "field_delimiter", "type": "STRING"},
            {"name": "partition_flag", "type": "STRING"},
            {"name": "cluster_flag", "type": "STRING"},
            {"name": "format", "type": "STRING"},
            {"name": "ddl_extracted", "type": "STRING"},
        ],
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    extract_hive_ddls = DataprocCreateBatchOperator(
        task_id="extract_hive_ddls",
        # NOTE(review): a missing "region" Variable falls back to the int 0,
        # which is not a valid Dataproc region — confirm the intended default.
        region=Variable.get("region", default_var=0),
        # ``batch_id`` is not defined in this function; presumably a module-level
        # name — verify it is unique per run.
        batch_id=batch_id,
        batch={
            "pyspark_batch": {
                "main_python_file_uri": "gs://"
                + hive_config_bucket
                + "/scripts/translation/hive/extract_hive_ddls.py",
                # The extraction script receives the config pushed by set_required_vars.
                "args": [
                    "--build_config",
                    next_dag_data,
                ],
            }
        },
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    invoke_batch_translator_dag = TriggerDagRunOperator(
        task_id="invoke_batch_translator_dag_task",
        trigger_dag_id=BATCH_TRANSLATOR_DAG_ID,
        conf={"config": next_dag_data},
        # Runs once either branch target succeeds (the other may be skipped by the branch).
        trigger_rule="one_success",
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    (
        set_required_variables
        >> copy_global_config_file
        >> branch_task
        >> [load_hive_ddl_metadata, extract_hive_ddls]
        >> invoke_batch_translator_dag
    )
    return hive_extraction_taskgroup