def build_hive_ddl_extraction_group()

in src/translation/dags/translation_utils/ddl_extraction_utils/build_hive_ddl_extraction_group.py


from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.task_group import TaskGroup

# _set_required_vars, _next_task, BATCH_TRANSLATOR_DAG_ID, and batch_id are
# module-level names defined elsewhere in this file.


def build_hive_ddl_extraction_group(dag: DAG) -> TaskGroup:
    """Build the task group that extracts Hive DDLs and triggers the batch
    translator DAG."""
    hive_extraction_taskgroup = TaskGroup(
        group_id="hive_extraction_taskgroup", dag=dag
    )

    # Compute the run configuration and push it to XCom for downstream tasks.
    set_required_variables = PythonOperator(
        task_id="set_required_vars",
        python_callable=_set_required_vars,
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Stage the global Hive type-conversion config alongside the input DDLs.
    copy_global_config_file = GCSToGCSOperator(
        task_id="copy_global_config_file",
        source_bucket=Variable.get("hive_config_bucket_id", default_var=""),
        source_object="scripts/translation/hive/global_typeconvert.config.yaml",
        destination_bucket=Variable.get("input_ddl_bucket", default_var=""),
        destination_object=Variable.get("input_ddl_path", default_var=""),
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Route the run to a fresh DDL extraction or to loading existing metadata.
    branch_task = BranchPythonOperator(
        task_id="branch_task",
        python_callable=_next_task,
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Load the tab-delimited extraction report into the BigQuery metadata table.
    load_hive_ddl_metadata = GCSToBigQueryOperator(
        task_id="load_hive_ddl_metadata",
        bucket="{{ ti.xcom_pull(key='gcs_temp_bucket') }}",
        source_objects=["files/{{ ti.xcom_pull(key='hive_db') }}.csv"],
        destination_project_dataset_table="{{ ti.xcom_pull(key='hive_ddl_tbl') }}",
        source_format="CSV",
        field_delimiter="\t",
        skip_leading_rows=1,
        write_disposition="WRITE_APPEND",
        schema_fields=[
            {"name": "run_id", "type": "INTEGER"},
            {"name": "start_time", "type": "TIMESTAMP"},
            {"name": "database", "type": "STRING"},
            {"name": "table", "type": "STRING"},
            {"name": "field_delimiter", "type": "STRING"},
            {"name": "partition_flag", "type": "STRING"},
            {"name": "cluster_flag", "type": "STRING"},
            {"name": "format", "type": "STRING"},
            {"name": "ddl_extracted", "type": "STRING"},
        ],
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Run the DDL extraction script as a Dataproc Serverless PySpark batch.
    extract_hive_ddls = DataprocCreateBatchOperator(
        task_id="extract_hive_ddls",
        region=Variable.get("region", default_var=""),
        batch_id=batch_id,  # unique Dataproc batch id, defined at module level
        batch={
            "pyspark_batch": {
                "main_python_file_uri": "gs://"
                + Variable.get("hive_config_bucket_id", default_var="")
                + "/scripts/translation/hive/extract_hive_ddls.py",
                "args": [
                    "--build_config",
                    "{{ ti.xcom_pull(key='next_dag_data', task_ids='hive_extraction_taskgroup.set_required_vars') }}",
                ],
            }
        },
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Hand the build config to the batch translator DAG.
    invoke_batch_translator_dag = TriggerDagRunOperator(
        task_id="invoke_batch_translator_dag_task",
        trigger_dag_id=BATCH_TRANSLATOR_DAG_ID,
        conf={
            "config": "{{ ti.xcom_pull(key='next_dag_data', task_ids='hive_extraction_taskgroup.set_required_vars') }}"
        },
        trigger_rule="one_success",  # run after whichever branch executed
        task_group=hive_extraction_taskgroup,
        dag=dag,
    )

    # Branch fans out to one of two paths; either path triggers the translator.
    (
        set_required_variables
        >> copy_global_config_file
        >> branch_task
        >> [load_hive_ddl_metadata, extract_hive_ddls]
        >> invoke_batch_translator_dag
    )

    return hive_extraction_taskgroup
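
For context, a minimal sketch of the two private callables referenced above. The XCom keys mirror the templated fields in the task group, but the decision logic, Variable names, and payload are assumptions, not the module's actual implementation:

# Hypothetical sketch only; the real helpers live in this module and may differ.
import json

from airflow.models import Variable


def _set_required_vars(ti, **_):
    # Push the values the templated fields above pull back via XCom.
    ti.xcom_push(key="gcs_temp_bucket", value=Variable.get("temp_gcs_bucket_id", default_var=""))
    ti.xcom_push(key="hive_db", value=Variable.get("hive_db", default_var=""))  # assumed Variable
    ti.xcom_push(key="hive_ddl_tbl", value=Variable.get("hive_ddl_tbl", default_var=""))  # assumed Variable
    ti.xcom_push(key="next_dag_data", value=json.dumps({"source": "hive"}))  # placeholder payload


def _next_task(ti, **_):
    # Branch ids must carry the task-group prefix; the skip flag is hypothetical.
    if Variable.get("skip_hive_ddl_extraction", default_var="no") == "yes":
        return "hive_extraction_taskgroup.load_hive_ddl_metadata"
    return "hive_extraction_taskgroup.extract_hive_ddls"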
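
Usage sketch (illustrative only; the dag_id and schedule below are assumptions, not the repository's actual DAG definition):

import pendulum
from airflow import DAG

with DAG(
    dag_id="hive_ddl_extraction",  # hypothetical dag_id
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    build_hive_ddl_extraction_group(dag)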