in mozetl/landfill/sampler.py [0:0]
def transform(landfill, n_documents=1000):
    """Build a capped per-group sample of processed landfill documents.

    Every element of ``landfill`` is passed through ``_process``; results
    whose first two or last two fields are falsy are discarded, and the
    rest are converted to a DataFrame.  Null ``doc_version`` values are
    replaced with ``"0"``, then rows are numbered within each
    (namespace, doc_type, doc_version) group in ``doc_id`` order and only
    rows numbered up to ``n_documents`` are kept.

    :param landfill: object exposing ``map``/``filter``/``toDF``
        (presumably a Spark RDD of raw messages -- confirm against caller).
    :param n_documents: maximum number of rows retained per group.
    :return: DataFrame with columns namespace, doc_type, doc_version,
        doc_id, meta and content.
    """
    # One nullable string column per whitelisted metadata key.
    meta_struct = StructType(
        [
            StructField(key, StringType(), nullable=True)
            for key in META_WHITELIST
        ]
    )
    row_schema = StructType(
        [
            StructField("namespace", StringType(), nullable=False),
            StructField("doc_type", StringType(), nullable=False),
            StructField("doc_version", StringType(), nullable=True),
            StructField("doc_id", StringType(), nullable=True),
            StructField("meta", meta_struct, nullable=False),
            StructField("content", StringType(), nullable=False),
        ]
    )

    def _has_required_fields(record):
        # Positions 0, 1, -2 and -1 line up with the non-nullable schema
        # columns, assuming _process emits fields in schema order.
        return record[0] and record[1] and record[-2] and record[-1]

    processed = landfill.map(_process)
    valid_records = processed.filter(_has_required_fields)
    documents = valid_records.toDF(row_schema)

    group_window = Window.partitionBy(
        "namespace", "doc_type", "doc_version"
    ).orderBy("doc_id")

    # Fill missing versions first so those rows are grouped under "0".
    filled = documents.fillna("0", "doc_version")
    numbered = filled.withColumn("row_id", row_number().over(group_window))
    capped = numbered.filter(col("row_id") <= n_documents)
    return capped.drop("row_id")