def run_ml_state_machine()

in afa/app/app.py [0:0]


def run_ml_state_machine():
    """Execute the Amazon Forecast state machine.

    Reads the dataframe, its source path, and the frequency/horizon settings
    from ``state.report``, uploads the data as a CSV file to the S3 input
    location, and starts a Step Functions execution of the state machine
    whose ARN is stored in SSM Parameter Store.

    Returns:
        A 3-tuple ``(execution_arn, prefix, status_json_s3_path)``:
        the ARN returned by ``start_execution``, the timestamp-based prefix
        passed to the state machine, and the S3 path
        ``<output path>/<prefix>_status.json``.

    Raises:
        AssertionError: if ``state.report["data"]`` has no ``"df"`` entry
            (or it is ``None``).
        NotImplementedError: if the data frequency is not one of
            ``"D"``, ``"W"``, ``"W-MON"``, ``"M"`` or ``"MS"``.
    """
    # Date-only timestamp format written to the CSV.
    PD_TIMESTAMP_FMT = "%Y-%m-%d"
    AFC_FORECAST_HORIZON = state.report["afc"]["horiz"]
    AFC_FORECAST_FREQUENCY = state.report["afc"]["freq"]

    df = state.report["data"].get("df")
    fn = state.report["data"]["path"]

    # Raised explicitly (rather than via `assert`) so the check still runs
    # under `python -O`; the exception type is unchanged for callers.
    if df is None:
        raise AssertionError('state.report["data"]["df"] is not set')

    data_freq = state.report["data"]["freq"]

    # Collapse pandas frequency aliases to the single-letter form sent to
    # the state machine.
    if data_freq in ("D",):
        pass
    elif data_freq in ("W", "W-MON"):
        data_freq = "W"
    elif data_freq in ("M", "MS"):
        data_freq = "M"
    else:
        raise NotImplementedError(
            f"unsupported data frequency: {data_freq!r}")

    # state.df is already resampled to same frequency as the forecast freq.

    # generate a unique prefix for the Amazon Forecast resources
    # (second resolution, local time)
    now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    prefix = f"AfaAfc{now_str}"

    # get the state machine arn and s3 paths
    ssm_client = boto3.client("ssm")
    state_machine_arn = \
        ssm_client.get_parameter(Name="AfaAfcStateMachineArn")["Parameter"]["Value"]
    # Trailing slashes are stripped so the paths can be joined with "/".
    s3_input_path = \
        ssm_client.get_parameter(Name="AfaS3InputPath")["Parameter"]["Value"].rstrip("/")
    s3_output_path = \
        ssm_client.get_parameter(Name="AfaS3OutputPath")["Parameter"]["Value"].rstrip("/")

    with st.spinner("Launching Amazon Forecast job ..."):
        # generate amazon forecast compatible data: one row per
        # (item_id, timestamp) where item_id is the composite
        # "channel@@family@@item_id" key
        df_afc = df \
            | px.reset_index() \
            | px.rename({"index": "timestamp"}, axis=1) \
            | px.assign(item_id=px["channel"] + "@@" + px["family"] + "@@" + px["item_id"]) \
            | px[["timestamp", "demand", "item_id"]] \
            | px.sort_values(by=["item_id", "timestamp"])

        df_afc["timestamp"] = \
            pd.DatetimeIndex(df_afc["timestamp"]).strftime(PD_TIMESTAMP_FMT)

        # The upload is a plain CSV, so drop a trailing ".gz" from the
        # source file name. The dots are escaped and the pattern anchored so
        # only a literal ".csv.gz" suffix is rewritten.
        afc_input_fn = \
            re.sub(r"\.csv\.gz$", ".csv", os.path.basename(fn))
        s3_input_path = f"{s3_input_path}/{afc_input_fn}"

        # upload the input csv to s3
        wr.s3.to_csv(df_afc, s3_input_path, index=False)

        # start the state machine execution
        client = boto3.client("stepfunctions")

        resp = client.start_execution(
            stateMachineArn=state_machine_arn,
            input=json.dumps({
                "prefix": prefix,
                "data_freq": data_freq,
                "horiz": AFC_FORECAST_HORIZON,
                "freq": AFC_FORECAST_FREQUENCY,
                "s3_path": s3_input_path,
                "s3_export_path": s3_output_path,
            })
        )

    # S3 keys always use "/" regardless of the host OS, so join explicitly
    # instead of using os.path.join.
    status_json_s3_path = f"{s3_output_path}/{prefix}_status.json"

    return resp["executionArn"], prefix, status_json_s3_path