in src/common/py_libs/k9_deployer.py [0:0]
def deploy_k9(k9_manifest: dict,
k9_root_path: Path,
config_file: str,
logs_bucket: str,
data_source: str = "k9",
dataset_type: str = "processing",
extra_settings: dict = None,
worker_pool_name: str = None,
region: str = None,
build_account: str = None) -> None:
"""Deploy a single K9 given manifest, settings, and parameters.
Args:
k9_manifest (dict): Dict containing manifest for the current K9.
k9_root_path (str): Rel path to the current K9.
config_file (str): Rel path to config.json file.
logs_bucket (str): Log bucket name.
data_source (str): Data source in config.json. Case sensitive.
Example: "sap", "marketing.CM360", "k9".
dataset_type (str): Dataset type. Case sensitive.
Example: "cdc", "reporting", "processing".
extra_settings (dict, optional): Ext settings that may apply to the k9.
worker_pool_name (str, optional): Name of Cloud Build private worker
pool to use.
region (str, optional): Cloud Build region to use in deployment.
build_account (str, optional): Service account to execute
Cloud Build deployment as.
"""
config = configs.load_config_file(config_file)
source_project = config["projectIdSource"]
target_project = config["projectIdTarget"]
deploy_test_data = config["testData"]
location = config["location"].lower()
target_bucket = config["targetBucket"]
test_harness_version = config.get("testHarnessVersion",
constants.TEST_HARNESS_VERSION)
allow_telemetry = config.get("allowTelemetry", True)
bq_client = cortex_bq_client.CortexBQClient(source_project,
location=location)
k9_id = k9_manifest["id"]
logging.info("🐕 Deploying k9 %s 🐕", k9_id)
if deploy_test_data and "test_data" in k9_manifest:
logging.info("Loading test data for k9 %s", k9_id)
# TODO: Switch to a YAML array instead of a comma-delimited string.
tables = [t.strip() for t in k9_manifest["test_data"].split(",")]
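# "test_data_target" is a dotted path into config.json, defaulting to
# "<data_source>.datasets.<dataset_type>", e.g. "k9.datasets.processing".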
target_type = k9_manifest.get("test_data_target",
f"{data_source}.datasets.{dataset_type}")
components = target_type.split(".")
project = (target_project
if components[-1] == "reporting" else source_project)
# Start from the whole config and walk down the nested dictionaries
# to resolve the target dataset name.
ds_cfg = config
for comp in components:
ds_cfg = ds_cfg[comp]
target_dataset = ds_cfg
test_data_dataset = test_harness.get_test_harness_dataset(
data_source, dataset_type, location, test_harness_version)
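# Build fully qualified source (test harness) and target table names.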
sources = [
f"{config['testDataProject']}.{test_data_dataset}.{table}"
for table in tables
]
targets = [f"{project}.{target_dataset}.{table}" for table in tables]
bq_helper.load_tables(bq_client,
sources,
targets,
location=location,
skip_existing_tables=True)
k9_path = k9_root_path.joinpath(k9_manifest["path"])
if "entry_point" in k9_manifest:
# Call a custom entry point.
entry_point = k9_path.joinpath(k9_manifest["entry_point"])
ext = entry_point.suffix.lower()
exec_params = [str(entry_point), str(config_file), logs_bucket]
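# K9s that declare "build_params" take named flags instead of
# positional arguments.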
if "build_params" in k9_manifest:
exec_params = [
str(entry_point), "--config-file",
str(config_file), "--gcs-logs-bucket", logs_bucket
]
if region:
exec_params.extend(["--region", region])
if worker_pool_name:
exec_params.extend(["--worker-pool-name", worker_pool_name])
if build_account:
exec_params.extend(["--build-account", build_account])
# Add JSON-based extra settings to command line if they exist.
if extra_settings:
exec_params.append(json.dumps(extra_settings))
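# Pick the interpreter based on the entry point's file extension.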
if ext == ".py":
exec_params.insert(0, "python3")
elif ext == ".sh":
exec_params.insert(0, "bash")
logging.info("Executing k9 `%s` deployer: %s.", k9_id, entry_point)
subprocess.check_call(exec_params)
else:
# Perform simple deployment.
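# No custom entry point: build the Jinja substitution dict and hand the
# K9's files to _simple_process_and_upload below.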
jinja_dict = jinja.initialize_jinja_from_config(config)
# Add a placeholder so Jinja substitution works when telemetry is opted out.
jinja_dict["runtime_labels_dict"] = ""
# Add the BigQuery location to the Jinja substitution dict.
jinja_dict["location"] = location
if allow_telemetry:
# Convert the Cortex job label dict to a string for Jinja substitution.
jinja_dict["runtime_labels_dict"] = str(constants.CORTEX_JOB_LABEL)
_simple_process_and_upload(k9_id, str(k9_path), jinja_dict,
target_bucket, bq_client, data_source,
dataset_type)
# The following applies only if Reporting is specified from the K9
# standpoint, i.e. during K9 pre- or post-deployment.
# For K9s invoked from the Materializer, the "reporting_settings" section
# is ignored if it exists.
if data_source == "k9" and "reporting_settings" in k9_manifest:
reporting_settings_file = k9_path.joinpath(
k9_manifest["reporting_settings"])
logging.info("k9 `%s` has Reporting views.", k9_id)
logging.info("Executing Materializer for `%s` with `%s`.", k9_id,
reporting_settings_file)
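# Invoke the Materializer wrapper script, forwarding the same Cloud Build
# options (region, worker pool, build account) when provided.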
exec_params = [
"./src/common/materializer/deploy.sh", "--gcs_logs_bucket",
logs_bucket, "--gcs_tgt_bucket", target_bucket, "--config_file",
str(config_file), "--materializer_settings_file",
str(reporting_settings_file), "--target_type", "Reporting",
"--module_name", "k9"
]
if region:
exec_params.extend(["--region", region])
if worker_pool_name:
exec_params.extend(["--worker_pool_name", worker_pool_name])
if build_account:
exec_params.extend(["--build_account", build_account])
subprocess.check_call(exec_params)
logging.info("🐕 k9 `%s` has been deployed! 🐕", k9_id)