jobs/mozaggregator2bq/notebooks/02-pre_bq_transforms.ipynb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Transforming the aggregates for convenience\n",
"\n",
"We'd like to transform the aggregates into a form that will be useful in BigQuery. We also want to be able to invert the transformation, so we should make sure to save all the relevant information. Adding structure will make it easier to process things.\n",
"\n",
"## Processing dimensions\n",
"\n",
"This notebook relies on the output of `01-convert_to_parquet`."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0------------------------------------------------------------------------------------------\n",
" table_name | submission_date_nightly_43_20191201 \n",
" aggregate_type | submission \n",
" ds_nodash | 20191201 \n",
" dimension | {\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_INSTANTIA... \n",
" aggregate | {0,2,0,2,2} \n",
"-RECORD 1------------------------------------------------------------------------------------------\n",
" table_name | submission_date_nightly_43_20191201 \n",
" aggregate_type | submission \n",
" ds_nodash | 20191201 \n",
" dimension | {\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_CONSUMERS... \n",
" aggregate | {0,0,0,0,0,0,0,0,0,0,0,2,22,2} \n",
"-RECORD 2------------------------------------------------------------------------------------------\n",
" table_name | submission_date_nightly_43_20191201 \n",
" aggregate_type | submission \n",
" ds_nodash | 20191201 \n",
" dimension | {\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_ISIMPLEDO... \n",
" aggregate | {2,0,0,0,2} \n",
"only showing top 3 rows\n",
"\n"
]
}
],
"source": [
"input_dir = f\"../data/parquet/submission/20191201\"\n",
"df = spark.read.parquet(input_dir)\n",
"df.show(vertical=True, n=3, truncate=80)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_INSTANTIATED_FLAG\", \"osVersion\": \"10.0\", \"application\": \"Firefox\", \"architecture\": \"x86\"}\n"
]
}
],
"source": [
"print(df.first().dimension)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0----------------------------\n",
" os | Windows_NT \n",
" child | false \n",
" label | \n",
" metric | A11Y_INSTANTIATED... \n",
" osVersion | 10.0 \n",
" application | Firefox \n",
" architecture | x86 \n",
"only showing top 1 row\n",
"\n"
]
}
],
"source": [
"from pyspark.sql import functions as F, types as T\n",
"\n",
"dimension_type = T.StructType([\n",
" T.StructField(\"os\", T.StringType()),\n",
" T.StructField(\"child\", T.StringType()),\n",
" T.StructField(\"label\", T.StringType()),\n",
" T.StructField(\"metric\", T.StringType()),\n",
" T.StructField(\"osVersion\", T.StringType()),\n",
" T.StructField(\"application\", T.StringType()),\n",
" T.StructField(\"architecture\", T.StringType()),\n",
"])\n",
"\n",
"df.select(F.from_json(\"dimension\", dimension_type).alias(\"dimension\")).select(\"dimension.*\").show(vertical=True, n=1)"
]
},
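{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we want the transformation to be invertible, a quick round-trip sanity check is worthwhile. This is a minimal sketch: comparing raw strings would fail on whitespace (the source strings have spaces after the colons), so we serialize the parsed struct with `to_json`, re-parse it, and compare structs instead. Note that `to_json` drops null fields, so rows with missing keys would need extra care."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Round-trip sketch: parse -> serialize -> re-parse, then count mismatches.\n",
"parsed = df.select(F.from_json(\"dimension\", dimension_type).alias(\"dim\"))\n",
"roundtrip = parsed.select(\n",
"    \"dim\", F.from_json(F.to_json(\"dim\"), dimension_type).alias(\"dim2\")\n",
")\n",
"# expect 0 (rows where either struct is null are not counted by !=)\n",
"roundtrip.where(F.col(\"dim\") != F.col(\"dim2\")).count()"
]
},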
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Processing histograms\n",
"\n",
"It's useful to note that the last two elements of the aggregate array correspond to `sum` and `count`."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------------------------------------------------------------------+\n",
"| aggregate|\n",
"+--------------------------------------------------------------------------------+\n",
"| {0,2,0,2,2}|\n",
"| {0,0,0,0,0,0,0,0,0,0,0,2,22,2}|\n",
"| {2,0,0,0,2}|\n",
"| {2,0,0,0,2}|\n",
"|{871471,140673,200275,430614,269254,168218,390196,232243,76503,27335,15955,12...|\n",
"|{0,0,0,0,2,1,0,0,0,0,0,0,0,0,0,2,50,1,2,0,23,0,106,31,15,5,4,6,4,4,4,5,2,0,0,...|\n",
"|{43944,2046,143,50,15,9,9,15,38,14,6,2,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...|\n",
"|{0,0,0,0,1,2,0,0,0,0,0,0,0,0,0,2,50,1,2,0,23,0,106,31,15,5,4,6,4,4,4,5,2,0,0,...|\n",
"|{0,0,3,2,2,52,44,12,6,3,1,3,2,2,13,61,39,17,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0...|\n",
"| {268,0,0,0,2}|\n",
"| {268,0,0,0,2}|\n",
"|{0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,25,3,24,56,25,31,37,28,13,14,5,0,0...|\n",
"|{0,45771,1,0,341,0,170,2,0,1,0,0,0,1,0,2,0,0,0,2,0,0,0,0,0,0,2,0,0,0,0,0,0,0,...|\n",
"|{0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,18,13,2,1,21,148,32,12,8,1...|\n",
"|{0,0,30584,0,0,22,0,0,0,0,0,0,29,46,64,78,77,1,14,15375,1,0,0,0,0,0,0,2,0,0,0...|\n",
"|{86,0,1,3,0,0,1,0,0,15,0,0,1,0,0,3,15,20,5,8,10,10,4,6,6,4,5,7,8,7,9,4,5,4,2,...|\n",
"|{45772,0,0,0,0,24,7,21,0,168,0,1,1,48,36,86,47,76,0,1,1,0,0,2,0,0,0,0,0,0,0,0...|\n",
"| {268,0,0,0,2}|\n",
"| {46292,0,0,0,2}|\n",
"|{0,0,0,1,0,3,77,6,2,2,19,2,1,1,2,1,0,0,1,0,0,1,0,1,3,1,2,1,0,0,0,2,1,1,2,1,1,...|\n",
"+--------------------------------------------------------------------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"aggregates = df.select(\"aggregate\")\n",
"aggregates.cache()\n",
"aggregates.count()\n",
"aggregates.show(truncate=80)"
]
},
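{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick single-row illustration of that tail structure in plain Python, before we parse the arrays in Spark:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Peel the trailing sum and count off one Postgres array literal.\n",
"values = [int(x) for x in aggregates.first().aggregate.strip(\"{}\").split(\",\")]\n",
"buckets, total_sum, total_count = values[:-2], values[-2], values[-1]\n",
"print(buckets, total_sum, total_count)"
]
},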
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initial approach using a python udf"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------------------------------------------------------------------+\n",
"| aggregate|\n",
"+--------------------------------------------------------------------------------+\n",
"| [0, 2, 0, 2, 2]|\n",
"| [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2]|\n",
"| [2, 0, 0, 0, 2]|\n",
"| [2, 0, 0, 0, 2]|\n",
"|[871471, 140673, 200275, 430614, 269254, 168218, 390196, 232243, 76503, 27335...|\n",
"|[0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n",
"|[43944, 2046, 143, 50, 15, 9, 9, 15, 38, 14, 6, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0,...|\n",
"|[0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n",
"|[0, 0, 3, 2, 2, 52, 44, 12, 6, 3, 1, 3, 2, 2, 13, 61, 39, 17, 2, 2, 2, 0, 0, ...|\n",
"| [268, 0, 0, 0, 2]|\n",
"| [268, 0, 0, 0, 2]|\n",
"|[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 25, 3, 24, 56...|\n",
"|[0, 45771, 1, 0, 341, 0, 170, 2, 0, 1, 0, 0, 0, 1, 0, 2, 0, 0, 0, 2, 0, 0, 0,...|\n",
"|[0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1...|\n",
"|[0, 0, 30584, 0, 0, 22, 0, 0, 0, 0, 0, 0, 29, 46, 64, 78, 77, 1, 14, 15375, 1...|\n",
"|[86, 0, 1, 3, 0, 0, 1, 0, 0, 15, 0, 0, 1, 0, 0, 3, 15, 20, 5, 8, 10, 10, 4, 6...|\n",
"|[45772, 0, 0, 0, 0, 24, 7, 21, 0, 168, 0, 1, 1, 48, 36, 86, 47, 76, 0, 1, 1, ...|\n",
"| [268, 0, 0, 0, 2]|\n",
"| [46292, 0, 0, 0, 2]|\n",
"|[0, 0, 0, 1, 0, 3, 77, 6, 2, 2, 19, 2, 1, 1, 2, 1, 0, 0, 1, 0, 0, 1, 0, 1, 3,...|\n",
"+--------------------------------------------------------------------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"import ast\n",
"\n",
"aggregate_schema = T.ArrayType(T.IntegerType())\n",
"\n",
"@F.udf(aggregate_schema)\n",
"def from_pg_array_py(arr):\n",
" if not arr:\n",
" return None\n",
" # replace curly braces with brackets, then use a safe literal eval\n",
" return ast.literal_eval(\"[\" + arr[1:-1] + \"]\")\n",
"\n",
"aggregates.select(from_pg_array_py(\"aggregate\").alias(\"aggregate\")).show(truncate=80)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Alternative implementation using `from_json`"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------------------------------------------------------------------+\n",
"| aggregate|\n",
"+--------------------------------------------------------------------------------+\n",
"| [0, 2, 0, 2, 2]|\n",
"| [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2]|\n",
"| [2, 0, 0, 0, 2]|\n",
"| [2, 0, 0, 0, 2]|\n",
"|[871471, 140673, 200275, 430614, 269254, 168218, 390196, 232243, 76503, 27335...|\n",
"|[0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n",
"|[43944, 2046, 143, 50, 15, 9, 9, 15, 38, 14, 6, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0,...|\n",
"|[0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n",
"|[0, 0, 3, 2, 2, 52, 44, 12, 6, 3, 1, 3, 2, 2, 13, 61, 39, 17, 2, 2, 2, 0, 0, ...|\n",
"| [268, 0, 0, 0, 2]|\n",
"| [268, 0, 0, 0, 2]|\n",
"|[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 25, 3, 24, 56...|\n",
"|[0, 45771, 1, 0, 341, 0, 170, 2, 0, 1, 0, 0, 0, 1, 0, 2, 0, 0, 0, 2, 0, 0, 0,...|\n",
"|[0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1...|\n",
"|[0, 0, 30584, 0, 0, 22, 0, 0, 0, 0, 0, 0, 29, 46, 64, 78, 77, 1, 14, 15375, 1...|\n",
"|[86, 0, 1, 3, 0, 0, 1, 0, 0, 15, 0, 0, 1, 0, 0, 3, 15, 20, 5, 8, 10, 10, 4, 6...|\n",
"|[45772, 0, 0, 0, 0, 24, 7, 21, 0, 168, 0, 1, 1, 48, 36, 86, 47, 76, 0, 1, 1, ...|\n",
"| [268, 0, 0, 0, 2]|\n",
"| [46292, 0, 0, 0, 2]|\n",
"|[0, 0, 0, 1, 0, 3, 77, 6, 2, 2, 19, 2, 1, 1, 2, 1, 0, 0, 1, 0, 0, 1, 0, 1, 3,...|\n",
"+--------------------------------------------------------------------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"def from_pg_array(arr):\n",
" return F.from_json(F.translate(F.translate(arr, \"{\", \"[\"), \"}\", \"]\"), aggregate_schema).alias(arr)\n",
"\n",
"aggregates.select(from_pg_array(\"aggregate\")).show(truncate=80)"
]
},
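{
"cell_type": "markdown",
"metadata": {},
"source": [
"One behavioral difference between the two implementations is worth noting: in its default `PERMISSIVE` mode, `from_json` yields `null` for malformed input, whereas the Python UDF raises. A small sketch on a hypothetical bad value:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical malformed literal: expect a single null row here, while\n",
"# from_pg_array_py would raise a ValueError on the same input.\n",
"bad = spark.createDataFrame([(\"{1,2,oops}\",)], [\"aggregate\"])\n",
"bad.select(from_pg_array(\"aggregate\")).show()"
]
},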
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Test the speed of the two implementations\n",
"\n",
"The `from_json` method performs significantly better than the python udf, by a factor of ~8x."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 12.3 ms, sys: 5.55 ms, total: 17.9 ms\n",
"Wall time: 4min 5s\n",
"CPU times: user 4.1 ms, sys: 1.05 ms, total: 5.15 ms\n",
"Wall time: 28.5 s\n"
]
}
],
"source": [
"%time aggregates.select(from_pg_array_py(\"aggregate\").alias(\"aggregate\")).repartition(1).write.parquet(\"/tmp/agg-py\", mode=\"overwrite\")\n",
"%time aggregates.select(from_pg_array(\"aggregate\")).repartition(1).write.parquet(\"/tmp/agg-sql\", mode=\"overwrite\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Test the relative size of these two approaches with compression"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.05 ms, sys: 881 µs, total: 1.93 ms\n",
"Wall time: 3.78 s\n",
"CPU times: user 4.47 ms, sys: 1.2 ms, total: 5.67 ms\n",
"Wall time: 28.3 s\n"
]
}
],
"source": [
"%time aggregates.repartition(1).write.parquet(\"/tmp/agg-string\", mode=\"overwrite\")\n",
"%time aggregates.select(from_pg_array(\"aggregate\")).repartition(1).write.parquet(\"/tmp/agg-list\", mode=\"overwrite\")"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"145M\t/tmp/agg-string\n",
"129M\t/tmp/agg-list\n"
]
}
],
"source": [
"! du -h /tmp/agg-string\n",
"! du -h /tmp/agg-list"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Without compression\n",
"\n",
"Without compression, the binary format wins by a small margin. This matters, since BigQuery will store all the rows uncompressed."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.52 ms, sys: 1.22 ms, total: 2.74 ms\n",
"Wall time: 3.53 s\n",
"CPU times: user 5.03 ms, sys: 1.38 ms, total: 6.41 ms\n",
"Wall time: 23.1 s\n"
]
}
],
"source": [
"%time aggregates.repartition(1).write.parquet(\"/tmp/agg-string\", mode=\"overwrite\", compression=\"none\")\n",
"%time aggregates.select(from_pg_array(\"aggregate\")).write.parquet(\"/tmp/agg-list\", mode=\"overwrite\", compression=\"none\")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"469M\t/tmp/agg-string\n",
"372M\t/tmp/agg-list\n"
]
}
],
"source": [
"! du -h /tmp/agg-string\n",
"! du -h /tmp/agg-list"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parsing table name\n",
"\n",
"We don't need to keep the table name around, since we have all the information to reconstruct it."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+-------+\n",
"|channel|version|\n",
"+-------+-------+\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"|nightly| 43|\n",
"+-------+-------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"(\n",
" df.select(F.split(\"table_name\", \"_\").alias(\"parts\"))\n",
" .select(F.col(\"parts\").getItem(2).alias(\"channel\"), F.col(\"parts\").getItem(3).alias(\"version\"))\n",
" .show()\n",
")"
]
},
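{
"cell_type": "markdown",
"metadata": {},
"source": [
"An alternative sketch using a regular expression instead of positional indices, which also documents the expected shape of the name. The `build_id` prefix is an assumption based on mozaggregator's table-naming conventions; only `submission_date` tables appear in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Both assumed prefixes contribute two underscore-separated tokens, which is\n",
"# why getItem(2) and getItem(3) above land on the channel and version.\n",
"pattern = r\"^(submission_date|build_id)_([a-z]+)_(\\\\d+)_(\\\\d{8})$\"\n",
"df.select(\n",
"    F.regexp_extract(\"table_name\", pattern, 2).alias(\"channel\"),\n",
"    F.regexp_extract(\"table_name\", pattern, 3).alias(\"version\"),\n",
").show(n=3)"
]
},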
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Putting it all together"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0-----------------------------------------------------\n",
" aggregate_type | submission \n",
" channel | nightly \n",
" version | 43 \n",
" ds_nodash | 20191201 \n",
" os | Windows_NT \n",
" child | false \n",
" label | \n",
" metric | A11Y_INSTANTIATED_FLAG \n",
" osVersion | 10.0 \n",
" application | Firefox \n",
" architecture | x86 \n",
" aggregate | [0, 2, 0, 2, 2] \n",
"-RECORD 1-----------------------------------------------------\n",
" aggregate_type | submission \n",
" channel | nightly \n",
" version | 43 \n",
" ds_nodash | 20191201 \n",
" os | Windows_NT \n",
" child | false \n",
" label | \n",
" metric | A11Y_CONSUMERS \n",
" osVersion | 10.0 \n",
" application | Firefox \n",
" architecture | x86 \n",
" aggregate | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2] \n",
"-RECORD 2-----------------------------------------------------\n",
" aggregate_type | submission \n",
" channel | nightly \n",
" version | 43 \n",
" ds_nodash | 20191201 \n",
" os | Windows_NT \n",
" child | false \n",
" label | \n",
" metric | A11Y_ISIMPLEDOM_USAGE_FLAG \n",
" osVersion | 10.0 \n",
" application | Firefox \n",
" architecture | x86 \n",
" aggregate | [2, 0, 0, 0, 2] \n",
"only showing top 3 rows\n",
"\n"
]
}
],
"source": [
"input_dir = f\"../data/parquet/submission/20191201\"\n",
"df = spark.read.parquet(input_dir)\n",
"\n",
"result = (\n",
" df\n",
" .withColumn(\"parts\", F.split(\"table_name\", \"_\"))\n",
" .withColumn(\"dimension\", F.from_json(\"dimension\", dimension_type))\n",
" .select(\n",
" \"aggregate_type\",\n",
" F.col(\"parts\").getItem(2).alias(\"channel\"), \n",
" F.col(\"parts\").getItem(3).alias(\"version\"),\n",
" \"ds_nodash\",\n",
" \"dimension.*\",\n",
" from_pg_array(\"aggregate\"),\n",
" )\n",
")\n",
"\n",
"result.show(vertical=True, n=3, truncate=80)"
]
}
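,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Inverting the transformation\n",
"\n",
"Finally, a sketch of the inverse, to check that nothing essential was dropped. Two caveats: the mapping from the `submission` aggregate type back to the `submission_date` table prefix is assumed from the rows seen in this notebook, and `to_json` reproduces the dimension string only up to whitespace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inverted = result.select(\n",
"    # rebuild the table name from its parts (the prefix mapping is an assumption)\n",
"    F.concat_ws(\n",
"        \"_\",\n",
"        F.when(F.col(\"aggregate_type\") == \"submission\", \"submission_date\")\n",
"        .otherwise(F.col(\"aggregate_type\")),\n",
"        \"channel\",\n",
"        \"version\",\n",
"        \"ds_nodash\",\n",
"    ).alias(\"table_name\"),\n",
"    \"aggregate_type\",\n",
"    \"ds_nodash\",\n",
"    # serialize the struct fields back in the original key order\n",
"    F.to_json(F.struct(\n",
"        \"os\", \"child\", \"label\", \"metric\", \"osVersion\", \"application\", \"architecture\"\n",
"    )).alias(\"dimension\"),\n",
"    # re-wrap the parsed array as a Postgres array literal\n",
"    F.concat(\n",
"        F.lit(\"{\"), F.concat_ws(\",\", F.col(\"aggregate\").cast(\"array<string>\")), F.lit(\"}\")\n",
"    ).alias(\"aggregate\"),\n",
")\n",
"\n",
"inverted.show(vertical=True, n=1, truncate=80)"
]
}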
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}