{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Convert a day of data into parquet\n",
"\n",
"We're interested in parsing `pg_dump` data from the mozaggregator database in the following form:\n",
"\n",
"```\n",
"../data\n",
"├── build_id\n",
"│ └── 20191201\n",
"│ ├── 474306.dat.gz\n",
"│ └── toc.dat\n",
"└── submission\n",
" └── 20191201\n",
" ├── 474405.dat.gz\n",
" ├── 474406.dat.gz\n",
" ...\n",
" ├── 474504.dat.gz\n",
" └── toc.dat\n",
"\n",
"4 directories, 103 files\n",
"```"
]
},
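{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each per-day directory appears to be a `pg_dump` directory-format archive: `toc.dat` is the binary table of contents and each numbered `*.dat.gz` file holds one table's rows. A minimal sketch (assuming the layout shown above) to confirm what is actually on disk before parsing:\n",
"\n",
"```python\n",
"# hypothetical sanity check: count the data files under each per-day dump directory\n",
"from pathlib import Path\n",
"\n",
"for toc in sorted(Path(\"../data\").glob(\"*/*/toc.dat\")):\n",
"    day_dir = toc.parent\n",
"    n_files = len(list(day_dir.glob(\"*.dat.gz\")))\n",
"    print(day_dir, n_files, \"data files\")\n",
"```"
]
},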
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"data_dir=\"../data\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Add flattened metadata to the aggregates"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+---------------+---------+--------+\n",
"| dimension| aggregate| aggregate_type|ds_nodash|table_id|\n",
"+--------------------+--------------------+---------------+---------+--------+\n",
"|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,1,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,0,0,0,2,0,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,0,4,1,2,0,0,1,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,11,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,2,1,0,0,0,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{11,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,2,1,1,1,2,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{9,2,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{11,0,0,0,0,0,0,0...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,1,0,0,0,0,0,1,...|submission_date| 20191201| 474922|\n",
"|{\"os\": \"Darwin\", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201| 474922|\n",
"+--------------------+--------------------+---------------+---------+--------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"3123817"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pyspark.sql import functions as F, types as T\n",
"\n",
"metadata_struct = T.StructType([\n",
" T.StructField(\"aggregate_type\", T.StringType(), False),\n",
" T.StructField(\"ds_nodash\", T.StringType(), False),\n",
" T.StructField(\"table_id\", T.IntegerType(), False),\n",
"])\n",
"@F.udf(metadata_struct)\n",
"def parse_filename(path):\n",
" aggregate_type, ds_nodash, filename = path.split(\"/\")[-3:]\n",
" return aggregate_type, ds_nodash, int(filename.split(\".\")[0])\n",
"\n",
"def read_pg_dump(input_dir):\n",
" return (\n",
" spark.read.csv(f\"{input_dir}/*.dat.gz\", sep=\"\\t\", schema=\"dimension string, aggregate string\")\n",
" .withColumn(\"file_name\", parse_filename(F.input_file_name()))\n",
" .select(\"dimension\", \"aggregate\", \"file_name.*\")\n",
" )\n",
"\n",
"df = read_pg_dump(f\"{data_dir}/submission_date/20191201\")\n",
"df.show()\n",
"df.count()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n",
" dimension | {\"os\": \"Darwin\", \"child\": \"true\", \"label\": \"\", \"metric\": \"SCALARS_MEDIA.AUTOPLAY_WOULD_NOT_BE_ALLOWED_COUNT\", \"osVersion\": \"16.5.0\", \"application\": \"Firefox\", \"architecture\": \"x86-64\"} \n",
" aggregate | {0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,16,2} \n",
" aggregate_type | submission_date \n",
" ds_nodash | 20191201 \n",
" table_id | 474922 \n",
"\n"
]
}
],
"source": [
"df.limit(1).show(vertical=True, truncate=False)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"100"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.select(\"table_id\").distinct().count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parse the table of contents"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"toc_file = f\"{data_dir}/submission_date/20191201/toc.dat\"\n",
"\n",
"with open(toc_file, \"rb\") as f:\n",
" data = f.read()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[b'\\x00\\x00\\x00474879.dat\\x00\\xd0>\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0090014330\\x00\"\\x00\\x00\\x00submission_date_aurora_40_20191201\\x00',\n",
" b'\\x00\\x00\\x00474832.dat\\x00\\xe0>\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0090014699\\x00\"\\x00\\x00\\x00submission_date_aurora_41_20191201\\x00',\n",
" b'\\x00\\x00\\x00474848.dat\\x00\\xcd>\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0090014321\\x00\"\\x00\\x00\\x00submission_date_aurora_42_20191201\\x00']"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dat = [x for x in data.split(b\"\\n\") if b\".dat\" in x]\n",
"dat[:3]"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[{'table_id': '474879', 'table_name': 'submission_date_aurora_40_20191201'},\n",
" {'table_id': '474832', 'table_name': 'submission_date_aurora_41_20191201'},\n",
" {'table_id': '474848', 'table_name': 'submission_date_aurora_42_20191201'},\n",
" {'table_id': '474829', 'table_name': 'submission_date_aurora_43_20191201'},\n",
" {'table_id': '474845', 'table_name': 'submission_date_aurora_44_20191201'},\n",
" {'table_id': '474889', 'table_name': 'submission_date_aurora_45_20191201'},\n",
" {'table_id': '474864', 'table_name': 'submission_date_aurora_46_20191201'},\n",
" {'table_id': '474875', 'table_name': 'submission_date_aurora_47_20191201'},\n",
" {'table_id': '474881', 'table_name': 'submission_date_aurora_48_20191201'},\n",
" {'table_id': '474858', 'table_name': 'submission_date_aurora_49_20191201'}]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def extract_mapping(line):\n",
" \"\"\"Parse the binary toc files for the table and the table name.\"\"\"\n",
" \n",
" # We rely on the padding in the binary file to extract the necessary information\n",
" # b'\\x00\\x00\\x00474424.dat\\x00%=\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\n",
" # \\x00\\x08\\x00\\x00\\x0090014321\\x00\"\\x00\\x00\\x00submission_date_aurora_42_20191201\\x00'\n",
" processed = line.replace(b\"\\x00\", b\" \").strip().split()\n",
" \n",
" # [b'474455.dat', b'(=\\x07', b'\\x01', b'\\x01', b'0', b'\\x08', b'90014330', b'\"', \n",
" # b'submission_date_aurora_40_20191201']\n",
" table_name = processed[-1].decode()\n",
" if b\"CREATE INDEX\" in line:\n",
" # this is an indexed table, get the actual name\n",
" for element in processed:\n",
" if b\"public.\" not in element:\n",
" continue\n",
" table_name = element.split(b\"public.\")[-1].decode()\n",
" \n",
" return {\"table_id\": processed[0].split(b\".\")[0].decode(), \"table_name\": table_name}\n",
"\n",
"def parse_toc(data):\n",
" return [extract_mapping(line) for line in data.split(b\"\\n\") if b\".dat\" in line]\n",
"\n",
"parse_toc(data)[:10]"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+----------------------------------+\n",
"|table_id|table_name |\n",
"+--------+----------------------------------+\n",
"|474879 |submission_date_aurora_40_20191201|\n",
"|474832 |submission_date_aurora_41_20191201|\n",
"|474848 |submission_date_aurora_42_20191201|\n",
"|474829 |submission_date_aurora_43_20191201|\n",
"|474845 |submission_date_aurora_44_20191201|\n",
"+--------+----------------------------------+\n",
"only showing top 5 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/amiyaguchi/Work/mozaggregator2bq/venv/lib/python3.7/site-packages/pyspark/sql/session.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead\n",
" warnings.warn(\"inferring schema from dict is deprecated,\"\n"
]
}
],
"source": [
"spark.createDataFrame(parse_toc(data)).show(truncate=False, n=5)"
]
},
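{
"cell_type": "markdown",
"metadata": {},
"source": [
"The deprecation warning above comes from passing plain dicts to `createDataFrame`; the `main` function further down avoids it by wrapping each mapping in `pyspark.sql.Row`. A minimal equivalent sketch:\n",
"\n",
"```python\n",
"from pyspark.sql import Row\n",
"\n",
"# same table-of-contents dataframe as above, without the schema-inference warning\n",
"toc_df = spark.createDataFrame([Row(**d) for d in parse_toc(data)])\n",
"toc_df.show(truncate=False, n=5)\n",
"```"
]
},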
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### TOC for `build_id`s"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"toc_file = f\"{data_dir}/build_id/20191220/toc.dat\"\n",
"\n",
"with open(toc_file, \"rb\") as f:\n",
" data = f.read()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[b\"\\x00\\x00\\x00496168.dat\\x00'\\x92\\x07\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x000\\x00\\x08\\x00\\x00\\x0091418126\\x00\\x1c\\x00\\x00\\x00build_id_nightly_73_20191220\\x00\",\n",
" b'\\x00\\x00\\x00496167.dat\\x00\\xb9\\x91\\x07\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x04\\x00\\x00\\x001259\\x00\\x08\\x00\\x00\\x0091420008\\x00+\\x00\\x00\\x00build_id_nightly_68_20191220_dimensions_idx\\x00\\x05\\x00\\x00\\x00INDEX\\x00\\x04\\x00\\x00\\x00\\x00\\x87\\x00\\x00\\x00CREATE INDEX build_id_nightly_68_20191220_dimensions_idx ON public.build_id_nightly_68_20191220 USING gin (dimensions jsonb_path_ops);']"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(filter(lambda x: b\".dat\" in x, data.split(b\"\\n\")))"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[b'496167.dat',\n",
" b'\\xb9\\x91\\x07',\n",
" b'\\x04',\n",
" b'1259',\n",
" b'\\x08',\n",
" b'91420008',\n",
" b'+',\n",
" b'build_id_nightly_68_20191220_dimensions_idx',\n",
" b'\\x05',\n",
" b'INDEX',\n",
" b'\\x04',\n",
" b'\\x87',\n",
" b'CREATE',\n",
" b'INDEX',\n",
" b'build_id_nightly_68_20191220_dimensions_idx',\n",
" b'ON',\n",
" b'public.build_id_nightly_68_20191220',\n",
" b'USING',\n",
" b'gin',\n",
" b'(dimensions',\n",
" b'jsonb_path_ops);']"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dat = [x for x in data.split(b\"\\n\") if b\".dat\" in x]\n",
"processed = dat[1].replace(b\"\\x00\", b\" \").strip().split()\n",
"processed"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[{'table_id': '496168', 'table_name': 'build_id_nightly_73_20191220'},\n",
" {'table_id': '496167', 'table_name': 'build_id_nightly_68_20191220'}]"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parse_toc(data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Putting everything together"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3.96 ms, sys: 1.35 ms, total: 5.31 ms\n",
"Wall time: 16.6 s\n"
]
}
],
"source": [
"from pyspark.sql import Row\n",
"\n",
"def main(input_dir, output_dir):\n",
" # parse the table of contents\n",
" toc_file = f\"{input_dir}/toc.dat\"\n",
" with open(toc_file, \"rb\") as f:\n",
" data = f.read()\n",
" toc_df = spark.createDataFrame([Row(**d) for d in parse_toc(data)])\n",
"\n",
" df = read_pg_dump(input_dir)\n",
" # join, reorder, and write to a single parquet partition\n",
" joined = df.join(toc_df, on=\"table_id\")\n",
" columns = [\"table_name\", \"aggregate_type\", \"ds_nodash\", \"dimension\", \"aggregate\"]\n",
"\n",
" out_df = joined.select(*columns)\n",
" # NOTE: strip out this jupyter magic if copied into a script\n",
" %time out_df.repartition(1).write.parquet(output_dir, mode=\"overwrite\")\n",
"\n",
"\n",
"input_directory = f\"{data_dir}/submission_date/20191201\"\n",
"output_directory = f\"{data_dir}/parquet/submission_date/20191201\"\n",
"main(input_directory, output_directory)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"118M\t../data/submission_date/20191201/\n",
"210M\t../data/parquet/submission_date/20191201/\n"
]
}
],
"source": [
"! du -h ../data/submission_date/20191201/\n",
"! du -h ../data/parquet/submission_date/20191201/"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"read_pg_dump(input_directory).count() == spark.read.parquet(output_directory).count()"
]
},
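{
"cell_type": "markdown",
"metadata": {},
"source": [
"Beyond the row-count check, it can be worth eyeballing the converted output directly. A hedged sketch that reads back the parquet written above:\n",
"\n",
"```python\n",
"converted = spark.read.parquet(output_directory)\n",
"converted.printSchema()\n",
"# spot-check that the toc-derived table names joined through\n",
"converted.select(\"table_name\").distinct().show(5, truncate=False)\n",
"```"
]
},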
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### And again, for build id aggregates"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.43 ms, sys: 480 µs, total: 1.91 ms\n",
"Wall time: 2.28 s\n"
]
}
],
"source": [
"input_directory = f\"{data_dir}/build_id/20191201\"\n",
"output_directory = f\"{data_dir}/parquet/build_id/20191201\"\n",
"main(input_directory, output_directory)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"8.5M\t../data/build_id/20191201/\n",
" 14M\t../data/parquet/build_id/20191201/\n"
]
}
],
"source": [
"! du -h ../data/build_id/20191201/\n",
"! du -h ../data/parquet/build_id/20191201/"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"read_pg_dump(input_directory).count() == spark.read.parquet(output_directory).count()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}