def build_dataset()

in managed-connectivity/sample-custom-connector/src/entry_builder.py


def build_dataset(config, df_raw, db_schema, entry_type):
    """Build table entries from a flat list of columns.
    Args:
        df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                 and NULLABLE columns
        db_schema - parent database schema
        entry_type - entry type: table or view
    Returns:
        A dataframe with Dataplex-readable data of tables of views.
    """
    schema_key = "dataplex-types.global.schema"

    # The transformation below does the following
    # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
    # 2. Renames NULLABLE to mode
    # 3. Renames DATA_TYPE to dataType
    # 4. Creates metadataType column based on dataType column
    # 5. Renames COLUMN_NAME to name
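    # Illustrative example (values assumed): a source row
    # (TABLE_NAME="EMPLOYEES", COLUMN_NAME="ID", DATA_TYPE="NUMBER", NULLABLE="N")
    # becomes (TABLE_NAME="EMPLOYEES", name="ID", dataType="NUMBER",
    # metadataType=<whatever choose_metadata_type_udf maps "NUMBER" to>, mode="REQUIRED")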
    df = df_raw \
      .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
      .drop("NULLABLE") \
      .withColumnRenamed("DATA_TYPE", "dataType") \
      .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
      .withColumnRenamed("COLUMN_NAME", "name")

    # The transformation below aggregates the fields, denormalizing the table:
    # TABLE_NAME becomes the top-level field, and the rest is collected into
    # an array column called "fields"
    aspect_columns = ["name", "mode", "dataType", "metadataType"]
    df = df.withColumn("columns", F.struct(aspect_columns))\
      .groupby('TABLE_NAME') \
      .agg(F.collect_list("columns").alias("fields"))
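    # After the aggregation there is one row per table: TABLE_NAME plus a
    # "fields" array of {name, mode, dataType, metadataType} structs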

    # Create nested structures that later become the entry aspects.
    # The fields become part of a `schema` struct.
    # There is also an entry_aspect that repeats the entry_type as its aspect_type.
    entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    df = df.withColumn("schema",
                       F.create_map(F.lit(schema_key),
                                    F.named_struct(
                                        F.lit("aspect_type"),
                                        F.lit(schema_key),
                                        F.lit("data"),
                                        F.create_map(F.lit("fields"),
                                                     F.col("fields")))
                                    )
                       )\
      .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
    .drop("fields")

    # Merge the separate aspect columns into a single map called 'aspects'
    df = df.select(F.col("TABLE_NAME"),
                   F.map_concat("schema", "entry_aspect").alias("aspects"))

    # Define user-defined functions to fill in the general information
    # and the hierarchy names
    create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                     db_schema, x),
                            StringType())

    create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                   db_schema, x), StringType())

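    # Resolve the parent entry name and the fully qualified entry type
    # for the target project and location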
    parent_name = nb.create_parent_name(entry_type, db_schema)
    full_entry_type = entry_type.value.format(
        project=config["target_project_id"],
        location=config["target_location_id"])

    # Fill the top-level fields
    column = F.col("TABLE_NAME")
    df = df.withColumn("name", create_name_udf(column)) \
      .withColumn("fully_qualified_name", create_fqn_udf(column)) \
      .withColumn("entry_type", F.lit(full_entry_type)) \
      .withColumn("parent_entry", F.lit(parent_name)) \
      .withColumn("entry_source", create_entry_source(column)) \
      .drop(column)

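    # Convert the entry rows into import items for the listed aspect keys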
    df = convert_to_import_items(df, [schema_key, entry_aspect_name])
    return df
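

A minimal usage sketch, assuming PySpark is available and that the EntryType enum, the db_schema value ("HR"), and the sample rows below are illustrative stand-ins for what the connector actually supplies; only the config keys read directly in this function are shown, and the name-builder helpers may require more.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sample-custom-connector").getOrCreate()

# Hypothetical configuration values.
config = {
    "target_project_id": "my-project",
    "target_location_id": "us-central1",
}

# A flat column listing in the shape the docstring describes: one row per column.
df_raw = spark.createDataFrame(
    [("EMPLOYEES", "EMPLOYEE_ID", "NUMBER", "N"),
     ("EMPLOYEES", "FIRST_NAME", "VARCHAR2", "Y")],
    ["TABLE_NAME", "COLUMN_NAME", "DATA_TYPE", "NULLABLE"])

# EntryType.TABLE is assumed to be the enum defined elsewhere in this connector.
import_items = build_dataset(config, df_raw, "HR", EntryType.TABLE)
import_items.show(truncate=False)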