in mozetl/taar/taar_lite_guidguid.py [0:0]
import logging

from pyspark.sql import functions as F
from pyspark.sql.types import LongType, StringType, StructField, StructType


def transform(longitudinal_addons):
    # Compute the set of unique co-installed GUIDs. This is only for
    # logging; the result is not used downstream, but it may be
    # interesting for later analysis.
    guid_set_unique = (
        longitudinal_addons.withColumn(
            "exploded", F.explode(longitudinal_addons.installed_addons)
        )
        .select("exploded")  # noqa: E501 - long lines
        .rdd.flatMap(lambda x: x)
        .distinct()
        .collect()
    )
    logging.info(
        "Number of unique guids co-installed in sample: %s", len(guid_set_unique)
    )
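    # key_all() is defined elsewhere in taar_lite_guidguid.py; a hedged
    # sketch of it follows this function. It re-keys each client's install
    # list so that every addon appears once as key_addon, paired with its
    # co-installed peers.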
    restructured = longitudinal_addons.rdd.flatMap(
        lambda x: key_all(x.installed_addons)
    ).toDF(["key_addon", "coinstalled_addons"])
    # Explode the list of co-installs and count pair occurrences.
    addon_co_installations = (
        restructured.select(
            "key_addon", F.explode("coinstalled_addons").alias("coinstalled_addon")
        )  # noqa: E501 - long lines
        .groupBy("key_addon", "coinstalled_addon")
        .count()
    )
    # Collect the set of (coinstalled_addon, count) pairs for each key_addon.
    combine_and_map_cols = F.udf(
        lambda x, y: (x, y),
        StructType([StructField("id", StringType()), StructField("n", LongType())]),
    )
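    # Note: on modern PySpark this tuple-building UDF could likely be
    # replaced with the native struct constructor, which avoids Python
    # serialization overhead, e.g.:
    #   F.struct(F.col("coinstalled_addon").alias("id"),
    #            F.col("count").alias("n")).alias("id_n")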
    # Spark functions are sometimes long and unwieldy. Tough luck.
    # Ignore E128 and E501 long line errors.
    addon_co_installations_collapsed = (
        addon_co_installations.select(  # noqa: E128
            "key_addon",
            combine_and_map_cols("coinstalled_addon", "count").alias(  # noqa: E501
                "id_n"
            ),
        )
        .groupBy("key_addon")
        .agg(F.collect_list("id_n").alias("coinstallation_counts"))
    )
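    # Resulting schema, one row per key_addon:
    #   key_addon: string
    #   coinstallation_counts: array<struct<id: string, n: bigint>>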
    # printSchema() writes to stdout and returns None, so call it directly
    # instead of passing its return value to logging.
    addon_co_installations_collapsed.printSchema()
    logging.info("Collecting final result of co-installations.")
    return addon_co_installations_collapsed
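

# NOTE: key_all() is not shown in this excerpt. The sketch below is an
# assumption inferred from how transform() uses it: for each client's
# list of installed addons, emit one (addon, peers) pair per addon,
# where peers are the other addons installed alongside it.
def key_all(installed_addons):
    """Hypothetical sketch: key each addon against its co-installed peers.

    [a, b, c] -> [(a, [b, c]), (b, [a, c]), (c, [a, b])]
    """
    return [
        (addon, [peer for peer in installed_addons if peer != addon])
        for addon in installed_addons
    ]


# Minimal usage sketch. Assumptions: a local SparkSession and a toy
# DataFrame shaped like the real longitudinal_addons input, i.e. one row
# per client with an installed_addons array of GUID strings.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    sample = spark.createDataFrame(
        [(["guid-a", "guid-b"],), (["guid-a", "guid-b", "guid-c"],)],
        ["installed_addons"],
    )
    transform(sample).show(truncate=False)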