in bundle_adb_360/src/tools/azure-diagnostic-logs-etl-unity-catalog.py [0:0]
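# Note: `spark`, `silver_table`, `checkpoint_path`, `catalog`, `database`, and
# `just_keys_udf` are presumably defined earlier in this file; this excerpt
# assumes they are in scope.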
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StringType, StructType


def flatten_table(service):
    service_name = service.replace("-", "_")
    # The streaming read feeds the gold-table write below; the static batch
    # read is only used to discover which keys appear in the payload.
    flattenedStream = spark.readStream.table(silver_table)
    flattened = spark.table(silver_table)
    schema = StructType()

    keys = (
        flattened
        .filter(col("serviceName") == service_name)
        .select(just_keys_udf(col("flattened")).alias("keys"))
        .distinct()
        .collect()
    )
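    # Each collected row holds a string such as "[key1, key2]": strip the
    # surrounding brackets, split on ", ", and dedupe across rows.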
    keysList = [i.asDict()['keys'][1:-1].split(", ") for i in keys]
    keysDistinct = {key for keys in keysList for key in keys if key != ""}
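    # Keep the schema non-empty: fall back to a single placeholder field when
    # no keys were found for this service.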
    if len(keysDistinct) == 0:
        schema.add(StructField('placeholder', StringType()))
    else:
        for key in keysDistinct:
            schema.add(StructField(key, StringType()))
    # write the df with the inferred requestParams schema to the gold table
    query = (
        flattenedStream
        .filter(col("serviceName") == service_name)
        .withColumn("requestParams", from_json(col("flattened"), schema))
        .drop("flattened")
        .writeStream
        .partitionBy("date")
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/gold/{service_name}")
        .option("mergeSchema", True)
        .trigger(availableNow=True)
        .toTable(f"{catalog}.{database}.{service_name}")
    )
    # An availableNow trigger returns control immediately; block until the
    # backlog is processed so the OPTIMIZE below runs against a finished write.
    query.awaitTermination()
    # optimize the table as well
    spark.sql(f"OPTIMIZE {catalog}.{database}.{service_name}")