def lambda_handler()

in assets/functions/prepare_training/app.py
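
This excerpt omits the module-level setup in app.py. A minimal sketch of the imports and constants it relies on (the helper names get_meters, get_weather, write_upload_file and write_json_to_file and the REGION constant are taken from the calls below; everything else is an assumption):

import os

import pandas as pd
from pyathena import connect

# Assumed: the region is taken from the Lambda runtime environment.
REGION = os.environ['AWS_REGION']

# get_meters, get_weather, write_upload_file and write_json_to_file are
# helper functions defined elsewhere in app.py and are not shown here.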


def lambda_handler(event, context):
    ATHENA_OUTPUT_BUCKET = os.environ['Athena_bucket']
    S3_BUCKET = os.environ['Working_bucket']
    DB_SCHEMA = os.environ['Db_schema']
    USE_WEATHER_DATA = os.environ['With_weather_data']

    TRAINING_SAMPLES = event['Training_samples']
    DATA_START = event['Data_start']
    DATA_END = event['Data_end']
    FORECAST_PERIOD = event['Forecast_period']
    prediction_length = FORECAST_PERIOD * 24
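    # Illustrative invocation event (field values are examples only, not taken
    # from the repository):
    # {
    #     "Training_samples": 50,
    #     "Data_start": "2013-06-01",
    #     "Data_end": "2014-01-01",
    #     "Forecast_period": 7
    # }
    # With Forecast_period = 7 the model forecasts 7 * 24 = 168 hourly values.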

    connection = connect(s3_staging_dir='s3://{}/'.format(ATHENA_OUTPUT_BUCKET), region_name=REGION)

    meter_samples = get_meters(connection, TRAINING_SAMPLES, DB_SCHEMA)
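    # get_meters (defined elsewhere in app.py) is expected to return a list of
    # TRAINING_SAMPLES meter_id strings; they filter the Athena query below and
    # key the per-meter time series.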

    q = '''
            select date_trunc('HOUR', reading_date_time) as datetime, meter_id, sum(reading_value) as consumption
                from "{}".daily
                where meter_id in ('{}')
                and reading_date_time >= timestamp '{}'
                and reading_date_time < timestamp '{}'
                group by 2, 1
        '''.format(DB_SCHEMA, "','".join(meter_samples), DATA_START, DATA_END)

    result = pd.read_sql(q, connection)
    result = result.set_index('datetime')

    timeseries = {}
    for meter_id in meter_samples:
        data_kw = result[result['meter_id'] == meter_id].resample('1H').sum()
        timeseries[meter_id] = data_kw.iloc[:,0]  #np.trim_zeros(data_kw.iloc[:,0], trim='f')
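    # timeseries now maps each sampled meter_id to an hourly consumption
    # Series; resample('1H').sum() fills hours with no readings with 0.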

    freq = 'H'
    num_test_windows = 2
    start_dataset = pd.Timestamp(DATA_START, freq=freq)
    end_dataset = pd.Timestamp(DATA_END, freq=freq) - pd.Timedelta(1, unit='H')
    end_training = end_dataset - pd.Timedelta(prediction_length * num_test_windows, unit='H')
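    # Example split with the illustrative event above: prediction_length = 168,
    # so end_training lies 2 * 168 = 336 hours before end_dataset and the last
    # two forecast windows are held out of the training targets.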

    if USE_WEATHER_DATA == '1':  # os.environ values are strings, so compare against '1'
        df_weather = get_weather(connection, DATA_START, DB_SCHEMA)
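        # get_weather (defined elsewhere in app.py) is expected to return an
        # hourly DataFrame indexed by datetime with 'temperature', 'humidity'
        # and 'apparenttemperature' columns starting at DATA_START; the slices
        # below are trimmed to the actual length of each meter's series.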

        training_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_training].tolist(),  # We use -1, because pandas indexing includes the upper bound
                "dynamic_feat": [df_weather['temperature'][start_dataset:start_dataset + pd.Timedelta(ts[start_dataset:end_training].size-1, unit='H')].tolist(),
                                 df_weather['humidity'][start_dataset:start_dataset + pd.Timedelta(ts[start_dataset:end_training].size-1, unit='H')].tolist(),
                                 df_weather['apparenttemperature'][start_dataset:start_dataset + pd.Timedelta(ts[start_dataset:end_training].size-1, unit='H')].tolist()]
            }
            for meterid, ts in timeseries.items()
        ]

        # there could be missing data, so use actual timeseries size
        testing_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_dataset].tolist(),
                "dynamic_feat": [df_weather['temperature'][start_dataset:start_dataset + pd.Timedelta(ts[start_dataset:end_dataset].size-1, unit='H')].tolist(),
                                 df_weather['humidity'][start_dataset:start_dataset + pd.Timedelta(ts[start_dataset:end_dataset].size-1, unit='H')].tolist(),
                                 df_weather['apparenttemperature'][start_dataset:start_dataset + pd.Timedelta(ts[start_dataset:end_dataset].size-1, unit='H')].tolist()]
            }
            for k in range(1, num_test_windows + 1)
            for meterid, ts in timeseries.items()
        ]
    else:
        training_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_training].tolist()  # We use -1, because pandas indexing includes the upper bound
            }
            for meterid, ts in timeseries.items()
        ]

        testing_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_dataset].tolist()
            }
            for k in range(1, num_test_windows + 1)
            for meterid, ts in timeseries.items()
        ]
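    # Each element of training_data / testing_data is one record in the JSON
    # Lines format expected by the SageMaker DeepAR algorithm, e.g. (values
    # illustrative):
    # {"start": "2013-06-01 00:00:00", "target": [1.2, 0.9, ...],
    #  "dynamic_feat": [[20.1, ...], [0.6, ...], [19.8, ...]]}
    # where "dynamic_feat" is present only when weather data is enabled.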

    write_upload_file(S3_BUCKET, 'meteranalytics/train/training.json', training_data)
    write_upload_file(S3_BUCKET, 'meteranalytics/test/testing.json', testing_data)

    write_json_to_file(S3_BUCKET, 'meteranalytics/initial_pass', event)
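
write_upload_file and write_json_to_file are defined elsewhere in app.py and are not part of this excerpt. A minimal sketch consistent with how they are called above, assuming the training and test channels are written as JSON Lines and uploaded with boto3:

import json

import boto3

s3 = boto3.client('s3')


def write_upload_file(bucket, key, data):
    # Assumed implementation: serialize one record per line (JSON Lines, the
    # input format expected by SageMaker DeepAR) and upload the result to S3.
    body = '\n'.join(json.dumps(record) for record in data)
    s3.put_object(Bucket=bucket, Key=key, Body=body.encode('utf-8'))


def write_json_to_file(bucket, key, data):
    # Assumed implementation: persist the invocation event as a single JSON
    # document so later pipeline steps can read the run configuration.
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(data).encode('utf-8'))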