in odps/df/backends/core.py [0:0]
def _run_in_parallel(self, ui, n_parallel, wait=True, timeout=None, progress_proportion=1.0):
submits_lock = threading.RLock()
submits = dict()
user_wait = dict()
result_wait = dict()
results = dict()
curr_progress = ui.current_progress() or 0
def actual_run(dag=None, is_fallback=False):
dag = dag or self
calls = dag.topological_sort()
result_calls = sorted([c for c in calls if c.result_index is not None],
key=lambda x: x.result_index)
fallback = threading.Event()
if is_fallback:
ui.update(curr_progress)
def close_ui(*_):
with submits_lock:
if all(call in submits and call in results for call in result_calls):
ui.close()
executor = futures.ThreadPoolExecutor(max_workers=n_parallel)
for call in calls:
if call.result_index is not None and is_fallback:
# if is fallback, we do not create new future
# cause the future objects have been passed to user
future = result_wait[call.result_index]
else:
future = futures.Future()
user_wait[call] = future
if call.result_index is not None:
future.add_done_callback(close_ui)
if not is_fallback:
result_wait[call.result_index] = future
for call in calls:
def run(func):
try:
if fallback.is_set():
raise DagDependencyError('Node execution failed due to callback')
if call.result_index is None or not is_fallback:
user_wait[func].set_running_or_notify_cancel()
prevs = dag.predecessors(func)
if prevs:
fs = [user_wait[p] for p in prevs]
for f in fs:
if f.exception():
raise DagDependencyError('Node execution failed due to failure of '
'previous node, exception: %s' % f.exception())
res = func(ui=ui, progress_proportion=progress_proportion / len(calls))
results[func] = res
user_wait[func].set_result(res)
return res
except:
e, tb = sys.exc_info()[1:]
if not is_fallback and self._can_fallback() and self._need_fallback(e):
if not fallback.is_set():
fallback.set()
new_dag = dag.fallback()
actual_run(new_dag, True)
if not fallback.is_set():
results[func] = (e, tb)
if six.PY2:
user_wait[func].set_exception_info(e, tb)
else:
user_wait[func].set_exception(e)
raise
finally:
with submits_lock:
for f in dag.successors(func):
if f in submits:
continue
prevs = dag.predecessors(f)
if all(p in submits and user_wait[p].done() for p in prevs):
submits[f] = executor.submit(run, f)
if not dag.predecessors(call):
with submits_lock:
submits[call] = executor.submit(run, call)
if wait:
dones, _ = futures.wait(user_wait.values())
for done in dones:
done.result()
return [results[c] for c in
sorted([c for c in calls if c.result_index is not None],
key=lambda x: x.result_index)]
if timeout:
futures.wait(user_wait.values(), timeout=timeout)
actual_run()
if wait:
return [it[1].result() for it in sorted(result_wait.items(), key=itemgetter(0))]
else:
return [it[1] for it in sorted(result_wait.items(), key=itemgetter(0))]