in src/lookoutequipment/scheduler.py [0:0]
def get_predictions(self):
    """
    Loop through all the inference executions and build a dataframe with
    all the predictions generated by the model.

    Only executions whose status is ``SUCCESS`` are read: for each of them,
    the JSON-lines result file referenced by ``CustomerResultObject`` is
    fetched from S3 and every line is parsed as one prediction record.

    Returns:
        pandas.DataFrame:
            A dataframe with one prediction by row (1 for an anomaly or 0
            otherwise). Each row is indexed by timestamp and rows are
            sorted by timestamp. For rows where the prediction is 1, each
            entry of the ``diagnostics`` list becomes its own column, named
            after the entry's ``name`` and holding its ``value``.

    Raises:
        Exception: if no successful execution produced any result.
    """
    # Imported locally so this method does not rely on a module-level
    # import that may not be present in the file:
    import json

    # Fetch the list of execution summaries in case all executions were
    # not captured yet:
    self.list_inference_executions()

    fs = s3fs.S3FileSystem()
    records = []

    # Loops through the executions summaries:
    for execution_summary in self.execution_summaries:
        # We only get an output if the inference execution is a success:
        if execution_summary['Status'] != 'SUCCESS':
            continue

        # Build an S3 path for the JSON-line file:
        result_object = execution_summary['CustomerResultObject']
        s3_fname = f"s3://{result_object['Bucket']}/{result_object['Key']}"

        # NOTE(review): this used to call eval() on every line, which runs
        # arbitrary code coming from the S3 object and breaks on JSON
        # literals such as true/false/null. The result files are expected
        # to be JSON lines, so they are parsed with json.loads() instead.
        # Blank lines (e.g. a trailing newline) are skipped.
        with fs.open(s3_fname, 'r') as f:
            records.extend(json.loads(line) for line in f if line.strip())

    if not records:
        raise Exception('No successful execution found for this scheduler')

    # Build the intermediate dataframe, ordered by timestamp:
    results_df = pd.DataFrame(records)
    results_df['timestamp'] = pd.to_datetime(results_df['timestamp'])
    results_df = results_df.set_index('timestamp').sort_index()

    # Flatten the diagnostics of each anomalous row into dedicated columns.
    # The 'diagnostics' field is only read for anomalies, so it does not
    # need to be present on the other rows:
    expanded_rows = []
    for timestamp, row in results_df.iterrows():
        new_row = {'timestamp': timestamp, 'prediction': row['prediction']}
        if row['prediction'] == 1:
            new_row.update(
                {item['name']: item['value'] for item in row['diagnostics']}
            )
        expanded_rows.append(new_row)

    expanded_results = pd.DataFrame(expanded_rows)
    expanded_results['timestamp'] = pd.to_datetime(expanded_results['timestamp'])
    return expanded_results.set_index('timestamp')