in lab/03-Package-Deploy/iot-jobs/app/windfarm.py [0:0]
def __detect_anomalies__(self):
"""
Keeps processing the data collected from the turbines
and do anomaly detection. It reports to each turbine the
anomalies detected (through a callback)
"""
while self.running:
# for each turbine, check the buffer
start_time = time.time()
for idx in range(self.n_turbines):
if self.simulator.is_turbine_running(idx):
buffer = self.simulator.get_raw_data(idx)
if len(buffer) >= self.min_num_samples:
self.simulator.update_dashboard(idx, np.array(buffer))
# create a copy & prep the data
data = self.__data_prep__(idx, np.array(buffer) )
if not self.edge_agents[idx].is_model_loaded(self.model_meta[idx]['model_name']):
self.simulator.update_label(idx, 'Model not loaded')
continue
# denoise
data = np.array([self.__wavelet_denoise__(data[:,i], 'db6', self.raw_std[i]) for i in range(self.n_features)])
data = data.transpose((1,0))
# normalize
data -= self.mean
data /= self.std
data = data[-(self.TIME_STEPS+self.STEP):]
# create the dataset and reshape it
x = self.__create_dataset__(data, self.TIME_STEPS, self.STEP)
x = np.transpose(x, (0, 2, 1)).reshape(x.shape[0], self.n_features, 10, 10)
# run the model
p = self.edge_agents[idx].predict(self.model_meta[idx]['model_name'], x)
if p is not None:
a = x.reshape(x.shape[0], self.n_features, 100).transpose((0,2,1))
b = p.reshape(p.shape[0], self.n_features, 100).transpose((0,2,1))
# check the anomalies
pred_mae_loss = np.mean(np.abs(b - a), axis=1).transpose((1,0))
values = np.mean(pred_mae_loss, axis=1)
anomalies = (values > self.thresholds)
self.simulator.detected_anomalies(idx, values, anomalies)
elapsed_time = time.time() - start_time
time.sleep(0.5-elapsed_time)