def flatten_table()

in bundle_adb_360/src/tools/azure-diagnostic-logs-etl-unity-catalog.py [0:0]


def flatten_table(service):
    """Flatten one service's silver rows into its own Delta table.

    Collects the distinct keys found in the ``flattened`` column for the
    given service, builds an all-``StringType`` schema from them, parses the
    column into a ``requestParams`` struct, appends the result to
    ``{catalog}.{database}.{service_name}`` and finally runs ``OPTIMIZE`` on
    that table.

    Relies on names defined outside this function: ``spark``,
    ``silver_table``, ``just_keys_udf``, ``checkpoint_path``, ``catalog``
    and ``database``.

    Args:
        service: Service name. Dashes are replaced with underscores before
            the value is used as the ``serviceName`` filter, the checkpoint
            sub-directory and the target table name.

    Raises:
        Whatever the streaming query raises: the function now blocks on the
        query, so a failed write surfaces here instead of being lost in the
        background.
    """
    service_name = service.replace("-", "_")
    target_table = f"{catalog}.{database}.{service_name}"

    flattenedStream = spark.readStream.table(silver_table)
    flattened = spark.table(silver_table)

    # Batch pass over the silver table to discover which keys exist for
    # this service.
    # NOTE(review): the filter uses the underscore-normalised name; confirm
    # the silver table stores serviceName in that form.
    key_rows = (
        flattened
        .filter(col("serviceName") == service_name)
        .select(just_keys_udf(col("flattened")).alias("keys"))
        .distinct()
        .collect()
    )

    # Each "keys" value is stripped of its first and last character and
    # split on ", " -- presumably the UDF renders a bracketed list such as
    # "[a, b]"; verify against just_keys_udf. Empty fragments are dropped.
    distinct_keys = {
        key
        for row in key_rows
        for key in row.asDict()["keys"][1:-1].split(", ")
        if key
    }

    schema = StructType()
    if not distinct_keys:
        # Keep the schema non-empty when no keys were found.
        schema.add(StructField("placeholder", StringType()))
    else:
        # Sort so the struct field order is stable from run to run; plain
        # set iteration order depends on string hash randomisation.
        for key in sorted(distinct_keys):
            schema.add(StructField(key, StringType()))

    # Write the stream with the discovered schema to the target table.
    query = (
        flattenedStream
        .filter(col("serviceName") == service_name)
        .withColumn("requestParams", from_json(col("flattened"), schema))
        .drop("flattened")
        .writeStream
        .partitionBy("date")
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/gold/{service_name}")
        .option("mergeSchema", True)
        .trigger(availableNow=True)
        .toTable(target_table)
    )

    # toTable() only starts the query. Wait for the availableNow run to
    # finish so OPTIMIZE sees a table that exists and contains this batch.
    query.awaitTermination()

    # Optimize the table as well.
    spark.sql(f"OPTIMIZE {target_table}")