# in src/hpcadvisor/data_collector.py [0:0]
def process_tasks_multitask(tasks_file, dataset_file, collector_config):
    """Process tasks in parallel batches until the task selector yields no work.

    Repeatedly asks the collector's task-selector policy for up to
    ``paralleltasks`` tasks, starts each via ``start_task_in_parallel``, then
    polls for completions. As soon as at least one running task finishes, the
    next batch is requested; the outer loop ends when the selector returns no
    new tasks and nothing is still running.

    Args:
        tasks_file: Path to the pending-tasks file; re-read on every batch so
            updates made by completion handlers are picked up.
        dataset_file: Path forwarded to task start/completion handlers.
        collector_config: Mapping holding a ``policy`` object whose ``config``
            provides ``paralleltasks`` (max concurrent tasks).
            NOTE(review): if ``policy`` is absent or ``paralleltasks`` is
            unset this raises (AttributeError/TypeError) — presumably
            validated upstream; confirm against callers.
    """
    log.debug("Starting task processing in multi task mode")
    taskselector_policy = collector_config.get("policy", None)
    # Reuse the policy object instead of fetching it from the config twice.
    max_parallel_tasks = taskselector_policy.config.get("paralleltasks")
    log.debug(f"Max parallel tasks: {max_parallel_tasks}")
    pooling_delay_completion = 10  # seconds between completion polls
    running_tasks = []
    while True:
        # Re-read the task file each round: completion handlers may update it.
        all_pending_tasks = taskset_handler.get_tasks_from_file(tasks_file)
        num_new_tasks = max_parallel_tasks - len(running_tasks)
        log.debug(f"Asking task selector for {num_new_tasks} new tasks")
        new_tasks = taskselector_policy.get_tasks(all_pending_tasks, num_new_tasks)
        if not new_tasks and not running_tasks:
            log.info("No new tasks to be processed")
            break
        log.debug(f"Processing new {len(new_tasks)} tasks")
        for task in new_tasks:
            start_task_in_parallel(task, tasks_file, dataset_file, collector_config)
            running_tasks.append(task)
        # Poll until at least one running task completes, then go fetch more work.
        while True:
            time.sleep(pooling_delay_completion)
            any_completed = False
            for task in running_tasks[:]:  # iterate a copy: we mutate the list
                task_has_completed, task_status = check_task_completion(task)
                if task_has_completed:
                    process_task_completion(task, task_status, tasks_file, dataset_file)
                    running_tasks.remove(task)
                    any_completed = True
            if any_completed:
                log.debug("Some tasks completed, moving to next batch")
                break