in notebook/utils/wf.py [0:0]
def run(self, execution_arn, n_stages):
    """Poll a Step Functions execution and mirror its progress on a tqdm bar.

    On every poll the execution status is checked, then the execution
    history is re-read from the beginning (oldest event first), page by
    page, and state entered/exited events are turned into progress updates.
    ``lookup`` remembers which states were already seen so that re-reading
    the history on the next poll does not count a state twice.

    Args:
        execution_arn: ARN passed as ``executionArn`` to the ``self.sfn``
            client calls.
        n_stages: Number of completed stages after which the workflow is
            considered finished. The progress bar is created with
            ``n_stages + 1`` steps; the extra step is consumed by the final
            "Workflow Completed" update.

    Returns:
        None. The method returns when ``completed >= n_stages`` or when more
        than ``self.timeout`` seconds have elapsed since it started (the
        timeout path returns silently, without raising).

    Raises:
        Exception: If ``self._wf_failed(status)`` is truthy for the polled
            execution status.
        AssertionError: If a state-exited event names a state for which no
            state-entered event was recorded.
    """
    # state name -> 1 while entered but not yet counted as finished, 0 after.
    lookup = {}
    completed = 0
    main_pb = trange(n_stages + 1, desc="Workflow Initiated")
    start = time()

    # Arguments shared by the first-page and the follow-up page requests.
    history_kwargs = {
        "executionArn": execution_arn,
        "maxResults": SFNMonitor.PAGINATION_SIZE,
        "reverseOrder": False,
        "includeExecutionData": False,
    }

    while True:
        status = self.sfn.describe_execution(executionArn=execution_arn)["status"]
        if self._wf_failed(status):
            main_pb.leave = True
            raise Exception(f"Workflow execution {status}.")

        exec_hist = self.sfn.get_execution_history(**history_kwargs)
        while True:
            # Always iterate the page currently held in exec_hist; reading
            # the events only once before this loop would replay the first
            # page for every subsequent page and never see later events.
            for e in exec_hist["events"]:
                if "stateEnteredEventDetails" in e:
                    state = e["stateEnteredEventDetails"]["name"]
                    transition = e["type"]
                    if state not in lookup:
                        lookup[state] = 1
                        main_pb.desc = f"Currently in the workflow stage: {state}"
                        if transition == "ParallelStateEntered":
                            # Presumably tracks the parallel branches and
                            # returns how many tasks they completed -- the
                            # helper is not visible here; confirm.
                            completed += self._monitor_parallel_process(
                                execution_arn, lookup, main_pb)

                if "stateExitedEventDetails" in e:
                    state = e["stateExitedEventDetails"]["name"]
                    transition = e["type"]
                    assert state in lookup
                    # NOTE(review): nesting of this branch was reconstructed;
                    # confirm that the bookkeeping below is meant to run for
                    # parallel-state exits as well.
                    if lookup[state]:
                        if transition != "ParallelStateExited":
                            self._update_progress(main_pb)
                        lookup[state] = 0
                        completed += 1
                    else:
                        # Already counted on an earlier pass over the history.
                        assert not lookup[state]

            if "nextToken" not in exec_hist:
                break
            exec_hist = self.sfn.get_execution_history(
                nextToken=exec_hist["nextToken"], **history_kwargs)

        if time() - start > self.timeout:
            # Give up waiting; keep the bar visible so the last stage shows.
            main_pb.leave = True
            break
        if completed >= n_stages:
            main_pb.desc = "Workflow Completed"
            self._update_progress(main_pb)
            break
        sleep(self.n_wait)