in scripts/redundant_failures.py [0:0]
def count(is_first_task, is_second_task):
assert db.download(test_scheduling.PUSH_DATA_LABEL_DB)
push_data = list(db.read(test_scheduling.PUSH_DATA_LABEL_DB))
logger.info("Analyzing %d pushes...", len(push_data))
all_tasks = set(task for _, _, push_tasks, _, _ in push_data for task in push_tasks)
logger.info("Considering %d tasks...", len(all_tasks))
count_runs = 0
count_any_of_the_two = 0
count_first_but_not_second = 0
count_second_but_not_first = 0
for push in push_data:
(
revisions,
fix_revision,
push_tasks,
possible_regressions,
likely_regressions,
) = push
first_group_tasks = [
task.split("/")[1] for task in push_tasks if is_first_task(task)
]
second_group_tasks = [
task.split("/")[1] for task in push_tasks if is_second_task(task)
]
if len(first_group_tasks) == 0 and len(second_group_tasks) == 0:
continue
in_both_tasks = set(first_group_tasks) & set(second_group_tasks)
# Only consider pushes where tasks run in both groups.
if len(in_both_tasks) == 0:
continue
count_runs += 1
failures = [
task
for task in likely_regressions + possible_regressions
if any(task.endswith(in_both_task) for in_both_task in in_both_tasks)
]
first_failures = [task for task in failures if is_first_task(task)]
second_failures = [task for task in failures if is_second_task(task)]
if len(first_failures) > 0 or len(second_failures) > 0:
count_any_of_the_two += 1
if len(first_failures) > 0 and len(second_failures) == 0:
count_first_but_not_second += 1
elif len(first_failures) == 0 and len(second_failures) > 0:
count_second_but_not_first += 1
return (
count_runs,
count_any_of_the_two,
count_first_but_not_second,
count_second_but_not_first,
)