# assets/functions/ml_pipeline/prepare_training/app.py
def _weather_features(df_weather, start, length):
    """Return the three weather dynamic_feat lists aligned to a target series.

    Slices `length` hourly points starting at `start` from each weather column.
    We use length - 1 in the Timedelta because pandas label-based slicing
    includes the upper bound.
    """
    stop = start + pd.Timedelta(length - 1, unit='H')
    return [df_weather[col][start:stop].tolist()
            for col in ('temperature', 'humidity', 'apparenttemperature')]


def lambda_handler(event, context):
    """Build DeepAR-format training and testing datasets from hourly meter reads.

    Pulls hourly-aggregated consumption for a sample of meters from Athena,
    splits each series into a training window and a longer testing window,
    optionally attaches weather dynamic features, and uploads the JSON line
    datasets to S3 for the training job.

    Expected event keys: 'Training_samples', 'Data_start', 'Data_end',
    'Forecast_period' (days).
    """
    training_samples = event['Training_samples']
    data_start = event['Data_start']
    data_end = event['Data_end']
    forecast_period = event['Forecast_period']
    # DeepAR's prediction_length is in hours: 24 per forecast day.
    prediction_length = forecast_period * 24

    meter_samples = get_meters(ATHENA_CONNECTION, training_samples, DB_SCHEMA)

    # NOTE(review): values are interpolated straight into the SQL text. They
    # come from internal pipeline inputs (get_meters / step-function event),
    # but a parameterized query would be safer if the event ever carries
    # user-controlled data.
    q = '''
    select date_trunc('HOUR', reading_date_time) as datetime, meter_id, sum(reading_value) as consumption
    from "{}".daily
    where meter_id in ('{}')
    and reading_type = 'INT'
    and reading_date_time >= timestamp '{}'
    and reading_date_time < timestamp '{}'
    group by 2, 1
    '''.format(DB_SCHEMA, "','".join(meter_samples), data_start, data_end)
    result = pd.read_sql(q, ATHENA_CONNECTION)
    result = result.set_index('datetime')

    # One hourly series per meter; resample regularizes onto a 1H grid so
    # label-based window slicing below lines up across meters.
    timeseries = {}
    for meter_id in meter_samples:
        data_kw = result[result['meter_id'] == meter_id].resample('1H').sum()
        timeseries[meter_id] = data_kw.iloc[:, 0]

    num_test_windows = 2
    # pd.Timestamp(value, freq=...) was deprecated and removed in pandas 2.x,
    # and the freq attribute was never used here (slicing is label-based),
    # so construct plain timestamps.
    start_dataset = pd.Timestamp(data_start)
    end_dataset = pd.Timestamp(data_end) - pd.Timedelta(1, unit='H')
    # Hold back num_test_windows * prediction_length hours for evaluation.
    end_training = end_dataset - pd.Timedelta(prediction_length * num_test_windows, unit='H')

    # NOTE(review): `k` is unused in the testing comprehensions below, so each
    # meter's test entry is repeated num_test_windows times identically. A
    # rolling backtest would normally end window k at
    # end_training + k * prediction_length — confirm whether the duplication
    # is intentional before changing the emitted dataset.
    if USE_WEATHER_DATA == 1:
        df_weather = get_weather(ATHENA_CONNECTION, data_start, DB_SCHEMA)
        # There could be missing data, so weather features are aligned to the
        # actual series size rather than the nominal window length.
        training_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_training].tolist(),
                "dynamic_feat": _weather_features(
                    df_weather, start_dataset, ts[start_dataset:end_training].size),
            }
            for meterid, ts in timeseries.items()
        ]
        testing_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_dataset].tolist(),
                "dynamic_feat": _weather_features(
                    df_weather, start_dataset, ts[start_dataset:end_dataset].size),
            }
            for k in range(1, num_test_windows + 1)
            for meterid, ts in timeseries.items()
        ]
    else:
        training_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_training].tolist(),
            }
            for meterid, ts in timeseries.items()
        ]
        testing_data = [
            {
                "start": str(start_dataset),
                "target": ts[start_dataset:end_dataset].tolist(),
            }
            for k in range(1, num_test_windows + 1)
            for meterid, ts in timeseries.items()
        ]

    write_upload_file(S3_BUCKET, 'meteranalytics/train/training.json', training_data)
    write_upload_file(S3_BUCKET, 'meteranalytics/test/testing.json', testing_data)
    write_json_to_file(S3_BUCKET, 'meteranalytics/initial_pass', event)