in afa/app/app.py [0:0]
def run_ml_state_machine():
    """Launch the Amazon Forecast (AFC) Step Functions state machine.

    Reads the loaded dataset and the forecast settings from ``state.report``,
    reshapes the dataframe into ``timestamp, demand, item_id`` rows, writes it
    to the S3 input location and starts an execution of the state machine
    whose ARN is stored in the ``AfaAfcStateMachineArn`` SSM parameter.

    Returns
    -------
    tuple of (str, str, str)
        The Step Functions execution ARN, the ``AfaAfc<YYYYmmddHHMMSS>``
        prefix generated for this run's Amazon Forecast resources, and the
        S3 path ``<AfaS3OutputPath>/<prefix>_status.json``.

    Raises
    ------
    ValueError
        If ``state.report["data"]`` has no ``"df"`` entry (or it is None).
    NotImplementedError
        If the data frequency is not one of D, W, W-MON, M or MS.
    """
    PD_TIMESTAMP_FMT = "%Y-%m-%d"
    AFC_FORECAST_HORIZON = state.report["afc"]["horiz"]
    AFC_FORECAST_FREQUENCY = state.report["afc"]["freq"]

    df = state.report["data"].get("df", None)
    fn = state.report["data"]["path"]

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if df is None:
        raise ValueError("no dataframe loaded in state.report['data']['df']")

    # Collapse the supported frequency aliases onto the D/W/M codes that are
    # sent to the state machine as "data_freq".
    data_freq = state.report["data"]["freq"]
    if data_freq in ("W", "W-MON"):
        data_freq = "W"
    elif data_freq in ("M", "MS"):
        data_freq = "M"
    elif data_freq not in ("D",):
        raise NotImplementedError(
            f"unsupported data frequency: {data_freq!r}")

    # Unique prefix for the Amazon Forecast resources created by this run.
    now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    prefix = f"AfaAfc{now_str}"

    # Look up the state machine ARN and the S3 input/output locations.
    ssm_client = boto3.client("ssm")

    def _ssm_value(name):
        # Return the string value of SSM parameter `name`.
        return ssm_client.get_parameter(Name=name)["Parameter"]["Value"]

    state_machine_arn = _ssm_value("AfaAfcStateMachineArn")
    s3_input_path = _ssm_value("AfaS3InputPath").rstrip("/")
    s3_output_path = _ssm_value("AfaS3OutputPath").rstrip("/")

    with st.spinner("Launching Amazon Forecast job ..."):
        # Build the AFC input frame. item_id is the composite
        # "<channel>@@<family>@@<item_id>" key. Per the original author,
        # the dataframe is already resampled to the forecast frequency;
        # that is not re-checked here.
        df_afc = df \
            | px.reset_index() \
            | px.rename({"index": "timestamp"}, axis=1) \
            | px.assign(item_id=px["channel"] + "@@" + px["family"] + "@@" + px["item_id"]) \
            | px[["timestamp", "demand", "item_id"]] \
            | px.sort_values(by=["item_id", "timestamp"])
        df_afc["timestamp"] = \
            pd.DatetimeIndex(df_afc["timestamp"]).strftime(PD_TIMESTAMP_FMT)

        # Name the uploaded file after the source file, replacing only a
        # trailing ".csv.gz" with ".csv" (dots escaped, match anchored).
        afc_input_fn = re.sub(r"\.csv\.gz$", ".csv", os.path.basename(fn))
        s3_input_path = f"{s3_input_path}/{afc_input_fn}"

        # Upload the input csv to S3.
        wr.s3.to_csv(df_afc, s3_input_path, index=False)

        # Start the state machine on the uploaded input file.
        sfn_client = boto3.client("stepfunctions")
        resp = sfn_client.start_execution(
            stateMachineArn=state_machine_arn,
            input=json.dumps({
                "prefix": prefix,
                "data_freq": data_freq,
                "horiz": AFC_FORECAST_HORIZON,
                "freq": AFC_FORECAST_FREQUENCY,
                "s3_path": s3_input_path,
                "s3_export_path": s3_output_path,
            }),
        )

    # S3 keys always use "/"; os.path.join would insert "\\" on Windows.
    status_json_s3_path = f"{s3_output_path}/{prefix}_status.json"

    return resp["executionArn"], prefix, status_json_s3_path