def transform()

in mozetl/landfill/sampler.py [0:0]


def transform(landfill, n_documents=1000):
    """Build a capped per-document-type sample DataFrame from ``landfill``.

    Each element of ``landfill`` is passed through ``_process``; results are
    kept only when their first two and last two positions are truthy. The
    survivors are converted to a DataFrame with the columns ``namespace``,
    ``doc_type``, ``doc_version``, ``doc_id``, ``meta`` and ``content``
    (presumably ``_process`` emits its fields in that order -- confirm
    against ``_process``).

    Missing ``doc_version`` values are replaced with ``"0"``. Rows are then
    numbered within each ``(namespace, doc_type, doc_version)`` group in
    ``doc_id`` order, and only the first ``n_documents`` rows of each group
    are retained.

    :param landfill: collection exposing ``map``; the mapped result must
        support ``filter`` and ``toDF`` (presumably a Spark RDD -- confirm
        against the caller).
    :param n_documents: maximum number of rows kept per group.
    :return: DataFrame with the six columns listed above.
    """

    def _has_required_fields(record):
        # Same short-circuit truthiness check on positions 0, 1, -2 and -1.
        return record[0] and record[1] and record[-2] and record[-1]

    # One nullable string field per whitelisted metadata key.
    meta_struct = StructType(
        [StructField(key, StringType(), True) for key in META_WHITELIST]
    )

    # (column name, data type, nullable) in output column order.
    column_specs = (
        ("namespace", StringType(), False),
        ("doc_type", StringType(), False),
        ("doc_version", StringType(), True),
        ("doc_id", StringType(), True),
        ("meta", meta_struct, False),
        ("content", StringType(), False),
    )
    document_schema = StructType([StructField(*spec) for spec in column_specs])

    processed = landfill.map(_process)
    complete = processed.filter(_has_required_fields)
    documents = complete.toDF(document_schema)

    # Fill the version before windowing so null versions share the "0" group.
    versioned = documents.fillna("0", "doc_version")

    per_group = Window.partitionBy("namespace", "doc_type", "doc_version").orderBy(
        "doc_id"
    )
    numbered = versioned.withColumn("row_id", row_number().over(per_group))

    # Keep the first n_documents rows of each group; row_id is scaffolding only.
    return numbered.where(col("row_id") <= n_documents).drop("row_id")