in src/common/materializer/generate_dependent_dags.py [0:0]
def _parse_args(args: Sequence[str]) -> argparse.Namespace:
    """Parses the command line arguments of the dependent DAG generator.

    Args:
        args: Argument strings handed directly to argparse.

    Returns:
        Namespace with `module_name`, `target_dataset_type`, `config_file`,
        `materializer_settings_file` and `debug` attributes.
    """
    parser = argparse.ArgumentParser(description="Generate task dependent DAG")
    parser.add_argument("--module_name",
                        type=str,
                        required=True,
                        help="Name of module e.g. 'cm360', 'sap'")
    parser.add_argument("--target_dataset_type",
                        type=str,
                        required=True,
                        help="Type of dataset, must be 'reporting'")
    parser.add_argument("--config_file",
                        type=str,
                        required=True,
                        help="Relative path to file containing deployment "
                        "configurations.")
    parser.add_argument("--materializer_settings_file",
                        type=str,
                        required=True,
                        help="File path to materializer settings")
    parser.add_argument("--debug",
                        action="store_true",
                        default=False,
                        required=False,
                        help="Flag to set log level to DEBUG.")
    return parser.parse_args(args)


def _get_target_dataset_full_name(config_dict: dict, module_name: str,
                                  dataset_type: str) -> str:
    """Builds the '<projectIdTarget>.<dataset>' name for a module's dataset.

    Args:
        config_dict: Loaded deployment configuration.
        module_name: Module name exactly as it is keyed in the configuration.
        dataset_type: Lowercase dataset type used as the key under "datasets".

    Returns:
        Target project id and dataset name joined by a dot.
    """
    # Marketing modules are nested inconsistently compared to other sources.
    if module_name in constants.MARKETING_MODULES:
        module_config = config_dict["marketing"][module_name]
    else:
        module_config = config_dict[module_name]
    tgt_dataset = module_config["datasets"][dataset_type]
    return config_dict["projectIdTarget"] + "." + tgt_dataset


def main(args: Sequence[str]) -> None:
    """Generates DAG files with task dependencies for a module's dataset.

    Returns early, after logging the reason, when no task dependent settings
    file is enabled/available or when the settings define no task dependent
    DAGs.

    Args:
        args: Command line arguments, parsed with argparse. Required options
            are --module_name, --target_dataset_type, --config_file and
            --materializer_settings_file; --debug is optional.

    Raises:
        RuntimeError: If the generated build files directory or the jinja data
            file does not exist yet.
        SystemExit: Raised by argparse on invalid or missing arguments.
    """
    params = _parse_args(args)

    logging.basicConfig(format="%(asctime)s | %(levelname)s | %(message)s",
                        level=logging.DEBUG if params.debug else logging.INFO)

    config_dict = configs.load_config_file(params.config_file)

    td_settings = generate_assets.get_enabled_task_dep_settings_file(
        pathlib.Path(params.materializer_settings_file), config_dict)
    if not td_settings:
        logging.info(
            "Task dependencies aren't enabled or task dependent version of "
            "%s doesn't exist. ", params.materializer_settings_file)
        return
    logging.info(
        "Task dependencies are enabled and %s exists. Using "
        "task dependent settings.", td_settings.name)

    logging.info("Generating DAG files with task dependencies.")

    materializer_settings = generate_assets.get_materializer_settings(
        str(td_settings))
    task_dep_objs = dependent_dags.get_task_deps(materializer_settings)
    if not task_dep_objs:
        logging.info("No task dependent DAGs were found.")
        return

    # Parse configs for relevant options.
    allow_telemetry = config_dict.get("allowTelemetry", True)
    lower_module_name = params.module_name.lower()
    lower_tgt_dataset_type = params.target_dataset_type.lower().replace(
        " ", "_")
    location = config_dict["location"]

    # Note that the generated DAG files are under a lowercase module_name dir
    # whereas the generated build files are not. This is to match the current
    # implementation for non-task dependent DAGs.
    generated_dag_files_dir = (generate_assets.GENERATED_DAG_DIR_NAME /
                               lower_module_name / lower_tgt_dataset_type /
                               "task_dep_dags")
    generated_build_files_dir = (generate_assets.GENERATED_BUILD_DIR_NAME /
                                 params.module_name)
    # The jinja data file lives directly inside the build files directory.
    jinja_data_file = (generated_build_files_dir /
                       generate_assets.JINJA_DATA_FILE_NAME)

    tgt_dataset_full_name = _get_target_dataset_full_name(
        config_dict, params.module_name, lower_tgt_dataset_type)

    # This step only consumes build artifacts; it does not create them.
    if not (generated_build_files_dir.exists() and jinja_data_file.exists()):
        raise RuntimeError(
            "Generated build files and jinja data must already exist.")

    dep_dag_generator = dependent_dags.DependentDagGenerator(
        lower_module_name, tgt_dataset_full_name, lower_tgt_dataset_type,
        allow_telemetry, location, generated_dag_files_dir, jinja_data_file)
    dep_dag_generator.create_dep_dags(task_dep_objs)

    logging.info("Completed generating DAG files with task dependencies.")