in src/translation/dags/schema_dag.py [0:0]
def _create_dataset(**kwargs):
if kwargs["dag_run"].conf is not None and "config" in kwargs["dag_run"].conf:
logging.info("Configuration file is not empty")
logging.info(kwargs["dag_run"].conf["config"])
config = kwargs["dag_run"].conf["config"]
nm_map_list = config["migrationTask"]["translationConfigDetails"][
"nameMappingList"
]["name_map"]
uq_dataset = set(d["target"]["schema"] for d in nm_map_list)
for dataset in uq_dataset:
execute_create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id=f"execute_create_dataset_{dataset}", dag=dag, dataset_id=dataset
)
execute_create_dataset.execute(context=kwargs)
else:
logging.error("Configuration file is empty")