in assets/functions/ml_pipeline/batch_anomaly_detection/app.py [0:0]
def process_batch(meter_start, meter_end, data_end, db_schema, connection):
    """Run anomaly detection for the meters in ``[meter_start, meter_end]`` that need it.

    A meter is processed when it has no rows in ``"<db_schema>".anomaly`` yet,
    or when its time series has a ``ds`` newer than the latest ``ds`` already
    stored there. For an already-processed meter, only the forecast rows after
    that stored ``ds`` are kept. Meters with no new data are skipped.

    Args:
        meter_start: Lower bound of the meter id range (inclusive, SQL ``BETWEEN``).
        meter_end: Upper bound of the meter id range (inclusive).
        data_end: Passed through unchanged to ``get_batch_data``; presumably the
            end of the time window to load -- confirm against that helper.
        db_schema: Name of the schema that holds the ``anomaly`` table.
        connection: Database connection accepted by ``pd.read_sql`` and
            ``get_batch_data``.

    Returns:
        pd.DataFrame with columns ``meter_id, ds, consumption, yhat_lower,
        yhat_upper, anomaly, importance`` holding the newly detected rows.
        It has those columns but no rows when nothing needed processing.
    """
    # The values are spliced into the SQL text (driver paramstyle unknown here),
    # so escape the quote character of each quoting context.
    schema_ident = str(db_schema).replace('"', '""')
    start_literal = str(meter_start).replace("'", "''")
    end_literal = str(meter_end).replace("'", "''")
    query = '''select meter_id, max(ds) as ds from "{}".anomaly
    where meter_id between '{}' and '{}' group by 1;
    '''.format(schema_ident, start_literal, end_literal)
    df_anomaly = pd.read_sql(query, connection)
    # GROUP BY yields one row per meter, so a dict gives O(1) lookups of the
    # latest already-processed timestamp instead of re-filtering per meter.
    latest_by_meter = dict(zip(df_anomaly.meter_id, df_anomaly.ds))

    df_timeseries = get_batch_data(meter_start, meter_end, data_end, db_schema, connection)
    column_list = ['meter_id', 'ds', 'consumption', 'yhat_lower', 'yhat_upper', 'anomaly', 'importance']

    # Collect per-meter results and concatenate once at the end:
    # DataFrame.append was removed in pandas 2.0 and was quadratic in a loop.
    frames = []
    for meter in df_timeseries.meter_id.unique():
        df_meter = df_timeseries[df_timeseries.meter_id == meter]
        # Run anomaly detection only if it's not done before or there are new data added
        if meter not in latest_by_meter:
            print("process anomaly for meter", meter)
            df_forecast = fit_predict_model(meter, df_meter)
            frames.append(df_forecast[column_list])
            continue

        latest = pd.to_datetime(latest_by_meter[meter])
        if df_meter.ds.max() > latest:
            print("process anomaly for meter {} from {}".format(meter, latest))
            df_forecast = fit_predict_model(meter, df_meter)
            # Keep only rows newer than what is already stored.
            df_new_anomaly = df_forecast[df_forecast.ds > latest]
            frames.append(df_new_anomaly[column_list])
        else:
            print("skip meter", meter)

    if not frames:
        return pd.DataFrame(columns=column_list)
    return pd.concat(frames, ignore_index=True)