in managed-connectivity/sample-custom-connector/src/name_builder.py [0:0]
def create_name(config: Dict[str, str], entry_type: EntryType,
                schema_name: str = "", table_name: str = ""):
    """Build the Dataplex v2 hierarchy name for the given entry type.

    Each level is nested under its parent:
    instance -> database -> database schema -> table / view.

    Args:
        config: Connector settings. Reads ``target_project_id``,
            ``target_location_id``, ``target_entry_group_id`` and
            ``host_port`` for every recognised entry type, plus
            ``database`` for every level below the instance.
        entry_type: The hierarchy level to build a name for.
        schema_name: Schema name; only used for schema, table and view
            names. Occurrences of ``FORBIDDEN_SYMBOL`` are replaced with
            ``ALLOWED_SYMBOL`` before use.
        table_name: Name of the table or view; only used for table and
            view names. It is inserted as-is.

    Returns:
        The hierarchy name, or an empty string when ``entry_type`` is not
        one of the handled entry types.
    """
    # Only the schema name goes through the symbol substitution.
    safe_schema = (
        schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if FORBIDDEN_SYMBOL in schema_name
        else schema_name
    )

    if entry_type == EntryType.INSTANCE:
        project = config["target_project_id"]
        location = config["target_location_id"]
        entry_group = config["target_entry_group_id"]
        # The ":" between host and port is swapped for "@" in the entry id.
        entry_id = config["host_port"].replace(":", "@")
        return (
            f"projects/{project}/"
            f"locations/{location}/"
            f"entryGroups/{entry_group}/"
            f"entries/{entry_id}"
        )

    if entry_type == EntryType.DATABASE:
        instance_path = create_name(config, EntryType.INSTANCE)
        return f"{instance_path}/databases/{config['database']}"

    if entry_type == EntryType.DB_SCHEMA:
        database_path = create_name(config, EntryType.DATABASE)
        return f"{database_path}/database_schemas/{safe_schema}"

    if entry_type in (EntryType.TABLE, EntryType.VIEW):
        # Tables and views differ only in the collection segment; both take
        # their leaf name from ``table_name``.
        collection = "tables" if entry_type == EntryType.TABLE else "views"
        schema_path = create_name(config, EntryType.DB_SCHEMA, safe_schema)
        return f"{schema_path}/{collection}/{table_name}"

    # Unrecognised entry types produce an empty name.
    return ""