in src/SAP/SAP_CDC/common/data_mesh/deploy_data_mesh.py [0:0]
def main(args: Sequence[str]) -> int:
    """Deploys Cortex Data Mesh resources and annotations from the CLI.

    Parses the command line, loads the Cortex config file, reads the specs
    found in the given directories and then drives the Data Mesh client:
    metadata resources first, BigQuery asset annotations second. Either
    step can be turned off with its "--skip-*" flag.

    Args:
        args: Command line arguments, without the program name.

    Returns:
        0 once all requested deployment steps have completed.

    Raises:
        cortex_exc.CriticalError: The loaded config has no (or an empty)
            "DataMesh" section.
    """
    parser = argparse.ArgumentParser(description="Cortex Data Mesh Deployer")
    parser.add_argument("--config-file",
                        type=str,
                        required=True,
                        help="Path to Cortex config.json file.")

    # The spec directory flags only differ by flag name and by the spec type
    # mentioned in their help text, so they are registered from a table.
    spec_directory_flags = (
        ("--tag-template-directories", "CatalogTagTemplates"),
        ("--policy-directories", "PolicyTaxonomies"),
        ("--lake-directories", "Lakes"),
        ("--annotation-directories", "BqAssetAnnotation"),
    )
    for flag, spec_name in spec_directory_flags:
        parser.add_argument(
            flag,
            nargs="*",
            default=[],
            help="A space delimited list of directories to find "
            f"{spec_name} specs to deploy.")

    # TODO: Consider if skip flags are still useful despite deployment opts.
    parser.add_argument(
        "--skip-resources",
        action="store_true",
        help="Skip deployment of metadata resources. "
        "Metadata resources should already exist if deploying "
        "annotations. Tag template directories used in "
        "annotations must still be provided, despite not being "
        "re-created")
    parser.add_argument("--skip-annotations",
                        action="store_true",
                        help="Skip deployment of asset annotations.")
    parser.add_argument("--overwrite",
                        action="store_true",
                        help="Overwrite conflicting existing resources.")
    parser.add_argument("--debug", action="store_true")
    params = parser.parse_args(args)

    if params.debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(format="%(asctime)s | %(levelname)s | %(message)s",
                        level=log_level)

    config_path = pathlib.Path(params.config_file).absolute()
    config = configs.load_config_file(str(config_path))
    jinja_dict = jinja.initialize_jinja_from_config(config)

    data_mesh_config = config.get("DataMesh")
    if not data_mesh_config:
        raise cortex_exc.CriticalError(
            "DataMesh deployment options not specified in config.")
    deployment_options = DeploymentOptions.from_dict(data_mesh_config)
    logging.info("🔷️ Deploying Data Mesh with the following options: %s",
                 deployment_options)

    client = data_mesh_client.CortexDataMeshClient(config["location"],
                                                   params.overwrite)

    def _load_specs(directories, spec_type):
        # Collects the specs of every directory, in order, into one list.
        return [
            spec for directory in directories
            for spec in _specs_from_directory(directory, spec_type, jinja_dict)
        ]

    # These resources are parsed regardless of the deployment options in case
    # they are referenced by other specs.
    tag_templates = _load_specs(params.tag_template_directories,
                                dmt.CatalogTagTemplates)
    policy_taxonomies = _load_specs(params.policy_directories,
                                    dmt.PolicyTaxonomies)
    lakes = _load_specs(params.lake_directories, dmt.Lakes)

    if not params.skip_resources:
        # Only spec lists whose deployment option is enabled are forwarded
        # to the client as keyword arguments.
        resource_candidates = (
            ("tag_templates_specs", deployment_options.deployCatalog,
             tag_templates),
            ("policy_taxonomies_specs", deployment_options.deployACLs,
             policy_taxonomies),
            ("lakes_specs", deployment_options.deployLakes, lakes),
        )
        resource_kwargs = {
            keyword: specs
            for keyword, enabled, specs in resource_candidates
            if enabled
        }
        asyncio.run(client.create_metadata_resources_async(**resource_kwargs))

    if params.skip_annotations:
        annotation_directories = []
    else:
        annotation_directories = params.annotation_directories

    # Each annotation directory is deployed with its own asyncio.run() call.
    for directory in annotation_directories:
        annotations = _specs_from_directory(directory, dmt.BqAssetAnnotation,
                                            jinja_dict)
        annotate_call = client.annotate_bq_assets_async(
            annotations,
            tag_templates,
            deploy_descriptions=deployment_options.deployDescriptions,
            deploy_catalog=deployment_options.deployCatalog,
            deploy_acls=deployment_options.deployACLs)
        asyncio.run(annotate_call)

    logging.info("Data mesh successfully deployed!")
    return 0