in scripts/doctor.py [0:0]
def run(args, **kwargs):
def execute_task(task, event_queue):
try:
event = task.function(args, **task.args)
event_queue.put(
TaskEvent(
name=task.name, success=event.success, message=event.message))
except:
event_queue.put(
TaskEvent(
name=task.name,
success=False,
message='Unexpected exception: {}'.format(
traceback.format_exception(*sys.exc_info()))))
tasks = {}
for task_name, v in kwargs.items():
if isinstance(v, tuple):
# Flexibly allowing variable length tuple.
# Tuple is the essentially Task without the name.
v = tuple(list(v) + [None] * (len(Task._fields) - len(v) - 1))
fn, prerequisites, extra_args = v
else:
fn = v
prerequisites = None
extra_args = None
tasks[task_name] = Task(
name=task_name,
function=fn,
prerequisites=set(prerequisites or []),
args=extra_args or {})
# Dry-run to ensure valid DAG.
do_run(
tasks, lambda task, event_queue: event_queue.put(
TaskEvent(name=task.name, success=True)))
with ThreadPool(5) as pool:
return do_run(
tasks, lambda task, event_queue: pool.apply_async(
execute_task, args=(task, event_queue)))