in mozci/push.py [0:0]
def is_group_running(self, group):
"""Checks if the provided group is still running on this push.
Returns:
bool: True, if the group is still running.
False, if the group is completed.
"""
running_tasks = [
task for task in self.tasks if task.state not in TASK_FINAL_STATES
]
group_types = set()
for task in group.tasks:
suite = task.suite
assert (
suite is not None
), f"Couldn't parse suite for {task.label} ({task.id})"
group_types.add(suite)
if all(task.is_tests_grouped for task in group.tasks):
for task in running_tasks:
if task.suite not in group_types:
continue
task_def = get_task(task.id)
test_paths = json.loads(
task_def["payload"]
.get("env", {})
.get("MOZHARNESS_TEST_PATHS", "{}")
)
if group.name in {
name for names in test_paths.values() for name in names
}:
return True
return False
running_types = {task.suite for task in running_tasks}
return not group_types.isdisjoint(running_types)