in managed-connectivity/sample-custom-connector/src/name_builder.py [0:0]
def create_fqn(config: Dict[str, str], entry_type: EntryType,
schema_name: str = "", table_name: str = ""):
"""Creates a fully qualified name or Dataplex v1 hierarchy name."""
if FORBIDDEN_SYMBOL in schema_name:
schema_name = f"`{schema_name}`"
if entry_type == EntryType.INSTANCE:
# Requires backticks to escape column
return f"{SOURCE_TYPE}:`{config['host_port']}`"
if entry_type == EntryType.DATABASE:
instance = create_fqn(config, EntryType.INSTANCE)
return f"{instance}.{config['database']}"
if entry_type == EntryType.DB_SCHEMA:
database = create_fqn(config, EntryType.DATABASE)
return f"{database}.{schema_name}"
if entry_type in [EntryType.TABLE, EntryType.VIEW]:
database = create_fqn(config, EntryType.DATABASE)
return f"{database}.{schema_name}.{table_name}"
return ""