def _monitor_parallel_process()

in notebook/utils/wf.py [0:0]


    def _monitor_parallel_process(self, execution_arn, lookup, parent_pb):
        """Poll an execution's history and track progress through a Parallel state.

        Each polling round checks the execution status, fetches the execution
        history in chronological order, and walks its state-entered and
        state-exited events:

        * A newly entered non-Parallel state gets its own ``trange`` progress
          bar, stored in ``lookup`` under the state name.
        * A newly entered Parallel state is marked with ``1`` in ``lookup``
          and handled by a recursive call whose count is added to this one.
        * An exited state whose ``lookup`` entry is still truthy is counted
          once, ``parent_pb`` is advanced via ``self._update_progress``, and
          the entry is reset to ``0`` so later polling rounds skip it.

        Args:
            execution_arn: Passed as ``executionArn`` to the ``self.sfn``
                ``describe_execution`` and ``get_execution_history`` calls.
            lookup: Dict keyed by state name, mutated in place. Values are a
                progress bar (pending non-Parallel state), ``1`` (pending
                Parallel state) or ``0`` (exit already counted).
            parent_pb: Progress bar advanced on every counted exit; its
                ``leave`` attribute is set to ``True`` on failure or timeout.

        Returns:
            int: Number of state exits counted by this call, including the
            counts returned by nested recursive calls. Returned when a
            Parallel state exit is counted, when ``self._wf_failed(status)``
            is truthy, or when ``self.timeout`` seconds have elapsed.
        """
        completed = 0
        start = time()

        while True:
            status = self.sfn.describe_execution(executionArn=execution_arn)["status"]
            if self._wf_failed(status):
                parent_pb.leave = True
                # Always hand back an int: a recursive caller adds this value
                # to its own counter, so returning None would raise TypeError.
                return completed

            # A single call returns at most `maxResults` events; follow
            # nextToken so exits beyond the first page are not missed.
            request = {
                "executionArn": execution_arn,
                "maxResults": 200,
                "reverseOrder": False,
                "includeExecutionData": False,
            }
            events = []
            while True:
                page = self.sfn.get_execution_history(**request)
                events.extend(page["events"])
                next_token = page.get("nextToken")
                if not next_token:
                    break
                request["nextToken"] = next_token

            for e in events:
                if "stateEnteredEventDetails" in e:
                    state = e["stateEnteredEventDetails"]["name"]
                    transition = e["type"]

                    if state not in lookup:
                        if transition != "ParallelStateEntered":
                            lookup[state] = trange(1, desc=f">> Parallel Stage: {state}")
                        else:
                            # Register the nested Parallel state before
                            # recursing so the nested call does not treat it
                            # as new and recurse again.
                            lookup[state] = 1
                            completed += self._monitor_parallel_process(
                                execution_arn, lookup, parent_pb
                            )

                if "stateExitedEventDetails" in e:
                    state = e["stateExitedEventDetails"]["name"]
                    transition = e["type"]

                    # Invariant: an exit is only expected for a state that was
                    # registered in `lookup` (here or by the caller).
                    assert state in lookup, f"exit seen for untracked state: {state}"

                    # A falsy entry means this exit was counted in an earlier
                    # polling round (the history is re-read from the start).
                    if lookup[state]:
                        completed += 1
                        self._update_progress(parent_pb)

                        if transition != "ParallelStateExited":
                            self._update_progress(lookup[state])
                            lookup[state] = 0
                        else:
                            lookup[state] = 0
                            return completed

            if time() - start > self.timeout:
                parent_pb.leave = True
                break

            sleep(self.n_wait)

        return completed