in assets/glue/scripts/business_aggregate_monthly.py [0:0]
def aggregate_and_write_data_to_s3(bucket_path, push_down_predicate=""):
    """
    Sum the readings of the "daily" catalog table per meter, month and year and
    write the result as snappy-compressed parquet, partitioned by year and month.

    When a push down predicate is given, only the data it selects is read and
    aggregated; with the empty default the whole data set gets aggregated.

    :param bucket_path: target path handed to the parquet writer
    :param push_down_predicate: predicate forwarded to the catalog read to
        restrict the data that gets loaded (default: no restriction)
    """
    source_frame = glueContext.create_dynamic_frame.from_catalog(
        database=args["db_name"],
        table_name="daily",
        transformation_ctx="meter_data_to_aggregate",
        push_down_predicate=push_down_predicate,
    )

    # NOTE(review): `sum` has to be the Spark column aggregate here (presumably
    # imported from pyspark.sql.functions at the top of the file) - confirm.
    consumption_per_meter_and_month = (
        source_frame.toDF()
        .groupby("meter_id", "month", "year")
        .agg(sum("reading_value").alias("aggregated_consumption"))
    )

    # Repartition on the same columns that are used as output partitions
    # before configuring the writer.
    # NOTE(review): with mode "overwrite", whether only the partitions being
    # written are replaced depends on spark.sql.sources.partitionOverwriteMode
    # - confirm it is configured as intended for runs that use a predicate.
    parquet_writer = (
        consumption_per_meter_and_month.repartition("year", "month")
        .write.mode("overwrite")
        .option("compression", "snappy")
        .partitionBy("year", "month")
    )
    parquet_writer.parquet(bucket_path)