in airflow-core/src/airflow/models/taskinstance.py [0:0]
def filter_for_tis(tis: Iterable[TaskInstance | TaskInstanceKey]) -> BooleanClauseList | None:
"""Return SQLAlchemy filter to query selected task instances."""
# DictKeys type, (what we often pass here from the scheduler) is not directly indexable :(
# Or it might be a generator, but we need to be able to iterate over it more than once
tis = list(tis)
if not tis:
return None
first = tis[0]
dag_id = first.dag_id
run_id = first.run_id
map_index = first.map_index
first_task_id = first.task_id
# pre-compute the set of dag_id, run_id, map_indices and task_ids
dag_ids, run_ids, map_indices, task_ids = set(), set(), set(), set()
for t in tis:
dag_ids.add(t.dag_id)
run_ids.add(t.run_id)
map_indices.add(t.map_index)
task_ids.add(t.task_id)
# Common path optimisations: when all TIs are for the same dag_id and run_id, or same dag_id
# and task_id -- this can be over 150x faster for huge numbers of TIs (20k+)
if dag_ids == {dag_id} and run_ids == {run_id} and map_indices == {map_index}:
return and_(
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == run_id,
TaskInstance.map_index == map_index,
TaskInstance.task_id.in_(task_ids),
)
if dag_ids == {dag_id} and task_ids == {first_task_id} and map_indices == {map_index}:
return and_(
TaskInstance.dag_id == dag_id,
TaskInstance.run_id.in_(run_ids),
TaskInstance.map_index == map_index,
TaskInstance.task_id == first_task_id,
)
if dag_ids == {dag_id} and run_ids == {run_id} and task_ids == {first_task_id}:
return and_(
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == run_id,
TaskInstance.map_index.in_(map_indices),
TaskInstance.task_id == first_task_id,
)
filter_condition = []
# create 2 nested groups, both primarily grouped by dag_id and run_id,
# and in the nested group 1 grouped by task_id the other by map_index.
task_id_groups: dict[tuple, dict[Any, list[Any]]] = defaultdict(lambda: defaultdict(list))
map_index_groups: dict[tuple, dict[Any, list[Any]]] = defaultdict(lambda: defaultdict(list))
for t in tis:
task_id_groups[(t.dag_id, t.run_id)][t.task_id].append(t.map_index)
map_index_groups[(t.dag_id, t.run_id)][t.map_index].append(t.task_id)
# this assumes that most dags have dag_id as the largest grouping, followed by run_id. even
# if its not, this is still a significant optimization over querying for every single tuple key
for cur_dag_id, cur_run_id in itertools.product(dag_ids, run_ids):
# we compare the group size between task_id and map_index and use the smaller group
dag_task_id_groups = task_id_groups[(cur_dag_id, cur_run_id)]
dag_map_index_groups = map_index_groups[(cur_dag_id, cur_run_id)]
if len(dag_task_id_groups) <= len(dag_map_index_groups):
for cur_task_id, cur_map_indices in dag_task_id_groups.items():
filter_condition.append(
and_(
TaskInstance.dag_id == cur_dag_id,
TaskInstance.run_id == cur_run_id,
TaskInstance.task_id == cur_task_id,
TaskInstance.map_index.in_(cur_map_indices),
)
)
else:
for cur_map_index, cur_task_ids in dag_map_index_groups.items():
filter_condition.append(
and_(
TaskInstance.dag_id == cur_dag_id,
TaskInstance.run_id == cur_run_id,
TaskInstance.task_id.in_(cur_task_ids),
TaskInstance.map_index == cur_map_index,
)
)
return or_(*filter_condition)