in odps/df/delay.py [0:0]
def execute(self, ui=None, async_=False, n_parallel=1, timeout=None,
close_and_notify=True):
if self._running:
raise RuntimeError('Cannot execute on an executing delay object.')
with self._lock:
_idx_calls = sorted(six.iteritems(self._calls), key=lambda p: p[0])
_indices = [p[0] for p in _idx_calls]
_calls = [p[1] for p in _idx_calls]
_futures = [p[1] for p in sorted(six.iteritems(self._futures), key=lambda p: p[0])]
from .backends.engine import get_default_engine
engine = get_default_engine(*[call[1] for call in _calls])
ui = ui or Engine._create_ui(async_=async_, n_parallel=n_parallel)
batch_kw = dict(ui=ui, async_=True, n_parallel=n_parallel, timeout=timeout,
close_and_notify=close_and_notify)
fs = engine.batch(*_calls, **batch_kw)
if not isinstance(fs, Iterable):
fs = [fs]
if len(fs) == 0:
return
self._running = True
def relay_future(src, index=None, dest=None):
if dest.done():
return
with self._lock:
try:
result = src.result()
wrapper = self._wrappers.get(index)
if wrapper is not None:
result = wrapper(result)
dest.set_result(result)
except:
e, tb = sys.exc_info()[1:]
if six.PY2:
dest.set_exception_info(e, tb)
else:
dest.set_exception(e)
finally:
del self._calls[index]
del self._futures[index]
if index in self._wrappers:
del self._wrappers[index]
for idx, uf, bf in izip(_indices, _futures, fs):
uf.set_running_or_notify_cancel()
bf.add_done_callback(functools.partial(relay_future, index=idx, dest=uf))
if bf.done():
relay_future(bf, idx, uf)
if not async_:
try:
futures.wait(_futures, timeout)
for uf in _futures:
uf.result()
ui.notify('DataFrame execution succeeded')
except:
ui.notify('DataFrame execution failed')
raise
finally:
self._running = False
else:
delay_future = futures.Future()
delay_future.set_running_or_notify_cancel()
result_store, exc_store = [None], [None]
def _check_results(src):
if delay_future.done():
return
try:
result_store[0] = src.result()
except:
exc_store[0] = sys.exc_info()[1:]
with self._lock:
if all(f.done() for f in fs):
self._running = False
if exc_store[0] is not None:
e, tb = exc_store[0]
if six.PY2:
delay_future.set_exception_info(e, tb)
else:
delay_future.set_exception(e)
else:
delay_future.set_result(result_store[0])
for f in fs:
f.add_done_callback(_check_results)
if f.done():
_check_results(f)
return delay_future