def aggregate_forecast_data(self)

in source/glue/jobs/forecast_etl.py [0:0]


    def aggregate_forecast_data(self):
        """
        Aggregate all of the forecast data (currently TARGET_TIME_SERIES, PREDICTOR_BACKTEST_EXPORT_JOB,
        FORECAST_EXPORT_JOB, ITEM_METADATA) into a consistent schema for future consumption by Athena

        :return: DynamicFrame representing the consolidated/ aggregated forecast input / output data
        """
        output_schema = ForecastStatus.empty()
        # renamed from `input` — avoid shadowing the builtin
        tts = self.target_time_series_data
        export = self.forecast_export_job_data
        backtest = self.predictor_backtest_export_job_data

        # apply dimensions to the time series, export and backtest data so
        # generic attribute names are mapped consistently across all three
        tts_fields = self.target_time_series_schema.fields
        try:
            md_fields = self.item_metadata_schema.fields
        except AttributeError:
            # item metadata is optional for a dataset group; proceed without it
            md_fields = []
        attrs = tts.map_generic_attribute_names(tts_fields, md_fields)
        attrs = export.map_generic_attribute_names(
            tts_fields, md_fields, attributes=attrs
        )
        attrs = backtest.map_generic_attribute_names(
            tts_fields, md_fields, attributes=attrs
        )

        # drop metadata (will be joined later)
        tts.drop_metadata_fields()
        export.drop_metadata_fields()
        backtest.drop_metadata_fields()

        # filter the backtest window out of the input data so the union below
        # does not duplicate timestamps already covered by the backtest export.
        # NOTE(review): head() returns None on an empty backtest frame, which
        # would raise TypeError here — presumably backtest data is guaranteed
        # non-empty upstream; confirm with the callers.
        earliest_backtest_data = (
            backtest.df.toDF().select(F.min("timestamp").alias("min")).head()["min"]
        )
        # lazy %-args: the message is only formatted if INFO is enabled
        logger.info(
            "taking input TARGET_TIME_SERIES up to %s", earliest_backtest_data
        )
        filtered_input = tts.df.toDF()
        filtered_input = filtered_input.where(
            filtered_input["timestamp"] < earliest_backtest_data
        )

        # combine the data with a union
        aggregate = ForecastStatus.union_dfs(filtered_input, backtest.df.toDF())
        aggregate = ForecastStatus.union_dfs(aggregate, export.df.toDF())

        # add metadata via a join if metadata is available; AttributeError on
        # self.item_metadata_data signals that no metadata was provided
        try:
            metadata = self.item_metadata_data
            metadata.map_generic_attribute_names(
                tts_fields, md_fields, attributes=attrs
            )
            metadata_df = metadata.df.toDF()
            aggregate = aggregate.join(metadata_df, ["identifier"], "left")
        except AttributeError:
            logger.info("metadata not available to join for %s", self.name)

        # prepare the output column format/ order by unioning against the
        # empty reference schema (keeps Athena column ordering stable)
        aggregate = ForecastStatus.union_dfs(output_schema, aggregate)

        # add the month starting data (this is the partition)
        aggregate = aggregate.withColumn(
            "month_starting",
            F.date_format(F.date_trunc("month", "timestamp"), "y-MM-dd"),
        )

        aggregate_dynamic_frame = DynamicFrame.fromDF(
            aggregate, tts.gc, "AGGREGATE_FORECAST"
        )
        return aggregate_dynamic_frame