def aggregate_and_write_data_to_s3()

in assets/glue/scripts/business_aggregate_weekly.py [0:0]


def aggregate_and_write_data_to_s3(bucket_path, push_down_predicate=""):
    """
        Sum the readings of the "daily" catalog table per meter and week and
        write the result to ``bucket_path`` as snappy-compressed Parquet.

        The table is read from the catalog database named by ``args['db_name']``.
        Rows are grouped by meter_id, week_of_year, month and year, and the
        ``reading_value`` column is summed into ``aggregated_consumption``.
        The output is partitioned by year and week_of_year.

        NOTE(review): the write uses mode "overwrite". Whether that replaces only
        the partitions being written or everything below ``bucket_path`` depends
        on Spark's partition overwrite mode, which is not set in this function --
        confirm before running with a restricting predicate.

    :param bucket_path: target path handed to the Parquet writer
    :param push_down_predicate: predicate forwarded to the catalog read to limit
        the data that gets aggregated; the default "" is forwarded unchanged, in
        which case the whole data set gets aggregated
    """
    # Load the source table; the predicate (if any) is applied by the catalog read.
    source_frame = glueContext.create_dynamic_frame.from_catalog(
        database=args['db_name'],
        table_name="daily",
        transformation_ctx="meter_data_to_aggregate",
        push_down_predicate=push_down_predicate
    )

    # One output row per (meter_id, week_of_year, month, year) combination.
    # `sum` is presumably the Spark SQL aggregate imported elsewhere in this
    # file rather than the Python builtin -- confirm against the imports.
    grouped_reads = source_frame.toDF().groupby('meter_id', 'week_of_year', 'month', 'year')
    weekly_consumption = grouped_reads.agg(sum("reading_value").alias("aggregated_consumption"))

    # Repartition on the same columns used for the output partitioning before
    # handing the frame to the writer.
    writer = weekly_consumption.repartition("year", "week_of_year").write
    writer = writer.mode("overwrite").option("compression", "snappy")
    writer.partitionBy("year", "week_of_year").parquet(bucket_path)