in assets/functions/meter_forecast/app.py [0:0]
def _hourly_consumption(connection, meter_id, data_start, data_end):
    """Read one meter's 'INT' readings from Athena, summed per hour.

    meter_id, data_start and data_end originate from the HTTP request, so they
    are passed as bound query parameters (pyformat placeholders, escaped by
    the driver) rather than being formatted into the SQL text. Only DB_SCHEMA,
    a module-level setting, is interpolated directly.
    NOTE(review): assumes the Athena driver behind ``connect`` uses the
    pyformat paramstyle (PyAthena's default) -- confirm if the driver changes.

    Returns a DataFrame indexed by ``datetime`` with a ``consumption`` column;
    it is empty when no readings match.
    """
    query = '''select date_trunc('HOUR', reading_date_time) as datetime, sum(reading_value) as consumption
            from "{}".daily
            where meter_id = %(meter_id)s and reading_date_time >= timestamp %(data_start)s
            and reading_date_time < timestamp %(data_end)s
            and reading_type = 'INT'
            group by 1;
            '''.format(DB_SCHEMA)
    result = pd.read_sql(query, connection, params={
        'meter_id': meter_id,
        'data_start': data_start,
        'data_end': data_end,
    })
    return result.set_index('datetime')


def lambda_handler(event, context):
    """Forecast hourly consumption for one meter (API Gateway proxy handler).

    Expects ``meter_id`` as a path parameter and ``data_start`` / ``data_end``
    as query-string parameters. Hourly consumption in [data_start, data_end)
    is read from Athena, optionally joined with weather data when
    USE_WEATHER_DATA == 1, and sent to the SageMaker endpoint named by the
    ``ML_endpoint_name`` config entry. The decoded prediction is returned as
    JSON under the ``consumption`` column.

    Returns an API Gateway proxy response dict:
      * 400 if any of the three parameters is missing;
      * 200 with ``{"consumption":{}}`` if the meter has no readings in range;
      * 200 with the prediction JSON otherwise.
    """
    # API Gateway proxy events carry null for these keys when the request has
    # no path/query parameters; normalise to {} so the checks below return a
    # 400 instead of raising TypeError.
    path_params = event.get("pathParameters") or {}
    query_params = event.get("queryStringParameters") or {}
    if ("meter_id" not in path_params
            or "data_start" not in query_params
            or "data_end" not in query_params):
        return {
            'statusCode': 400,
            'body': "error: meter_id, data_start, and data_end needs to be provided."
        }
    meter_id = path_params['meter_id']
    data_start = query_params['data_start']
    data_end = query_params['data_end']
    ml_endpoint_name = get_config("ML_endpoint_name")

    connection = connect(s3_staging_dir='s3://{}/'.format(ATHENA_OUTPUT_BUCKET), region_name=REGION)
    try:
        result = _hourly_consumption(connection, meter_id, data_start, data_end)
        if result.empty:
            # No readings in range: return an empty forecast object.
            return {
                "statusCode": 200,
                "body": '{"consumption":{}}'
            }
        # Re-bucket onto a contiguous hourly grid (empty hours sum to 0).
        data_kw = result.resample('1H').sum()
        timeseries = data_kw.iloc[:, 0]
        df_weather = None
        if USE_WEATHER_DATA == 1:
            # NOTE(review): data_start is raw request input; confirm get_weather
            # does not format it into SQL text.
            df_weather = get_weather(connection, data_start, DB_SCHEMA)
    finally:
        # All Athena work (consumption + weather) is done; release the connection.
        connection.close()

    freq = 'H'
    response = SAGEMAKER.invoke_endpoint(EndpointName=ml_endpoint_name,
                                         ContentType='application/json',
                                         Body=encode_request(timeseries[:], df_weather))
    # The forecast starts one hour after the last observed hour.
    prediction_time = timeseries.index[-1] + pd.Timedelta(1, unit='H')
    df_prediction = decode_response(response['Body'].read(), freq, prediction_time)
    df_prediction.columns = ['consumption']
    return {
        "statusCode": 200,
        "body": df_prediction.to_json()
    }