in src/lookoutequipment/scheduler.py [0:0]
def list_inference_executions(self,
                              execution_status=None,
                              start_time=None,
                              end_time=None,
                              max_results=50):
    """
    List the past inference executions triggered by the current scheduler.

    Every page returned by the service is fetched (the method keeps
    following ``NextToken`` until none is returned) and the summaries are
    concatenated in the order they were received.

    Parameters:
        execution_status (string):
            Only keep the executions with a given status. The value is
            sent to the service as ``Status`` and the collected summaries
            are filtered again locally on their ``Status`` key
            (default: None)
        start_time (pandas.DateTime):
            Sent unchanged to the service as ``DataStartTimeAfter`` to
            filter out the executions that happened before start_time
            (default: None)
        end_time (pandas.DateTime):
            Sent unchanged to the service as ``DataEndTimeBefore`` to
            filter out the executions that happened after end_time
            (default: None)
        max_results (integer):
            Value sent as ``MaxResults`` with each request. Because all
            pages are followed, the returned list may contain more than
            ``max_results`` entries (default: 50)

    Returns:
        list of dict:
            A list of all past inference executions, with each inference
            attributes stored in a python dictionary. The same list is
            also stored on ``self.execution_summaries``.
    """
    # Build the execution request object; optional filters are only sent
    # when the caller actually provided them:
    list_executions_request = {
        "MaxResults": max_results,
        "InferenceSchedulerName": self.scheduler_name,
    }
    if execution_status is not None:
        list_executions_request["Status"] = execution_status
    if start_time is not None:
        list_executions_request["DataStartTimeAfter"] = start_time
    if end_time is not None:
        list_executions_request["DataEndTimeBefore"] = end_time

    # Loop through all the inferences executed by the current scheduler,
    # one page at a time. extend() grows the list in place instead of
    # re-copying everything collected so far on every page:
    list_executions = []
    while True:
        response = self.client.list_inference_executions(
            **list_executions_request
        )
        list_executions.extend(response["InferenceExecutionSummaries"])

        # No continuation token means this was the last page:
        if "NextToken" not in response:
            break
        list_executions_request["NextToken"] = response["NextToken"]

    # Filter the executions based on their status:
    if execution_status is not None:
        list_executions = [
            execution
            for execution in list_executions
            if execution["Status"] == execution_status
        ]

    # Keep the summaries on the instance and return them:
    self.execution_summaries = list_executions
    return list_executions