in notebook/utils/wf.py [0:0]
def _monitor_parallel_process(self, execution_arn, lookup, parent_pb):
    """Poll a Step Functions execution and track progress of parallel states.

    Each polling round reads the execution status and the execution history,
    then walks the history events in order:

    * A state-entered event for a state not yet in ``lookup`` registers it.
      Ordinary states get their own single-step ``trange`` bar; a
      ``ParallelStateEntered`` event is marked with ``1`` and monitored by a
      recursive call whose result is added to the running count.
    * A state-exited event for a state whose ``lookup`` entry is still truthy
      counts as one completion, advances ``parent_pb`` (and the state's own
      bar for non-parallel states) and resets the entry to ``0`` so the same
      exit is not counted again on later rounds. A ``ParallelStateExited``
      event additionally ends this call.

    Args:
        execution_arn: ARN passed to ``describe_execution`` and
            ``get_execution_history``.
        lookup: Mutable mapping of state name -> progress bar, ``1`` (parallel
            state being tracked) or ``0`` (already accounted for). It is
            shared with, and mutated by, the recursive calls.
        parent_pb: Progress bar advanced once per completed state; its
            ``leave`` attribute is set to ``True`` on failure or timeout.

    Returns:
        int: Number of state exits counted by this call, including those
        counted by nested recursive calls. This is returned on every exit
        path (failure, timeout, parallel state exit).
    """
    completed = 0
    start = time()
    while True:
        status = self.sfn.describe_execution(executionArn=execution_arn)["status"]
        if self._wf_failed(status):
            parent_pb.leave = True
            # Return the count rather than None: the recursive call site
            # adds this value to its own counter.
            return completed

        # The history is paginated; follow nextToken so that events beyond
        # the first page are not silently ignored.
        request = {
            "executionArn": execution_arn,
            "maxResults": 200,
            "reverseOrder": False,
            "includeExecutionData": False,
        }
        events = []
        while True:
            page = self.sfn.get_execution_history(**request)
            events.extend(page["events"])
            next_token = page.get("nextToken")
            if not next_token:
                break
            request["nextToken"] = next_token

        for e in events:
            if "stateEnteredEventDetails" in e:
                state = e["stateEnteredEventDetails"]["name"]
                transition = e["type"]
                if state not in lookup:
                    if transition != "ParallelStateEntered":
                        lookup[state] = trange(1, desc=f">> Parallel Stage: {state}")
                    else:
                        # Mark the parallel state as "in flight" before
                        # recursing so the nested call does not re-enter it.
                        lookup[state] = 1
                        completed += self._monitor_parallel_process(
                            execution_arn, lookup, parent_pb
                        )
            if "stateExitedEventDetails" in e:
                state = e["stateExitedEventDetails"]["name"]
                transition = e["type"]
                # Invariant: events are read oldest-first, so a state is
                # registered (here or by the caller) before it exits.
                assert state in lookup
                if lookup[state]:
                    completed += 1
                    self._update_progress(parent_pb)
                    if transition != "ParallelStateExited":
                        self._update_progress(lookup[state])
                        lookup[state] = 0
                    else:
                        lookup[state] = 0
                        # The parallel state this call was tracking is done.
                        return completed

        if time() - start > self.timeout:
            parent_pb.leave = True
            break
        sleep(self.n_wait)
    return completed