in source/glue/jobs/forecast_etl.py [0:0]
def aggregate_forecast_data(self):
    """
    Aggregate all of the forecast data (currently TARGET_TIME_SERIES, PREDICTOR_BACKTEST_EXPORT_JOB,
    FORECAST_EXPORT_JOB, ITEM_METADATA) into a consistent schema for future consumption by Athena.

    Steps:
      1. Map each dataset's attribute names onto the shared generic names.
      2. Drop metadata columns from the time-series datasets (re-joined later).
      3. Trim TARGET_TIME_SERIES to rows strictly before the earliest backtest
         timestamp so the backtest window is not double-counted in the union.
      4. Union input + backtest + export, left-join ITEM_METADATA when present,
         conform to the output schema, and add the ``month_starting`` partition column.

    :return: DynamicFrame representing the consolidated/ aggregated forecast input / output data
    """
    output_schema = ForecastStatus.empty()
    # renamed from `input` to avoid shadowing the builtin
    time_series = self.target_time_series_data
    export = self.forecast_export_job_data
    backtest = self.predictor_backtest_export_job_data

    # apply dimensions to input, export, backtest
    tts_fields = self.target_time_series_schema.fields
    # item metadata is optional; an explicit presence check replaces the broad
    # try/except AttributeError so unrelated AttributeErrors are not swallowed
    metadata_schema = getattr(self, "item_metadata_schema", None)
    md_fields = metadata_schema.fields if metadata_schema is not None else []

    attrs = time_series.map_generic_attribute_names(tts_fields, md_fields)
    attrs = export.map_generic_attribute_names(
        tts_fields, md_fields, attributes=attrs
    )
    attrs = backtest.map_generic_attribute_names(
        tts_fields, md_fields, attributes=attrs
    )

    # drop metadata (will be joined later)
    time_series.drop_metadata_fields()
    export.drop_metadata_fields()
    backtest.drop_metadata_fields()

    # filter the backtest data out of the input data so the overlapping window
    # appears exactly once in the aggregate
    # NOTE(review): if the backtest frame is empty, min(...) is null and the
    # filter below drops every input row — presumably backtest data always
    # exists by this point; confirm upstream guarantees
    earliest_backtest_data = (
        backtest.df.toDF().select(F.min("timestamp").alias("min")).head()["min"]
    )
    # lazy %-args: formatting is deferred until the record is actually emitted
    logger.info(
        "taking input TARGET_TIME_SERIES up to %s", earliest_backtest_data
    )
    filtered_input = time_series.df.toDF()
    filtered_input = filtered_input.where(
        filtered_input["timestamp"] < earliest_backtest_data
    )

    # combine the data with a union
    aggregate = ForecastStatus.union_dfs(filtered_input, backtest.df.toDF())
    aggregate = ForecastStatus.union_dfs(aggregate, export.df.toDF())

    # add metadata via a left join if metadata is available (getattr check
    # also covers the attribute-is-None case the old except clause caught)
    metadata = getattr(self, "item_metadata_data", None)
    if metadata is not None:
        metadata.map_generic_attribute_names(
            tts_fields, md_fields, attributes=attrs
        )
        metadata_df = metadata.df.toDF()
        aggregate = aggregate.join(metadata_df, ["identifier"], "left")
    else:
        logger.info("metadata not available to join for %s", self.name)

    # prepare the output column format/ order
    aggregate = ForecastStatus.union_dfs(output_schema, aggregate)

    # add the month starting data (this is the partition)
    aggregate = aggregate.withColumn(
        "month_starting",
        F.date_format(F.date_trunc("month", "timestamp"), "y-MM-dd"),
    )
    aggregate_dynamic_frame = DynamicFrame.fromDF(
        aggregate, time_series.gc, "AGGREGATE_FORECAST"
    )
    return aggregate_dynamic_frame