in assets/functions/meter_forecast/app.py [0:0]
import os

import boto3
import pandas as pd
from pyathena import connect

# Note: load_json_from_file(), get_weather(), encode_request(), decode_response(),
# WORKING_BUCKET and REGION are assumed to be defined elsewhere in this module.


def lambda_handler(event, context):
    ATHENA_OUTPUT_BUCKET = os.environ['Athena_bucket']
    DB_SCHEMA = os.environ['Db_schema']
    # Environment variables are always strings; the value is compared against '1' below.
    USE_WEATHER_DATA = os.environ['With_weather_data']

    # API Gateway passes None when no path/query parameters are present, so fall back to {}.
    pathParameter = event.get("pathParameters") or {}
    queryParameter = event.get("queryStringParameters") or {}
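    # Example of the API Gateway proxy event this handler expects (illustrative values only):
    #   {
    #       "pathParameters": {"meter_id": "MAC000001"},
    #       "queryStringParameters": {"data_start": "2013-06-01", "data_end": "2013-06-08"}
    #   }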
if ("meter_id" not in pathParameter) \
or ("data_start" not in queryParameter) \
or ("data_end" not in queryParameter):
return {
'statusCode': 500,
'body': "error: meter_id, data_start, and data_end needs to be provided."
}
    METER_ID = pathParameter['meter_id']
    # The SageMaker endpoint name is read from a JSON document written to the working bucket.
    ML_ENDPOINT_NAME = load_json_from_file(WORKING_BUCKET, "meteranalytics/initial_pass")["ML_endpoint_name"]
    # The date_str partition column uses dash-less dates, e.g. 2013-06-01 -> 20130601.
    DATA_START = queryParameter['data_start'].replace("-", "")
    DATA_END = queryParameter['data_end'].replace("-", "")

    connection = connect(s3_staging_dir='s3://{}/'.format(ATHENA_OUTPUT_BUCKET), region_name=REGION)
    query = '''select date_trunc('HOUR', reading_date_time) as datetime, sum(reading_value) as consumption
               from "{}".daily
               where meter_id = '{}' and date_str >= '{}'
                 and date_str < '{}'
               group by 1;
               '''.format(DB_SCHEMA, METER_ID, DATA_START, DATA_END)
    result = pd.read_sql(query, connection)
    result = result.set_index('datetime')

    # Aggregate the readings to hourly consumption for the forecasting model.
    data_kw = result.resample('1H').sum()
    timeseries = data_kw.iloc[:, 0]  # np.trim_zeros(data_kw.iloc[:, 0], trim='f')

    freq = 'H'
    df_weather = None
    # Compare against the string '1': environment variables are strings, so `== 1` would never match.
    if USE_WEATHER_DATA == '1':
        df_weather = get_weather(connection, DATA_START, DB_SCHEMA)
    # Call the SageMaker inference endpoint with the encoded time series (plus optional weather features).
    runtime = boto3.client('runtime.sagemaker')
    response = runtime.invoke_endpoint(EndpointName=ML_ENDPOINT_NAME,
                                       ContentType='application/json',
                                       Body=encode_request(timeseries[:], df_weather))
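    # Assumption (for illustration only): if the endpoint hosts a DeepAR-style forecaster,
    # encode_request() would produce a JSON body roughly of the form
    #   {"instances": [{"start": "<timestamp>", "target": [<hourly consumption>], "dynamic_feat": [...]}],
    #    "configuration": {"num_samples": ..., "output_types": ["quantiles"], "quantiles": [...]}}
    # with weather columns passed as dynamic features. The authoritative request/response format
    # is defined by encode_request()/decode_response() elsewhere in this module.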
    # The forecast starts one hour after the last observed reading.
    prediction_time = timeseries.index[-1] + pd.Timedelta(1, unit='H')
    df_prediction = decode_response(response['Body'].read(), freq, prediction_time)
    df_prediction.columns = ['consumption']
    prediction_result = df_prediction.to_json()
    return {
        "statusCode": 200,
        "body": prediction_result
    }
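

# --- Illustrative local-test sketch (not part of the original handler) ---
# A minimal way to exercise the handler, assuming the environment variables read above are
# exported and AWS credentials with Athena and SageMaker access are available.
# The meter id and date range are placeholders.
if __name__ == "__main__":
    sample_event = {
        "pathParameters": {"meter_id": "MAC000001"},
        "queryStringParameters": {"data_start": "2013-06-01", "data_end": "2013-06-08"},
    }
    print(lambda_handler(sample_event, None))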