# Transforming the aggregates for convenience

We'd like to transform the aggregates into a form that will be useful in BigQuery. We also want to be able to invert the transformation, so we should make sure to save all the relevant information. Adding structure will make it easier to process things.

## Processing dimensions

This notebook relies on the output of `01-convert_to_parquet`.

In [1]:
input_dir = f"../data/parquet/submission/20191201"
df = spark.read.parquet(input_dir)
df.show(vertical=True, n=3, truncate=80)

-RECORD 0------------------------------------------------------------------------------------------
 table_name     | submission_date_nightly_43_20191201                                              
 aggregate_type | submission                                                                       
 ds_nodash      | 20191201                                                                         
 dimension      | {"os": "Windows_NT", "child": "false", "label": "", "metric": "A11Y_INSTANTIA... 
 aggregate      | {0,2,0,2,2}                                                                      
-RECORD 1------------------------------------------------------------------------------------------
 table_name     | submission_date_nightly_43_20191201                                              
 aggregate_type | submission                                                                       
 ds_nodash      | 20191201                                                                         


In [2]:
print(df.first().dimension)

{"os": "Windows_NT", "child": "false", "label": "", "metric": "A11Y_INSTANTIATED_FLAG", "osVersion": "10.0", "application": "Firefox", "architecture": "x86"}


In [3]:
from pyspark.sql import functions as F, types as T

dimension_type = T.StructType([
    T.StructField("os", T.StringType()),
    T.StructField("child", T.StringType()),
    T.StructField("label", T.StringType()),
    T.StructField("metric", T.StringType()),
    T.StructField("osVersion", T.StringType()),
    T.StructField("application", T.StringType()),
    T.StructField("architecture", T.StringType()),
])

df.select(F.from_json("dimension", dimension_type).alias("dimension")).select("dimension.*").show(vertical=True, n=1)

-RECORD 0----------------------------
 os           | Windows_NT           
 child        | false                
 label        |                      
 metric       | A11Y_INSTANTIATED... 
 osVersion    | 10.0                 
 application  | Firefox              
 architecture | x86                  
only showing top 1 row



## Processing histograms

It's useful to note that the last two elements of the aggregate array correspond to `sum` and `count`.

In [4]:
aggregates = df.select("aggregate")
aggregates.cache()
aggregates.count()
aggregates.show(truncate=80)

+--------------------------------------------------------------------------------+
|                                                                       aggregate|
+--------------------------------------------------------------------------------+
|                                                                     {0,2,0,2,2}|
|                                                  {0,0,0,0,0,0,0,0,0,0,0,2,22,2}|
|                                                                     {2,0,0,0,2}|
|                                                                     {2,0,0,0,2}|
|{871471,140673,200275,430614,269254,168218,390196,232243,76503,27335,15955,12...|
|{0,0,0,0,2,1,0,0,0,0,0,0,0,0,0,2,50,1,2,0,23,0,106,31,15,5,4,6,4,4,4,5,2,0,0,...|
|{43944,2046,143,50,15,9,9,15,38,14,6,2,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...|
|{0,0,0,0,1,2,0,0,0,0,0,0,0,0,0,2,50,1,2,0,23,0,106,31,15,5,4,6,4,4,4,5,2,0,0,...|
|{0,0,3,2,2,52,44,12,6,3,1,3,2,2,13,61,39,17,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0...|
|   

### Initial approach using a python udf

In [5]:
import ast

aggregate_schema = T.ArrayType(T.IntegerType())

@F.udf(aggregate_schema)
def from_pg_array_py(arr):
    if not arr:
        return None
    # replace curly braces with brackets, then use a safe literal eval
    return ast.literal_eval("[" + arr[1:-1] + "]")

aggregates.select(from_pg_array_py("aggregate").alias("aggregate")).show(truncate=80)

+--------------------------------------------------------------------------------+
|                                                                       aggregate|
+--------------------------------------------------------------------------------+
|                                                                 [0, 2, 0, 2, 2]|
|                                     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2]|
|                                                                 [2, 0, 0, 0, 2]|
|                                                                 [2, 0, 0, 0, 2]|
|[871471, 140673, 200275, 430614, 269254, 168218, 390196, 232243, 76503, 27335...|
|[0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|
|[43944, 2046, 143, 50, 15, 9, 9, 15, 38, 14, 6, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0,...|
|[0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|
|[0, 0, 3, 2, 2, 52, 44, 12, 6, 3, 1, 3, 2, 2, 13, 61, 39, 17, 2, 2, 2, 0, 0, ...|
|   

### Alternative implementation using `from_json`

In [6]:
def from_pg_array(arr):
    return F.from_json(F.translate(F.translate(arr, "{", "["), "}", "]"), aggregate_schema).alias(arr)

aggregates.select(from_pg_array("aggregate")).show(truncate=80)

+--------------------------------------------------------------------------------+
|                                                                       aggregate|
+--------------------------------------------------------------------------------+
|                                                                 [0, 2, 0, 2, 2]|
|                                     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 22, 2]|
|                                                                 [2, 0, 0, 0, 2]|
|                                                                 [2, 0, 0, 0, 2]|
|[871471, 140673, 200275, 430614, 269254, 168218, 390196, 232243, 76503, 27335...|
|[0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|
|[43944, 2046, 143, 50, 15, 9, 9, 15, 38, 14, 6, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0,...|
|[0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 50, 1, 2, 0, 23, 0, 106, 31,...|
|[0, 0, 3, 2, 2, 52, 44, 12, 6, 3, 1, 3, 2, 2, 13, 61, 39, 17, 2, 2, 2, 0, 0, ...|
|   

### Test the speed of the two implementations

The `from_json` method performs significantly better than the python udf, by a factor of ~8x.

In [7]:
%time aggregates.select(from_pg_array_py("aggregate").alias("aggregate")).repartition(1).write.parquet("/tmp/agg-py", mode="overwrite")
%time aggregates.select(from_pg_array("aggregate")).repartition(1).write.parquet("/tmp/agg-sql", mode="overwrite")

CPU times: user 12.3 ms, sys: 5.55 ms, total: 17.9 ms
Wall time: 4min 5s
CPU times: user 4.1 ms, sys: 1.05 ms, total: 5.15 ms
Wall time: 28.5 s


### Test the relative size of these two approaches with compression

In [8]:
%time aggregates.repartition(1).write.parquet("/tmp/agg-string", mode="overwrite")
%time aggregates.select(from_pg_array("aggregate")).repartition(1).write.parquet("/tmp/agg-list", mode="overwrite")

CPU times: user 1.05 ms, sys: 881 Âµs, total: 1.93 ms
Wall time: 3.78 s
CPU times: user 4.47 ms, sys: 1.2 ms, total: 5.67 ms
Wall time: 28.3 s


In [9]:
! du -h /tmp/agg-string
! du -h /tmp/agg-list

145M	/tmp/agg-string
129M	/tmp/agg-list


### Without compression

Without compression, the binary format wins by a small margin. This matters, since BigQuery will store all the rows uncompressed.

In [10]:
%time aggregates.repartition(1).write.parquet("/tmp/agg-string", mode="overwrite", compression="none")
%time aggregates.select(from_pg_array("aggregate")).write.parquet("/tmp/agg-list", mode="overwrite", compression="none")

CPU times: user 1.52 ms, sys: 1.22 ms, total: 2.74 ms
Wall time: 3.53 s
CPU times: user 5.03 ms, sys: 1.38 ms, total: 6.41 ms
Wall time: 23.1 s


In [11]:
! du -h /tmp/agg-string
! du -h /tmp/agg-list

469M	/tmp/agg-string
372M	/tmp/agg-list


## Parsing table name

We don't need to keep the table name around, since we have all the information to reconstruct it.

In [12]:
(
    df.select(F.split("table_name", "_").alias("parts"))
    .select(F.col("parts").getItem(2).alias("channel"), F.col("parts").getItem(3).alias("version"))
    .show()
)

+-------+-------+
|channel|version|
+-------+-------+
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
|nightly|     43|
+-------+-------+
only showing top 20 rows



## Putting it all together

In [13]:
input_dir = f"../data/parquet/submission/20191201"
df = spark.read.parquet(input_dir)

result = (
    df
    .withColumn("parts", F.split("table_name", "_"))
    .withColumn("dimension", F.from_json("dimension", dimension_type))
    .select(
        "aggregate_type",
        F.col("parts").getItem(2).alias("channel"), 
        F.col("parts").getItem(3).alias("version"),
        "ds_nodash",
        "dimension.*",
        from_pg_array("aggregate"),
    )
)

result.show(vertical=True, n=3, truncate=80)

-RECORD 0-----------------------------------------------------
 aggregate_type | submission                                  
 channel        | nightly                                     
 version        | 43                                          
 ds_nodash      | 20191201                                    
 os             | Windows_NT                                  
 child          | false                                       
 label          |                                             
 metric         | A11Y_INSTANTIATED_FLAG                      
 osVersion      | 10.0                                        
 application    | Firefox                                     
 architecture   | x86                                         
 aggregate      | [0, 2, 0, 2, 2]                             
-RECORD 1-----------------------------------------------------
 aggregate_type | submission                                  
 channel        | nightly                              