def _handle_request()

in airflow-core/src/airflow/jobs/triggerer_job_runner.py [0:0]


    def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger) -> None:  # type: ignore[override]
        from airflow.sdk.api.datamodels._generated import (
            ConnectionResponse,
            TaskStatesResponse,
            VariableResponse,
            XComResponse,
        )

        resp: BaseModel | None = None
        dump_opts = {}

        if isinstance(msg, messages.TriggerStateChanges):
            if msg.events:
                self.events.extend(msg.events)
            if msg.failures:
                self.failed_triggers.extend(msg.failures)
            for id in msg.finished or ():
                self.running_triggers.discard(id)
                self.cancelling_triggers.discard(id)
                # Remove logger from the cache, and since structlog doesn't have an explicit close method, we
                # only need to remove the last reference to it to close the open FH
                if factory := self.logger_cache.pop(id, None):
                    factory.upload_to_remote()

            response = messages.TriggerStateSync(
                to_create=[],
                to_cancel=self.cancelling_triggers,
            )

            # Pull out of these deques in a thread-safe manner
            while self.creating_triggers:
                workload = self.creating_triggers.popleft()
                response.to_create.append(workload)
            self.running_triggers.update(m.id for m in response.to_create)
            resp = response

        elif isinstance(msg, GetConnection):
            conn = self.client.connections.get(msg.conn_id)
            if isinstance(conn, ConnectionResponse):
                conn_result = ConnectionResult.from_conn_response(conn)
                resp = conn_result
                dump_opts = {"exclude_unset": True}
            else:
                resp = conn
        elif isinstance(msg, GetVariable):
            var = self.client.variables.get(msg.key)
            if isinstance(var, VariableResponse):
                var_result = VariableResult.from_variable_response(var)
                resp = var_result
                dump_opts = {"exclude_unset": True}
            else:
                resp = var
        elif isinstance(msg, GetXCom):
            xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index)
            if isinstance(xcom, XComResponse):
                xcom_result = XComResult.from_xcom_response(xcom)
                resp = xcom_result
                dump_opts = {"exclude_unset": True}
            else:
                resp = xcom
        elif isinstance(msg, GetDRCount):
            dr_count = self.client.dag_runs.get_count(
                dag_id=msg.dag_id,
                logical_dates=msg.logical_dates,
                run_ids=msg.run_ids,
                states=msg.states,
            )
            resp = dr_count
        elif isinstance(msg, GetDagRunState):
            dr_resp = self.client.dag_runs.get_state(msg.dag_id, msg.run_id)
            resp = DagRunStateResult.from_api_response(dr_resp)

        elif isinstance(msg, GetTICount):
            resp = self.client.task_instances.get_count(
                dag_id=msg.dag_id,
                map_index=msg.map_index,
                task_ids=msg.task_ids,
                task_group_id=msg.task_group_id,
                logical_dates=msg.logical_dates,
                run_ids=msg.run_ids,
                states=msg.states,
            )

        elif isinstance(msg, GetTaskStates):
            run_id_task_state_map = self.client.task_instances.get_task_states(
                dag_id=msg.dag_id,
                map_index=msg.map_index,
                task_ids=msg.task_ids,
                task_group_id=msg.task_group_id,
                logical_dates=msg.logical_dates,
                run_ids=msg.run_ids,
            )
            if isinstance(run_id_task_state_map, TaskStatesResponse):
                resp = TaskStatesResult.from_api_response(run_id_task_state_map)
            else:
                resp = run_id_task_state_map
        else:
            raise ValueError(f"Unknown message type {type(msg)}")

        if resp:
            self.send_msg(resp, **dump_opts)