# assets/functions/ml_pipeline/upload_result/app.py
def lambda_handler(event, context):
    """Collect per-meter forecasts from a batch-transform output and upload a CSV.

    Downloads the batch inference output for the meter batch
    [Batch_start, Batch_end] from S3 (one JSON line per meter, each with a
    ``quantiles`` mapping), keeps the 0.9 quantile series for every meter,
    and uploads one combined CSV with columns (meter_id, datetime,
    consumption) to ``meteranalytics/forecast/<forecast_start>/``.

    Expected ``event`` keys (assumed from usage — confirm against the state
    machine that invokes this): ``Batch_start``/``Batch_end`` (batch bounds),
    ``Data_end`` (forecast start timestamp), ``Forecast_period`` (horizon in
    days). Relies on module-level globals defined elsewhere in this file:
    ``S3``, ``S3_BUCKET``, ``ATHENA_CONNECTION``, ``DB_SCHEMA``,
    ``get_meters``.

    Fixes vs. the previous revision:
    - the accumulator DataFrame was seeded with columns
      ['meterid', 'datetime', 'kwh'] that never matched the per-meter
      frames' columns ['meter_id', 'datetime', 'consumption'], so the
      uploaded CSV contained extra all-NaN columns;
    - ``DataFrame.append`` (quadratic in a loop; removed in pandas 2.x)
      replaced with collect-then-``pd.concat``;
    - ``pd.Timestamp``'s deprecated, unused ``freq`` argument dropped.
    """
    batch_start = event['Batch_start']
    batch_end = event['Batch_end']
    forecast_start = event['Data_end']
    forecast_period = event['Forecast_period']
    # Hourly series: a horizon expressed in days becomes 24 points per day.
    prediction_length = forecast_period * 24

    # Batch-transform output: one JSON line per meter, in the same order
    # that get_meters() returns the meter ids.
    output = 'meteranalytics/inference/batch_%s_%s/batch.json.out' % (batch_start, batch_end)
    S3.Bucket(S3_BUCKET).Object(output).download_file('/tmp/batch.out.json')

    freq = 'H'
    prediction_time = pd.Timestamp(forecast_start)
    prediction_index = pd.date_range(
        start=prediction_time,
        end=prediction_time + pd.Timedelta(prediction_length - 1, unit='H'),
        freq=freq)
    dict_of_samples = {}

    meterids = get_meters(ATHENA_CONNECTION, batch_start, batch_end, DB_SCHEMA)

    frames = []
    with open('/tmp/batch.out.json') as fp:
        for i, line in enumerate(fp):
            quantiles = pd.DataFrame(data={**json.loads(line)['quantiles'],
                                           **dict_of_samples},
                                     index=prediction_index)
            # Scalar meter_id broadcasts across the frame's rows.
            frames.append(pd.DataFrame({'meter_id': meterids[i],
                                        'datetime': quantiles.index.values,
                                        'consumption': quantiles['0.9'].values}))

    # Guard the empty-batch case: pd.concat([]) raises ValueError.
    columns = ['meter_id', 'datetime', 'consumption']
    results = (pd.concat(frames, ignore_index=True)
               if frames else pd.DataFrame(columns=columns))
    results.to_csv('/tmp/forecast.csv', index=False)

    key = os.path.join('meteranalytics',
                       'forecast/{}/batch_{}_{}.csv'.format(
                           forecast_start, batch_start, batch_end))
    S3.Bucket(S3_BUCKET).Object(key).upload_file('/tmp/forecast.csv')