in managed-connectivity/sample-custom-connector/src/entry_builder.py [0:0]
def build_schemas(config, df_raw_schemas):
    """Turn a dataframe of usernames into Dataplex schema import items.

    Args:
        config: Connector configuration. It is passed through to the ``nb``
            name-building helpers, and its ``"target_project_id"`` and
            ``"target_location_id"`` values are substituted into the entry
            type string.
        df_raw_schemas: A dataframe with only one column called USERNAME.

    Returns:
        A dataframe with Dataplex-readable schemas, as produced by
        ``convert_to_import_items``.
    """
    schema_type = EntryType.DB_SCHEMA
    aspect_name = nb.create_entry_aspect_name(config, schema_type)

    # For a schema, the parent is the database itself.
    parent = nb.create_parent_name(config, schema_type)

    # Wrap the name builders as UDFs so they can run per row on USERNAME.
    name_udf = F.udf(
        lambda username: nb.create_name(config, schema_type, username),
        StringType(),
    )
    fqn_udf = F.udf(
        lambda username: nb.create_fqn(config, schema_type, username),
        StringType(),
    )

    # The entry type value is a template; fill in project and location.
    resolved_entry_type = schema_type.value.format(
        project=config["target_project_id"],
        location=config["target_location_id"],
    )

    # Derive the Dataplex columns from USERNAME, then drop the source column.
    username_col = F.col("USERNAME")
    schemas = (
        df_raw_schemas
        .withColumn("name", name_udf(username_col))
        .withColumn("fully_qualified_name", fqn_udf(username_col))
        .withColumn("parent_entry", F.lit(parent))
        .withColumn("entry_type", F.lit(resolved_entry_type))
        .withColumn("entry_source", create_entry_source(username_col))
        .withColumn("aspects", create_entry_aspect(aspect_name))
        .drop(username_col)
    )

    return convert_to_import_items(schemas, [aspect_name])