in airflow-core/src/airflow/jobs/triggerer_job_runner.py [0:0]
def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger) -> None: # type: ignore[override]
from airflow.sdk.api.datamodels._generated import (
ConnectionResponse,
TaskStatesResponse,
VariableResponse,
XComResponse,
)
resp: BaseModel | None = None
dump_opts = {}
if isinstance(msg, messages.TriggerStateChanges):
if msg.events:
self.events.extend(msg.events)
if msg.failures:
self.failed_triggers.extend(msg.failures)
for id in msg.finished or ():
self.running_triggers.discard(id)
self.cancelling_triggers.discard(id)
# Remove logger from the cache, and since structlog doesn't have an explicit close method, we
# only need to remove the last reference to it to close the open FH
if factory := self.logger_cache.pop(id, None):
factory.upload_to_remote()
response = messages.TriggerStateSync(
to_create=[],
to_cancel=self.cancelling_triggers,
)
# Pull out of these deques in a thread-safe manner
while self.creating_triggers:
workload = self.creating_triggers.popleft()
response.to_create.append(workload)
self.running_triggers.update(m.id for m in response.to_create)
resp = response
elif isinstance(msg, GetConnection):
conn = self.client.connections.get(msg.conn_id)
if isinstance(conn, ConnectionResponse):
conn_result = ConnectionResult.from_conn_response(conn)
resp = conn_result
dump_opts = {"exclude_unset": True}
else:
resp = conn
elif isinstance(msg, GetVariable):
var = self.client.variables.get(msg.key)
if isinstance(var, VariableResponse):
var_result = VariableResult.from_variable_response(var)
resp = var_result
dump_opts = {"exclude_unset": True}
else:
resp = var
elif isinstance(msg, GetXCom):
xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index)
if isinstance(xcom, XComResponse):
xcom_result = XComResult.from_xcom_response(xcom)
resp = xcom_result
dump_opts = {"exclude_unset": True}
else:
resp = xcom
elif isinstance(msg, GetDRCount):
dr_count = self.client.dag_runs.get_count(
dag_id=msg.dag_id,
logical_dates=msg.logical_dates,
run_ids=msg.run_ids,
states=msg.states,
)
resp = dr_count
elif isinstance(msg, GetDagRunState):
dr_resp = self.client.dag_runs.get_state(msg.dag_id, msg.run_id)
resp = DagRunStateResult.from_api_response(dr_resp)
elif isinstance(msg, GetTICount):
resp = self.client.task_instances.get_count(
dag_id=msg.dag_id,
map_index=msg.map_index,
task_ids=msg.task_ids,
task_group_id=msg.task_group_id,
logical_dates=msg.logical_dates,
run_ids=msg.run_ids,
states=msg.states,
)
elif isinstance(msg, GetTaskStates):
run_id_task_state_map = self.client.task_instances.get_task_states(
dag_id=msg.dag_id,
map_index=msg.map_index,
task_ids=msg.task_ids,
task_group_id=msg.task_group_id,
logical_dates=msg.logical_dates,
run_ids=msg.run_ids,
)
if isinstance(run_id_task_state_map, TaskStatesResponse):
resp = TaskStatesResult.from_api_response(run_id_task_state_map)
else:
resp = run_id_task_state_map
else:
raise ValueError(f"Unknown message type {type(msg)}")
if resp:
self.send_msg(resp, **dump_opts)