jobs/mozaggregator2bq/notebooks/02-pre_bq_transforms.ipynb

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Transforming the aggregates for convenience\n", "\n", "We'd like to transform the aggregates into a form that will be useful in BigQuery. We also want to be able to invert the transformation, so we should make sure to save all the relevant information. Adding structure will make it easier to process things.\n", "\n", "## Processing dimensions\n", "\n", "This notebook relies on the output of `01-convert_to_parquet`." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-RECORD 0------------------------------------------------------------------------------------------\n", " table_name | submission_date_nightly_43_20191201 \n", " aggregate_type | submission \n", " ds_nodash | 20191201 \n", " dimension | {\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_INSTANTIA... \n", " aggregate | {0,2,0,2,2} \n", "-RECORD 1------------------------------------------------------------------------------------------\n", " table_name | submission_date_nightly_43_20191201 \n", " aggregate_type | submission \n", " ds_nodash | 20191201 \n", " dimension | {\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_CONSUMERS... \n", " aggregate | {0,0,0,0,0,0,0,0,0,0,0,2,22,2} \n", "-RECORD 2------------------------------------------------------------------------------------------\n", " table_name | submission_date_nightly_43_20191201 \n", " aggregate_type | submission \n", " ds_nodash | 20191201 \n", " dimension | {\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_ISIMPLEDO... \n", " aggregate | {2,0,0,0,2} \n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "input_dir = f\"../data/parquet/submission/20191201\"\n", "df = spark.read.parquet(input_dir)\n", "df.show(vertical=True, n=3, truncate=80)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\"os\": \"Windows_NT\", \"child\": \"false\", \"label\": \"\", \"metric\": \"A11Y_INSTANTIATED_FLAG\", \"osVersion\": \"10.0\", \"application\": \"Firefox\", \"architecture\": \"x86\"}\n" ] } ], "source": [ "print(df.first().dimension)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-RECORD 0----------------------------\n", " os | Windows_NT \n", " child | false \n", " label | \n", " metric | A11Y_INSTANTIATED... \n", " osVersion | 10.0 \n", " application | Firefox \n", " architecture | x86 \n", "only showing top 1 row\n", "\n" ] } ], "source": [ "from pyspark.sql import functions as F, types as T\n", "\n", "dimension_type = T.StructType([\n", " T.StructField(\"os\", T.StringType()),\n", " T.StructField(\"child\", T.StringType()),\n", " T.StructField(\"label\", T.StringType()),\n", " T.StructField(\"metric\", T.StringType()),\n", " T.StructField(\"osVersion\", T.StringType()),\n", " T.StructField(\"application\", T.StringType()),\n", " T.StructField(\"architecture\", T.StringType()),\n", "])\n", "\n", "df.select(F.from_json(\"dimension\", dimension_type).alias(\"dimension\")).select(\"dimension.*\").show(vertical=True, n=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Processing histograms\n", "\n", "It's useful to note that the last two elements of the aggregate array correspond to `sum` and `count`." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------------------------------------------------------------------+\n", "| aggregate|\n", "+--------------------------------------------------------------------------------+\n", "| {0,2,0,2,2}|\n", "| {0,0,0,0,0,0,0,0,0,0,0,2,22,2}|\n", "| {2,0,0,0,2}|\n", "| {2,0,0,0,2}|\n", "|{871471,140673,200275,430614,269254,168218,390196,232243,76503,27335,15955,12...|\n", "|{0,0,0,0,2,1,0,0,0,0,0,0,0,0,0,2,50,1,2,0,23,0,106,31,15,5,4,6,4,4,4,5,2,0,0,...|\n", "|{43944,2046,143,50,15,9,9,15,38,14,6,2,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...|\n", "|{0,0,0,0,1,2,0,0,0,0,0,0,0,0,0,2,50,1,2,0,23,0,106,31,15,5,4,6,4,4,4,5,2,0,0,...|\n", "|{0,0,3,2,2,52,44,12,6,3,1,3,2,2,13,61,39,17,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0...|\n", "| {268,0,0,0,2}|\n", "| {268,0,0,0,2}|\n", "|{0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,25,3,24,56,25,31,37,28,13,14,5,0,0...|\n", "|{0,45771,1,0,341,0,170,2,0,1,0,0,0,1,0,2,0,0,0,2,0,0,0,0,0,0,2,0,0,0,0,0,0,0,...|\n", "|{0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,18,13,2,1,21,148,32,12,8,1...|\n", "|{0,0,30584,0,0,22,0,0,0,0,0,0,29,46,64,78,77,1,14,15375,1,0,0,0,0,0,0,2,0,0,0...|\n", "|{86,0,1,3,0,0,1,0,0,15,0,0,1,0,0,3,15,20,5,8,10,10,4,6,6,4,5,7,8,7,9,4,5,4,2,...|\n", "|{45772,0,0,0,0,24,7,21,0,168,0,1,1,48,36,86,47,76,0,1,1,0,0,2,0,0,0,0,0,0,0,0...|\n", "| {268,0,0,0,2}|\n", "| {46292,0,0,0,2}|\n", "|{0,0,0,1,0,3,77,6,2,2,19,2,1,1,2,1,0,0,1,0,0,1,0,1,3,1,2,1,0,0,0,2,1,1,2,1,1,...|\n", "+--------------------------------------------------------------------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "aggregates = df.select(\"aggregate\")\n", "aggregates.cache()\n", "aggregates.count()\n", "aggregates.show(truncate=80)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Initial approach using a python udf" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------------------------------------------------------------------+\n", "| aggregate|\n", "+--------------------------------------------------------------------------------+\n", "| [0, 2, 0, 2, 2]|\n", "| [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2]|\n", "| [2, 0, 0, 0, 2]|\n", "| [2, 0, 0, 0, 2]|\n", "|[871471, 140673, 200275, 430614, 269254, 168218, 390196, 232243, 76503, 27335...|\n", "|[0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n", "|[43944, 2046, 143, 50, 15, 9, 9, 15, 38, 14, 6, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0,...|\n", "|[0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n", "|[0, 0, 3, 2, 2, 52, 44, 12, 6, 3, 1, 3, 2, 2, 13, 61, 39, 17, 2, 2, 2, 0, 0, ...|\n", "| [268, 0, 0, 0, 2]|\n", "| [268, 0, 0, 0, 2]|\n", "|[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 25, 3, 24, 56...|\n", "|[0, 45771, 1, 0, 341, 0, 170, 2, 0, 1, 0, 0, 0, 1, 0, 2, 0, 0, 0, 2, 0, 0, 0,...|\n", "|[0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1...|\n", "|[0, 0, 30584, 0, 0, 22, 0, 0, 0, 0, 0, 0, 29, 46, 64, 78, 77, 1, 14, 15375, 1...|\n", "|[86, 0, 1, 3, 0, 0, 1, 0, 0, 15, 0, 0, 1, 0, 0, 3, 15, 20, 5, 8, 10, 10, 4, 6...|\n", "|[45772, 0, 0, 0, 0, 24, 7, 21, 0, 168, 0, 1, 1, 48, 36, 86, 47, 76, 0, 1, 1, ...|\n", "| [268, 0, 0, 0, 2]|\n", "| [46292, 0, 0, 0, 2]|\n", "|[0, 0, 0, 1, 0, 3, 77, 6, 2, 2, 19, 2, 1, 1, 2, 1, 0, 0, 1, 0, 0, 1, 0, 1, 3,...|\n", 
"+--------------------------------------------------------------------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "import ast\n", "\n", "aggregate_schema = T.ArrayType(T.IntegerType())\n", "\n", "@F.udf(aggregate_schema)\n", "def from_pg_array_py(arr):\n", " if not arr:\n", " return None\n", " # replace curly braces with brackets, then use a safe literal eval\n", " return ast.literal_eval(\"[\" + arr[1:-1] + \"]\")\n", "\n", "aggregates.select(from_pg_array_py(\"aggregate\").alias(\"aggregate\")).show(truncate=80)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Alternative implementation using `from_json`" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------------------------------------------------------------------+\n", "| aggregate|\n", "+--------------------------------------------------------------------------------+\n", "| [0, 2, 0, 2, 2]|\n", "| [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2]|\n", "| [2, 0, 0, 0, 2]|\n", "| [2, 0, 0, 0, 2]|\n", "|[871471, 140673, 200275, 430614, 269254, 168218, 390196, 232243, 76503, 27335...|\n", "|[0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n", "|[43944, 2046, 143, 50, 15, 9, 9, 15, 38, 14, 6, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0,...|\n", "|[0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|\n", "|[0, 0, 3, 2, 2, 52, 44, 12, 6, 3, 1, 3, 2, 2, 13, 61, 39, 17, 2, 2, 2, 0, 0, ...|\n", "| [268, 0, 0, 0, 2]|\n", "| [268, 0, 0, 0, 2]|\n", "|[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 25, 3, 24, 56...|\n", "|[0, 45771, 1, 0, 341, 0, 170, 2, 0, 1, 0, 0, 0, 1, 0, 2, 0, 0, 0, 2, 0, 0, 0,...|\n", "|[0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1...|\n", "|[0, 0, 30584, 0, 0, 22, 0, 0, 0, 0, 0, 0, 29, 46, 64, 78, 77, 1, 14, 15375, 1...|\n", "|[86, 0, 1, 3, 0, 0, 1, 0, 0, 15, 0, 0, 1, 0, 0, 3, 15, 20, 5, 8, 10, 10, 4, 6...|\n", "|[45772, 0, 0, 0, 0, 24, 7, 21, 0, 168, 0, 1, 1, 48, 36, 86, 47, 76, 0, 1, 1, ...|\n", "| [268, 0, 0, 0, 2]|\n", "| [46292, 0, 0, 0, 2]|\n", "|[0, 0, 0, 1, 0, 3, 77, 6, 2, 2, 19, 2, 1, 1, 2, 1, 0, 0, 1, 0, 0, 1, 0, 1, 3,...|\n", "+--------------------------------------------------------------------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "def from_pg_array(arr):\n", " return F.from_json(F.translate(F.translate(arr, \"{\", \"[\"), \"}\", \"]\"), aggregate_schema).alias(arr)\n", "\n", "aggregates.select(from_pg_array(\"aggregate\")).show(truncate=80)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Test the speed of the two implementations\n", "\n", "The `from_json` method performs significantly better than the python udf, by a factor of ~8x." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 12.3 ms, sys: 5.55 ms, total: 17.9 ms\n", "Wall time: 4min 5s\n", "CPU times: user 4.1 ms, sys: 1.05 ms, total: 5.15 ms\n", "Wall time: 28.5 s\n" ] } ], "source": [ "%time aggregates.select(from_pg_array_py(\"aggregate\").alias(\"aggregate\")).repartition(1).write.parquet(\"/tmp/agg-py\", mode=\"overwrite\")\n", "%time aggregates.select(from_pg_array(\"aggregate\")).repartition(1).write.parquet(\"/tmp/agg-sql\", mode=\"overwrite\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Test the relative size of these two approaches with compression" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.05 ms, sys: 881 µs, total: 1.93 ms\n", "Wall time: 3.78 s\n", "CPU times: user 4.47 ms, sys: 1.2 ms, total: 5.67 ms\n", "Wall time: 28.3 s\n" ] } ], "source": [ "%time aggregates.repartition(1).write.parquet(\"/tmp/agg-string\", mode=\"overwrite\")\n", "%time aggregates.select(from_pg_array(\"aggregate\")).repartition(1).write.parquet(\"/tmp/agg-list\", mode=\"overwrite\")" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "145M\t/tmp/agg-string\n", "129M\t/tmp/agg-list\n" ] } ], "source": [ "! du -h /tmp/agg-string\n", "! du -h /tmp/agg-list" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Without compression\n", "\n", "Without compression, the binary format wins by a small margin. This matters, since BigQuery will store all the rows uncompressed." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.52 ms, sys: 1.22 ms, total: 2.74 ms\n", "Wall time: 3.53 s\n", "CPU times: user 5.03 ms, sys: 1.38 ms, total: 6.41 ms\n", "Wall time: 23.1 s\n" ] } ], "source": [ "%time aggregates.repartition(1).write.parquet(\"/tmp/agg-string\", mode=\"overwrite\", compression=\"none\")\n", "%time aggregates.select(from_pg_array(\"aggregate\")).write.parquet(\"/tmp/agg-list\", mode=\"overwrite\", compression=\"none\")" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "469M\t/tmp/agg-string\n", "372M\t/tmp/agg-list\n" ] } ], "source": [ "! du -h /tmp/agg-string\n", "! du -h /tmp/agg-list" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parsing table name\n", "\n", "We don't need to keep the table name around, since we have all the information to reconstruct it." 
] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-------+\n", "|channel|version|\n", "+-------+-------+\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "|nightly| 43|\n", "+-------+-------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "(\n", " df.select(F.split(\"table_name\", \"_\").alias(\"parts\"))\n", " .select(F.col(\"parts\").getItem(2).alias(\"channel\"), F.col(\"parts\").getItem(3).alias(\"version\"))\n", " .show()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Putting it all together" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-RECORD 0-----------------------------------------------------\n", " aggregate_type | submission \n", " channel | nightly \n", " version | 43 \n", " ds_nodash | 20191201 \n", " os | Windows_NT \n", " child | false \n", " label | \n", " metric | A11Y_INSTANTIATED_FLAG \n", " osVersion | 10.0 \n", " application | Firefox \n", " architecture | x86 \n", " aggregate | [0, 2, 0, 2, 2] \n", "-RECORD 1-----------------------------------------------------\n", " aggregate_type | submission \n", " channel | nightly \n", " version | 43 \n", " ds_nodash | 20191201 \n", " os | Windows_NT \n", " child | false \n", " label | \n", " metric | A11Y_CONSUMERS \n", " osVersion | 10.0 \n", " application | Firefox \n", " architecture | x86 \n", " aggregate | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2] \n", "-RECORD 2-----------------------------------------------------\n", " aggregate_type | submission \n", " channel | nightly \n", " version | 43 \n", " ds_nodash | 20191201 \n", " os | Windows_NT \n", " child | false \n", " label | \n", " metric | A11Y_ISIMPLEDOM_USAGE_FLAG \n", " osVersion | 10.0 \n", " application | Firefox \n", " architecture | x86 \n", " aggregate | [2, 0, 0, 0, 2] \n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "input_dir = f\"../data/parquet/submission/20191201\"\n", "df = spark.read.parquet(input_dir)\n", "\n", "result = (\n", " df\n", " .withColumn(\"parts\", F.split(\"table_name\", \"_\"))\n", " .withColumn(\"dimension\", F.from_json(\"dimension\", dimension_type))\n", " .select(\n", " \"aggregate_type\",\n", " F.col(\"parts\").getItem(2).alias(\"channel\"), \n", " F.col(\"parts\").getItem(3).alias(\"version\"),\n", " \"ds_nodash\",\n", " \"dimension.*\",\n", " from_pg_array(\"aggregate\"),\n", " )\n", ")\n", "\n", "result.show(vertical=True, n=3, truncate=80)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 2 }