in managed-connectivity/sample-custom-connector/src/entry_builder.py [0:0]
def _normalize_column_rows(df_raw):
    """Rename and derive per-column attributes in the shape Dataplex expects.

    - NULLABLE is replaced by ``mode``: 'Y' -> "NULLABLE", any other value
      (including null) -> "REQUIRED".
    - DATA_TYPE is renamed to ``dataType``.
    - ``metadataType`` is derived from ``dataType`` via
      ``choose_metadata_type_udf``.
    - COLUMN_NAME is renamed to ``name``.
    """
    return (
        df_raw
        .withColumn("mode",
                    F.when(F.col("NULLABLE") == "Y", "NULLABLE")
                    .otherwise("REQUIRED"))
        .drop("NULLABLE")
        .withColumnRenamed("DATA_TYPE", "dataType")
        .withColumn("metadataType", choose_metadata_type_udf("dataType"))
        .withColumnRenamed("COLUMN_NAME", "name")
    )


def _group_fields_by_table(df):
    """Collapse column rows into one row per TABLE_NAME.

    TABLE_NAME stays a top-level column; the per-column attributes
    (name, mode, dataType, metadataType) are packed into structs and
    collected into an array column called ``fields``.
    """
    aspect_columns = ["name", "mode", "dataType", "metadataType"]
    return (
        df.withColumn("columns", F.struct(aspect_columns))
        .groupby("TABLE_NAME")
        .agg(F.collect_list("columns").alias("fields"))
    )


def _attach_aspects(df, schema_key, entry_aspect_name):
    """Replace ``fields`` with a single ``aspects`` map column.

    The schema aspect is a one-entry map keyed by ``schema_key`` whose value
    is a struct {aspect_type: schema_key, data: {"fields": fields}}. It is
    merged with the entry aspect produced by ``create_entry_aspect``.
    Only TABLE_NAME and ``aspects`` are kept in the result.
    """
    schema_aspect = F.create_map(
        F.lit(schema_key),
        F.named_struct(
            F.lit("aspect_type"), F.lit(schema_key),
            F.lit("data"), F.create_map(F.lit("fields"), F.col("fields")),
        ),
    )
    df = (
        df.withColumn("schema", schema_aspect)
        .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name))
        .drop("fields")
    )
    # Merge the separate aspect columns into the one map called 'aspects'.
    return df.select(F.col("TABLE_NAME"),
                     F.map_concat("schema", "entry_aspect").alias("aspects"))


def build_dataset(config, df_raw, db_schema, entry_type):
    """Build Dataplex import items for the tables or views of one schema.

    Turns a flat, one-row-per-column dataframe into one row per table/view.
    Each row carries the Dataplex entry fields and an ``aspects`` map, and
    the result is passed through ``convert_to_import_items``.

    Args:
        config: Connector configuration. Must contain "target_project_id"
            and "target_location_id". It is also passed to the ``nb`` name
            builders.
        df_raw: Spark dataframe with one row per column and the columns
            TABLE_NAME, COLUMN_NAME, DATA_TYPE and NULLABLE. NULLABLE is
            expected to be 'Y' for nullable columns.
        db_schema: Name of the parent database schema.
        entry_type: Entry type (table or view). Its ``value`` is used as a
            ``str.format`` template with ``{project}`` and ``{location}``
            placeholders.

    Returns:
        The result of ``convert_to_import_items`` applied to a dataframe
        with the columns name, fully_qualified_name, entry_type,
        parent_entry, entry_source and aspects (one row per TABLE_NAME).
    """
    schema_key = "dataplex-types.global.schema"

    df = _normalize_column_rows(df_raw)
    df = _group_fields_by_table(df)

    # The entry aspect repeats the entry type as its aspect type.
    entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    df = _attach_aspects(df, schema_key, entry_aspect_name)

    # The UDFs close over config/entry_type/db_schema, so each row only
    # supplies its table name.
    create_name_udf = F.udf(
        lambda table: nb.create_name(config, entry_type, db_schema, table),
        StringType())
    create_fqn_udf = F.udf(
        lambda table: nb.create_fqn(config, entry_type, db_schema, table),
        StringType())

    # NOTE(review): every other nb.* call in this function passes `config`
    # first; confirm create_parent_name really takes only
    # (entry_type, db_schema).
    parent_name = nb.create_parent_name(entry_type, db_schema)
    full_entry_type = entry_type.value.format(
        project=config["target_project_id"],
        location=config["target_location_id"])

    # Fill the top-level entry fields, then drop the grouping key.
    table_name = F.col("TABLE_NAME")
    df = (
        df.withColumn("name", create_name_udf(table_name))
        .withColumn("fully_qualified_name", create_fqn_udf(table_name))
        .withColumn("entry_type", F.lit(full_entry_type))
        .withColumn("parent_entry", F.lit(parent_name))
        .withColumn("entry_source", create_entry_source(table_name))
        .drop(table_name)
    )

    return convert_to_import_items(df, [schema_key, entry_aspect_name])