def _simple_process_and_upload()

in common/py_libs/k9_deployer.py [0:0]


def _simple_process_and_upload(k9_id: str, k9_dir: str, jinja_dict: dict,
                              target_bucket: str, bq_client: bigquery.Client,
                              data_source = "k9", dataset_type="processing"):
    """Process and upload simple (traditional) K9.

    Recursively processes all files in k9_dir,
    Executes all sql files in alphabetical order,
    Rename .templatesql to .sql but do not execute them during deployment.
    Then uploads all processed files recursively to:
    - K9 pre and post:
        gs://{target_bucket}/dags/{k9_id}
    - Localized K9 (i.e. for a given data source):
        gs://{target_bucket}/dags/{data_source}/{dataset_type}/{k9_id}
    """

    logging.info("Deploying simple k9 `%s`.", k9_id)

    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_dir_path = Path(tmp_dir)
        k9_files = sorted(Path(k9_dir).rglob("*"))
        for path in k9_files:
            rel_path = path.relative_to(k9_dir)
            if path.is_dir() or str(rel_path).startswith("reporting/"):
                continue
            tgt_path = tmp_dir_path.joinpath(rel_path)
            skip_sql_execution = tgt_path.suffix.lower() == ".templatesql"
            if skip_sql_execution:
                tgt_file_name = rel_path.stem + ".sql"
                logging.info("Renaming SQL template %s to %s without "
                             "executing.", rel_path.name, tgt_file_name)
                tgt_path = tmp_dir_path.joinpath(rel_path.parent, tgt_file_name)
            tgt_dir = tgt_path.parent
            if not tgt_dir.exists():
                tgt_dir.mkdir(parents=True)
            env = Environment(
                loader=FileSystemLoader(str(path.parent.absolute())))
            input_template = env.get_template(path.name)
            output_text = str(input_template.render(jinja_dict))
            with open(str(tgt_path), mode="w", encoding="utf-8") as output:
                output.write(output_text)
            if tgt_path.suffix.lower() == ".sql" and not skip_sql_execution:
                logging.info("Executing %s", str(tgt_path.relative_to(tmp_dir)))
                bq_helper.execute_sql_file(bq_client, str(tgt_path))
        # make sure every DAG folder has __init__.py
        if "__init__.py" not in [str(p.relative_to(k9_dir)) for p in k9_files]:
            with open(f"{tmp_dir}/__init__.py", "w", encoding="utf-8") as f:
                f.writelines([
                    "import os",
                    "import sys",
                    ("sys.path.append("
                     "os.path.dirname(os.path.realpath(__file__)))")
                ])

        if data_source == "k9":
            target_path =  f"gs://{target_bucket}/dags/{k9_id}"
        else:
            # Only use actual data source name. Example: for data source
            # "marketing.CM360" we only want to use "cm360"
            ds_short_name = data_source.split(".")[-1].lower()
            target_path =  (f"gs://{target_bucket}/dags/{ds_short_name}/"
                                 f"{dataset_type}/{k9_id}")
        logging.info("Copying generated files to %s",
                     target_path)
        subprocess.check_call([
            "gcloud", "storage", "cp", "--recursive", f"{tmp_dir}/*",
            f"{target_path}/"
        ])