in src/common/py_libs/k9_deployer.py [0:0]
import logging
import subprocess
import tempfile
from pathlib import Path

from google.cloud import bigquery
from jinja2 import Environment, FileSystemLoader

# Import path assumed from the surrounding package layout; this module
# only needs bq_helper.execute_sql_file().
from common.py_libs import bq_helper


def _simple_process_and_upload(k9_id: str, k9_dir: str, jinja_dict: dict,
                               target_bucket: str, bq_client: bigquery.Client,
                               data_source="k9", dataset_type="processing"):
    """Processes and uploads a simple (traditional) K9.

    Recursively processes all files in k9_dir:
    - Executes all SQL files in alphabetical order.
    - Renames .templatesql files to .sql but does not execute them
      during deployment.

    Then uploads all processed files recursively to:
    - K9 pre and post:
        gs://{target_bucket}/dags/{k9_id}
    - Localized K9 (i.e. for a given data source):
        gs://{target_bucket}/dags/{data_source}/{dataset_type}/{k9_id}
    """
logging.info("Deploying simple k9 `%s`.", k9_id)
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_dir_path = Path(tmp_dir)
k9_files = sorted(Path(k9_dir).rglob("*"))
for path in k9_files:
rel_path = path.relative_to(k9_dir)
if path.is_dir() or str(rel_path).startswith("reporting/"):
continue
tgt_path = tmp_dir_path.joinpath(rel_path)
skip_sql_execution = tgt_path.suffix.lower() == ".templatesql"
if skip_sql_execution:
tgt_file_name = rel_path.stem + ".sql"
logging.info("Renaming SQL template %s to %s without "
"executing.", rel_path.name, tgt_file_name)
tgt_path = tmp_dir_path.joinpath(rel_path.parent, tgt_file_name)
tgt_dir = tgt_path.parent
if not tgt_dir.exists():
tgt_dir.mkdir(parents=True)
env = Environment(
loader=FileSystemLoader(str(path.parent.absolute())))
input_template = env.get_template(path.name)
output_text = str(input_template.render(jinja_dict))
with open(str(tgt_path), mode="w", encoding="utf-8") as output:
output.write(output_text)
if tgt_path.suffix.lower() == ".sql" and not skip_sql_execution:
logging.info("Executing %s", str(tgt_path.relative_to(tmp_dir)))
bq_helper.execute_sql_file(bq_client, str(tgt_path))
        # Make sure every DAG folder has an __init__.py so it is
        # importable as a package.
        if "__init__.py" not in [str(p.relative_to(k9_dir))
                                 for p in k9_files]:
            with open(f"{tmp_dir}/__init__.py", "w", encoding="utf-8") as f:
                # writelines() does not append newlines, so join the
                # statements explicitly to emit valid Python.
                f.write("\n".join([
                    "import os",
                    "import sys",
                    ("sys.path.append("
                     "os.path.dirname(os.path.realpath(__file__)))")
                ]))
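        # The generated __init__.py is equivalent to:
        #     import os
        #     import sys
        #     sys.path.append(os.path.dirname(os.path.realpath(__file__)))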
        if data_source == "k9":
            target_path = f"gs://{target_bucket}/dags/{k9_id}"
        else:
            # Only use the actual data source name. Example: for data
            # source "marketing.CM360" we only want to use "cm360".
            ds_short_name = data_source.split(".")[-1].lower()
            target_path = (f"gs://{target_bucket}/dags/{ds_short_name}/"
                           f"{dataset_type}/{k9_id}")
        logging.info("Copying generated files to %s", target_path)
        # gcloud storage performs its own wildcard expansion, so the
        # trailing "*" works without a shell.
        subprocess.check_call([
            "gcloud", "storage", "cp", "--recursive", f"{tmp_dir}/*",
            f"{target_path}/"
        ])
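

# A minimal usage sketch (hypothetical values throughout): the project,
# bucket, K9 id, directory, and jinja_dict keys below are illustrative,
# not part of this module; only _simple_process_and_upload and
# bigquery.Client come from the code above.
#
#     client = bigquery.Client(project="my-project")
#     _simple_process_and_upload(
#         k9_id="holiday_calendar",
#         k9_dir="src/k9/holiday_calendar",
#         jinja_dict={"project_id": "my-project", "dataset": "my_dataset"},
#         target_bucket="my-composer-bucket",
#         bq_client=client,
#         data_source="marketing.CM360",
#         dataset_type="processing")
#
# With these values the rendered files would land under:
#     gs://my-composer-bucket/dags/cm360/processing/holiday_calendar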