def transform()

in mozetl/taar/taar_lite_guidguid.py


import logging

from pyspark.sql import functions as F
from pyspark.sql.types import LongType, StringType, StructField, StructType


def transform(longitudinal_addons):
    # Computed only for logging; not used downstream, but may be
    # interesting for later analysis.
    guid_set_unique = (
        longitudinal_addons.withColumn(
            "exploded", F.explode(longitudinal_addons.installed_addons)
        )
        .select("exploded")  # noqa: E501 - long lines
        .rdd.flatMap(lambda x: x)
        .distinct()
        .collect()
    )
    logging.info(
        "Number of unique guids co-installed in sample: %s", len(guid_set_unique)
    )

    # Re-key the data so each addon GUID maps to the list of addons it was
    # co-installed with. key_all is defined elsewhere in this module; a sketch
    # of its expected behavior follows this function.
    restructured = longitudinal_addons.rdd.flatMap(
        lambda x: key_all(x.installed_addons)
    ).toDF(["key_addon", "coinstalled_addons"])

    # Explode the list of co-installs and count pair occurrences.
    addon_co_installations = (
        restructured.select(
            "key_addon", F.explode("coinstalled_addons").alias("coinstalled_addon")
        )
        .groupBy("key_addon", "coinstalled_addon")
        .count()
    )

    # Collect the set of (coinstalled_addon, count) pairs for each key_addon.
    # A UDF-free alternative using F.struct is sketched after this function.
    combine_and_map_cols = F.udf(
        lambda x, y: (x, y),
        StructType([StructField("id", StringType()), StructField("n", LongType())]),
    )

    # Spark functions are sometimes long and unwieldy. Tough luck.
    # Ignore E128 and E501 long line errors
    addon_co_installations_collapsed = (
        addon_co_installations.select(  # noqa: E128
            "key_addon",
            combine_and_map_cols("coinstalled_addon", "count").alias(  # noqa: E501
                "id_n"
            ),
        )
        .groupBy("key_addon")
        .agg(F.collect_list("id_n").alias("coinstallation_counts"))
    )
    # printSchema() prints to stdout and returns None, so log the schema
    # string explicitly instead.
    logging.info(addon_co_installations_collapsed.schema.simpleString())
    logging.info("Collecting final result of co-installations.")

    return addon_co_installations_collapsed
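
key_all is defined elsewhere in mozetl/taar/taar_lite_guidguid.py and is not
shown here. A minimal sketch of the behavior the flatMap above relies on,
pairing each GUID with the list of GUIDs installed alongside it (an assumption
inferred from how transform consumes the result, not the module's verbatim
code):

def key_all(installed_addons):
    # For each guid, emit (guid, [every other guid in the same profile]).
    return [
        (guid, [other for other in installed_addons if other != guid])
        for guid in installed_addons
    ]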
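
As a design note: combine_and_map_cols only packs two columns into a struct,
which Spark can do natively with F.struct, avoiding Python UDF serialization
overhead. A sketch of the equivalent step, assuming the same column names:

addon_co_installations_collapsed = (
    addon_co_installations.select(
        "key_addon",
        F.struct(
            F.col("coinstalled_addon").alias("id"),
            F.col("count").alias("n"),
        ).alias("id_n"),
    )
    .groupBy("key_addon")
    .agg(F.collect_list("id_n").alias("coinstallation_counts"))
)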
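
For reference, a hypothetical invocation on a toy DataFrame (spark is an
assumed SparkSession; ordering of rows and list elements is not guaranteed).
The returned schema is key_addon: string, coinstallation_counts:
array<struct<id: string, n: bigint>>.

longitudinal_addons = spark.createDataFrame(
    [(["guid-a", "guid-b", "guid-c"],), (["guid-a", "guid-b"],)],
    ["installed_addons"],
)
result = transform(longitudinal_addons)
result.show(truncate=False)
# Roughly:
# guid-a -> [(guid-b, 2), (guid-c, 1)]
# guid-b -> [(guid-a, 2), (guid-c, 1)]
# guid-c -> [(guid-a, 1), (guid-b, 1)]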