# src/translation/dags/translation_utils/ddl_extraction_utils/build_hive_ddl_extraction_group.py
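# Module-level imports this function relies on (assumed; not shown in the
# original excerpt): ast to parse the conf string, json to serialize the XCom
# payload, and Airflow's Variable model to set Variables.
import ast
import json

from airflow.models import Variable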
def _set_required_vars(ti, **kwargs):
"""
TODO
get bucket id from kwargs and then push as env variable
Check parallelization_dag for reference
"""
    # The DAG run conf carries the full translation config as a string literal;
    # parse it and unwrap the nested "config" key.
    translation_config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
ti.xcom_push(
key="next_dag_data", value=json.dumps(kwargs["dag_run"].conf["config"])
)
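    # Illustrative shape of the parsed conf["config"] payload, inferred from the
    # keys accessed below (all values here are placeholders):
    #
    # {
    #     "config": {
    #         "gcs_temp_bucket": "my-temp-bucket",
    #         "hive_config": {
    #             "transfer-configuration": {
    #                 "project_id": "my-project",
    #                 "region": "us-central1",
    #             },
    #         },
    #         "migrationTask": {
    #             "translationConfigDetails": {
    #                 "gcsSourcePath": "gs://my-ddl-bucket/hive/ddl",
    #                 "nameMappingList": {"name_map": [...]},
    #             },
    #         },
    #     },
    # }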
    # BigQuery dataset that holds the DMT audit/log tables (e.g. hive_ddl_metadata).
    bq_dataset_audit = "dmt_logs"
region = translation_config["hive_config"]["transfer-configuration"]["region"]
Variable.set("region", region)
Variable.set(
"input_ddl_bucket",
translation_config["migrationTask"]["translationConfigDetails"]["gcsSourcePath"]
.split("//")[1]
.split("/")[0],
)
Variable.set(
"input_ddl_path",
translation_config["migrationTask"]["translationConfigDetails"]["gcsSourcePath"]
.split("//")[1]
.split("/", 1)[1]
+ "/global_typeconvert.config.yaml",
)
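    # Worked example of the splits above for a hypothetical
    # gcsSourcePath = "gs://my-ddl-bucket/hive/ddl":
    #   input_ddl_bucket -> "my-ddl-bucket"
    #   input_ddl_path   -> "hive/ddl/global_typeconvert.config.yaml"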
ti.xcom_push(key="gcs_temp_bucket", value=translation_config["gcs_temp_bucket"])
nm_map_list = translation_config["migrationTask"]["translationConfigDetails"][
"nameMappingList"
]["name_map"]
hive_db = list(set(d["source"]["schema"] for d in nm_map_list))[0]
ti.xcom_push(key="hive_db", value=hive_db)
hive_ddl_tbl = "{proj_id}.{dataset}.{tbl}".format(
proj_id=translation_config["hive_config"]["transfer-configuration"][
"project_id"
],
dataset=bq_dataset_audit,
tbl="hive_ddl_metadata",
)
print("hive_ddl_tbl: " + hive_ddl_tbl)
ti.xcom_push(key="hive_ddl_tbl", value=hive_ddl_tbl)