jobs/mozaggregator2bq/notebooks/01-convert_to_parquet.ipynb (566 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Convert a day of data into parquet\n", "\n", "We're interested in parsing `pg_dump` data from the mozaggregator database in the following form:\n", "\n", "```\n", "../data\n", "├── build_id\n", "│ └── 20191201\n", "│ ├── 474306.dat.gz\n", "│ └── toc.dat\n", "└── submission\n", " └── 20191201\n", " ├── 474405.dat.gz\n", " ├── 474406.dat.gz\n", " ...\n", " ├── 474504.dat.gz\n", " └── toc.dat\n", "\n", "4 directories, 103 files\n", "```" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "data_dir=\"../data\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Add flattened metadata to the aggregates" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+--------------------+---------------+---------+--------+\n", "| dimension| aggregate| aggregate_type|ds_nodash|table_id|\n", "+--------------------+--------------------+---------------+---------+--------+\n", "|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,1,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,0,0,0,2,0,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,0,4,1,2,0,0,1,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,11,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,2,1,0,0,0,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{11,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,2,1,1,1,2,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{9,2,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{11,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,1,0,0,0,0,0,1,...|submission_date| 20191201| 474922|\n", "|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n", "+--------------------+--------------------+---------------+---------+--------+\n", "only showing top 20 rows\n", "\n" ] }, { "data": { "text/plain": [ "3123817" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import functions as F, types as T\n", "\n", "metadata_struct = T.StructType([\n", " T.StructField(\"aggregate_type\", T.StringType(), False),\n", " T.StructField(\"ds_nodash\", T.StringType(), False),\n", " T.StructField(\"table_id\", T.IntegerType(), False),\n", "])\n", "@F.udf(metadata_struct)\n", "def parse_filename(path):\n", " aggregate_type, ds_nodash, filename = path.split(\"/\")[-3:]\n", " return aggregate_type, ds_nodash, int(filename.split(\".\")[0])\n", "\n", "def read_pg_dump(input_dir):\n", " return (\n", " spark.read.csv(f\"{input_dir}/*.dat.gz\", sep=\"\\t\", schema=\"dimension string, aggregate string\")\n", " .withColumn(\"file_name\", parse_filename(F.input_file_name()))\n", " .select(\"dimension\", \"aggregate\", \"file_name.*\")\n", " )\n", "\n", "df = read_pg_dump(f\"{data_dir}/submission_date/20191201\")\n", "df.show()\n", "df.count()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n", " dimension | {\"os\": \"Darwin\", \"child\": \"true\", \"label\": \"\", \"metric\": \"SCALARS_MEDIA.AUTOPLAY_WOULD_NOT_BE_ALLOWED_COUNT\", \"osVersion\": \"16.5.0\", \"application\": \"Firefox\", \"architecture\": \"x86-64\"} \n", " aggregate | {0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,16,2} \n", " aggregate_type | submission_date \n", " ds_nodash | 20191201 \n", " table_id | 474922 \n", "\n" ] } ], "source": [ "df.limit(1).show(vertical=True, truncate=False)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "100" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.select(\"table_id\").distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parse the table of contents" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "toc_file = f\"{data_dir}/submission_date/20191201/toc.dat\"\n", "\n", "with open(toc_file, \"rb\") as f:\n", " data = f.read()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[b'\\x00\\x00\\x00474879.dat\\x00\\xd0>\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0090014330\\x00\"\\x00\\x00\\x00submission_date_aurora_40_20191201\\x00',\n", " b'\\x00\\x00\\x00474832.dat\\x00\\xe0>\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0090014699\\x00\"\\x00\\x00\\x00submission_date_aurora_41_20191201\\x00',\n", " b'\\x00\\x00\\x00474848.dat\\x00\\xcd>\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0090014321\\x00\"\\x00\\x00\\x00submission_date_aurora_42_20191201\\x00']" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dat = [x for x in data.split(b\"\\n\") if b\".dat\" in x]\n", "dat[:3]" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[{'table_id': '474879', 'table_name': 'submission_date_aurora_40_20191201'},\n", " {'table_id': '474832', 'table_name': 'submission_date_aurora_41_20191201'},\n", " {'table_id': '474848', 'table_name': 'submission_date_aurora_42_20191201'},\n", " {'table_id': '474829', 'table_name': 'submission_date_aurora_43_20191201'},\n", " {'table_id': '474845', 'table_name': 'submission_date_aurora_44_20191201'},\n", " {'table_id': '474889', 'table_name': 'submission_date_aurora_45_20191201'},\n", " {'table_id': '474864', 'table_name': 'submission_date_aurora_46_20191201'},\n", " {'table_id': '474875', 'table_name': 'submission_date_aurora_47_20191201'},\n", " {'table_id': '474881', 'table_name': 'submission_date_aurora_48_20191201'},\n", " {'table_id': '474858', 'table_name': 'submission_date_aurora_49_20191201'}]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def extract_mapping(line):\n", " \"\"\"Parse the binary toc files for the table and the table name.\"\"\"\n", " \n", " # We rely on the padding in the binary file to extract the necessary information\n", " # b'\\x00\\x00\\x00474424.dat\\x00%=\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\n", " # \\x00\\x08\\x00\\x00\\x0090014321\\x00\"\\x00\\x00\\x00submission_date_aurora_42_20191201\\x00'\n", " processed = line.replace(b\"\\x00\", b\" \").strip().split()\n", " \n", " # [b'474455.dat', b'(=\\x07', b'\\x01', b'\\x01', b'0', b'\\x08', b'90014330', b'\"', \n", " # b'submission_date_aurora_40_20191201']\n", " table_name = processed[-1].decode()\n", " if b\"CREATE INDEX\" in line:\n", " # this is an indexed table, get the actual name\n", " for element in processed:\n", " if b\"public.\" not in element:\n", " continue\n", " table_name = element.split(b\"public.\")[-1].decode()\n", " \n", " return {\"table_id\": processed[0].split(b\".\")[0].decode(), \"table_name\": table_name}\n", "\n", "def parse_toc(data):\n", " return [extract_mapping(line) for line in data.split(b\"\\n\") if b\".dat\" in line]\n", "\n", "parse_toc(data)[:10]" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+----------------------------------+\n", "|table_id|table_name |\n", "+--------+----------------------------------+\n", "|474879 |submission_date_aurora_40_20191201|\n", "|474832 |submission_date_aurora_41_20191201|\n", "|474848 |submission_date_aurora_42_20191201|\n", "|474829 |submission_date_aurora_43_20191201|\n", "|474845 |submission_date_aurora_44_20191201|\n", "+--------+----------------------------------+\n", "only showing top 5 rows\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Users/amiyaguchi/Work/mozaggregator2bq/venv/lib/python3.7/site-packages/pyspark/sql/session.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead\n", " warnings.warn(\"inferring schema from dict is deprecated,\"\n" ] } ], "source": [ "spark.createDataFrame(parse_toc(data)).show(truncate=False, n=5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### TOC for `build_id`s" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "toc_file = f\"{data_dir}/build_id/20191220/toc.dat\"\n", "\n", "with open(toc_file, \"rb\") as f:\n", " data = f.read()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[b\"\\x00\\x00\\x00496168.dat\\x00'\\x92\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0091418126\\x00\\x1c\\x00\\x00\\x00build_id_nightly_73_20191220\\x00\",\n", " b'\\x00\\x00\\x00496167.dat\\x00\\xb9\\x91\\x07\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x04\\x00\\x00\\x001259\\x00\\x08\\x00\\x00\\x0091420008\\x00+\\x00\\x00\\x00build_id_nightly_68_20191220_dimensions_idx\\x00\\x05\\x00\\x00\\x00INDEX\\x00\\x04\\x00\\x00\\x00\\x00\\x87\\x00\\x00\\x00CREATE INDEX build_id_nightly_68_20191220_dimensions_idx ON public.build_id_nightly_68_20191220 USING gin (dimensions jsonb_path_ops);']" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(filter(lambda x: b\".dat\" in x, data.split(b\"\\n\")))" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[b'496167.dat',\n", " b'\\xb9\\x91\\x07',\n", " b'\\x04',\n", " b'1259',\n", " b'\\x08',\n", " b'91420008',\n", " b'+',\n", " b'build_id_nightly_68_20191220_dimensions_idx',\n", " b'\\x05',\n", " b'INDEX',\n", " b'\\x04',\n", " b'\\x87',\n", " b'CREATE',\n", " b'INDEX',\n", " b'build_id_nightly_68_20191220_dimensions_idx',\n", " b'ON',\n", " b'public.build_id_nightly_68_20191220',\n", " b'USING',\n", " b'gin',\n", " b'(dimensions',\n", " b'jsonb_path_ops);']" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dat = [x for x in data.split(b\"\\n\") if b\".dat\" in x]\n", "processed = dat[1].replace(b\"\\x00\", b\" \").strip().split()\n", "processed" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[{'table_id': '496168', 'table_name': 'build_id_nightly_73_20191220'},\n", " {'table_id': '496167', 'table_name': 'build_id_nightly_68_20191220'}]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "parse_toc(data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Putting everything together" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 3.96 ms, sys: 1.35 ms, total: 5.31 ms\n", "Wall time: 16.6 s\n" ] } ], "source": [ "from pyspark.sql import Row\n", "\n", "def main(input_dir, output_dir):\n", " # parse the table of contents\n", " toc_file = f\"{input_dir}/toc.dat\"\n", " with open(toc_file, \"rb\") as f:\n", " data = f.read()\n", " toc_df = spark.createDataFrame([Row(**d) for d in parse_toc(data)])\n", "\n", " df = read_pg_dump(input_dir)\n", " # join, reorder, and write to a single parquet partition\n", " joined = df.join(toc_df, on=\"table_id\")\n", " columns = [\"table_name\", \"aggregate_type\", \"ds_nodash\", \"dimension\", \"aggregate\"]\n", "\n", " out_df = joined.select(*columns)\n", " # NOTE: strip out this jupyter magic if copied into a script\n", " %time out_df.repartition(1).write.parquet(output_dir, mode=\"overwrite\")\n", "\n", "\n", "input_directory = f\"{data_dir}/submission_date/20191201\"\n", "output_directory = f\"{data_dir}/parquet/submission_date/20191201\"\n", "main(input_directory, output_directory)" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "118M\t../data/submission_date/20191201/\n", "210M\t../data/parquet/submission_date/20191201/\n" ] } ], "source": [ "! du -h ../data/submission_date/20191201/\n", "! du -h ../data/parquet/submission_date/20191201/" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "read_pg_dump(input_directory).count() == spark.read.parquet(output_directory).count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### And again, for build id aggregates" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.43 ms, sys: 480 µs, total: 1.91 ms\n", "Wall time: 2.28 s\n" ] } ], "source": [ "input_directory = f\"{data_dir}/build_id/20191201\"\n", "output_directory = f\"{data_dir}/parquet/build_id/20191201\"\n", "main(input_directory, output_directory)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "8.5M\t../data/build_id/20191201/\n", " 14M\t../data/parquet/build_id/20191201/\n" ] } ], "source": [ "! du -h ../data/build_id/20191201/\n", "! du -h ../data/parquet/build_id/20191201/" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "read_pg_dump(input_directory).count() == spark.read.parquet(output_directory).count()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 2 }