def _run_in_parallel()

in odps/df/backends/core.py [0:0]


    def _run_in_parallel(self, ui, n_parallel, wait=True, timeout=None, progress_proportion=1.0):
        """
        Run the nodes of this DAG on a thread pool, honouring their dependencies.

        Every node returned by ``topological_sort()`` is invoked as
        ``node(ui=ui, progress_proportion=...)`` once all of its predecessors
        have finished. Nodes whose ``result_index`` is not ``None`` are the ones
        whose outcome is handed back to the caller, ordered by ``result_index``.

        When a node fails during the first run and both ``self._can_fallback()``
        and ``self._need_fallback(exc)`` are true, ``dag.fallback()`` is executed
        (at most once per run) and feeds the futures that were already created
        for the result nodes.

        :param ui: progress object; ``current_progress()``, ``update()`` and
                   ``close()`` are called on it, and it is passed to every node
        :param n_parallel: maximum number of worker threads per executor
        :param wait: if True, block until all nodes finish and return the results;
                     otherwise return the futures of the result nodes
        :param timeout: when not waiting and truthy, wait at most this many
                        seconds for the nodes before returning the futures
        :param progress_proportion: progress share of the whole run, divided
                                    evenly among the nodes
        :return: list of results (``wait=True``) or list of futures
                 (``wait=False``), ordered by ``result_index``
        :raises: with ``wait=True``, the exception of a failed node is re-raised
        """
        # guards `submits`, which is mutated from worker threads
        submits_lock = threading.RLock()
        submits = dict()        # node -> executor future of its `run` invocation
        user_wait = dict()      # node -> future carrying the node's outcome
        result_wait = dict()    # result_index -> future handed to the caller
        results = dict()        # node -> return value, or (exception, traceback)

        curr_progress = ui.current_progress() or 0

        def actual_run(dag=None, is_fallback=False):
            dag = dag or self
            calls = dag.topological_sort()
            result_calls = sorted([c for c in calls if c.result_index is not None],
                                  key=lambda x: x.result_index)
            # set once a node failure has triggered the fallback DAG
            fallback = threading.Event()

            if is_fallback:
                # rewind the progress to where it was before the failed attempt
                ui.update(curr_progress)

            def close_ui(*_):
                with submits_lock:
                    if all(c in submits and c in results for c in result_calls):
                        ui.close()

            executor = futures.ThreadPoolExecutor(max_workers=n_parallel)

            for call in calls:
                if call.result_index is not None and is_fallback:
                    # if is fallback, we do not create new future
                    # cause the future objects have been passed to user
                    future = result_wait[call.result_index]
                else:
                    future = futures.Future()
                user_wait[call] = future
                if call.result_index is not None:
                    future.add_done_callback(close_ui)
                    if not is_fallback:
                        result_wait[call.result_index] = future

            def run(func):
                try:
                    if fallback.is_set():
                        raise DagDependencyError('Node execution failed due to callback')

                    # Inspect the node being executed (`func`), not the variable of the
                    # submitting loop: a reused result future may already have been
                    # moved to RUNNING by the first attempt, and marking it running
                    # twice raises RuntimeError.
                    if func.result_index is None or not is_fallback:
                        user_wait[func].set_running_or_notify_cancel()

                    prevs = dag.predecessors(func)
                    if prevs:
                        fs = [user_wait[p] for p in prevs]
                        for f in fs:
                            if f.exception():
                                raise DagDependencyError('Node execution failed due to failure of '
                                                         'previous node, exception: %s' % f.exception())

                    res = func(ui=ui, progress_proportion=progress_proportion / len(calls))
                    results[func] = res
                    user_wait[func].set_result(res)
                    return res
                except BaseException:
                    # everything is caught on purpose; the error is always re-raised below
                    e, tb = sys.exc_info()[1:]
                    if not is_fallback and self._can_fallback() and self._need_fallback(e):
                        if not fallback.is_set():
                            fallback.set()
                            new_dag = dag.fallback()
                            actual_run(new_dag, True)
                    if not fallback.is_set():
                        # no fallback took over, hence publish the failure
                        results[func] = (e, tb)
                        if six.PY2:
                            user_wait[func].set_exception_info(e, tb)
                        else:
                            user_wait[func].set_exception(e)
                    raise
                finally:
                    # schedule every successor whose predecessors have all finished
                    with submits_lock:
                        for f in dag.successors(func):
                            if f in submits:
                                continue
                            prevs = dag.predecessors(f)
                            if all(p in submits and user_wait[p].done() for p in prevs):
                                submits[f] = executor.submit(run, f)

            # nodes without predecessors start right away, the others are chained in `run`
            for call in calls:
                if not dag.predecessors(call):
                    with submits_lock:
                        submits[call] = executor.submit(run, call)

            if wait:
                dones, _ = futures.wait(user_wait.values())
                for done in dones:
                    # re-raises the exception of a failed node
                    done.result()
                return [results[c] for c in result_calls]

            if timeout:
                futures.wait(user_wait.values(), timeout=timeout)

        actual_run()
        if wait:
            return [it[1].result() for it in sorted(result_wait.items(), key=itemgetter(0))]
        else:
            return [it[1] for it in sorted(result_wait.items(), key=itemgetter(0))]