def _set_required_vars()

in src/translation/dags/translation_utils/ddl_extraction_utils/build_hive_ddl_extraction_group.py [0:0]


import ast
import json

from airflow.models import Variable


def _set_required_vars(ti, **kwargs):
    """
    TODO
    get bucket id from kwargs and then push as env variable
    Check parallelization_dag for reference
    """

    # Read the full translation config passed in through the DAG run's conf
    translation_config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
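    # Forward the raw config string to the downstream DAG via XCom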
    ti.xcom_push(
        key="next_dag_data", value=json.dumps(kwargs["dag_run"].conf["config"])
    )
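    # BigQuery dataset that holds the DMT audit/log tables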
    bq_dataset_audit = "dmt_logs"
    region = translation_config["hive_config"]["transfer-configuration"]["region"]
    Variable.set("region", region)
    Variable.set(
        "input_ddl_bucket",
        translation_config["migrationTask"]["translationConfigDetails"]["gcsSourcePath"]
        .split("//")[1]
        .split("/")[0],
    )
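    # ...and the object prefix, pointing at the global type-convert config inside it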
    Variable.set(
        "input_ddl_path",
        translation_config["migrationTask"]["translationConfigDetails"]["gcsSourcePath"]
        .split("//")[1]
        .split("/", 1)[1]
        + "/global_typeconvert.config.yaml",
    )
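    # Share the temp GCS bucket with downstream tasks via XCom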
    ti.xcom_push(key="gcs_temp_bucket", value=translation_config["gcs_temp_bucket"])
    nm_map_list = translation_config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"]
    hive_db = list(set(d["source"]["schema"] for d in nm_map_list))[0]
    ti.xcom_push(key="hive_db", value=hive_db)
    hive_ddl_tbl = "{proj_id}.{dataset}.{tbl}".format(
        proj_id=translation_config["hive_config"]["transfer-configuration"][
            "project_id"
        ],
        dataset=bq_dataset_audit,
        tbl="hive_ddl_metadata",
    )
    print("hive_ddl_tbl: " + hive_ddl_tbl)
    ti.xcom_push(key="hive_ddl_tbl", value=hive_ddl_tbl)