in lisa/runner.py [0:0]
def _start_loop(self) -> None:
    """Schedule the tasks of every runner until all runners are finished.

    Publishes a RUNNING test-run message through ``notifier``, creates a
    ``TaskManager`` sized by ``self._max_concurrency`` and registers it as
    the global task manager, then loops: each active runner is asked to
    submit tasks, runners reporting ``is_done`` are closed and dropped from
    both the local list and ``self._runners``, and further runners are
    pulled from ``self._fetch_runners()`` while idle workers exist.
    """
    # in case all of runners are disabled
    pending_runners = self._fetch_runners()
    active_runners: List[BaseRunner] = []

    notifier.notify(
        messages.TestRunMessage(
            status=messages.TestRunStatus.RUNNING,
        )
    )

    task_manager = TaskManager[None](self._max_concurrency)
    # The cancellation check reads the task manager from the global slot.
    set_global_task_manager(task_manager)

    expect_more_runners = True

    # Keep going while wait_worker() reports truthy, the runner iterator is
    # not yet exhausted, or some fetched runner has not been closed.
    while (
        task_manager.wait_worker()
        or expect_more_runners
        or active_runners
    ):
        assert task_manager.has_idle_worker()

        # Hand out tasks for as long as a worker is idle.
        while task_manager.has_idle_worker():
            # Iterate over a copy: finished runners are removed in the loop.
            for current in list(active_runners):
                submitted = self._submit_runner_tasks(current, task_manager)
                if current.is_done:
                    current.close()
                    active_runners.remove(current)
                    self._runners.remove(current)
                if submitted:
                    # Intended to make scheduling depth-first, i.e. finish
                    # earlier runners before starting later ones.
                    # NOTE(review): as the final statement of the loop body
                    # this `continue` does not change control flow - confirm
                    # whether a different jump was intended.
                    continue

            running_count = task_manager.running_count
            active_ids = [x.id for x in active_runners]
            self._log.debug(f"running count: {running_count}, id: {active_ids} ")

            if not task_manager.has_idle_worker():
                # Re-evaluates the inner loop condition, which ends the loop
                # if there is still no idle worker.
                continue

            if not expect_more_runners:
                # Leave the inner loop instead of spinning on it, which would
                # burn CPU while workers are idle but nothing can be added.
                self._log.debug("Idle worker available but no new runner...")
                break

            # Top up the active runners to the concurrency limit; running out
            # of runners is remembered so no further fetch is attempted.
            try:
                while len(active_runners) < self._max_concurrency:
                    added = next(pending_runners)
                    active_runners.append(added)
                    self._log.debug(f"Added runner {added.id}")
            except StopIteration:
                expect_more_runners = False