def _process_signal()

in images/airflow/2.9.2/python/mwaa/celery/task_monitor.py [0:0]


    def _process_signal(self, signal_type, signal_timestamp):
        """
        Apply one received signal to the monitor's flags, then resume or pause task consumption.

        Effect per signal type:
          * ACTIVATION clears ``waiting_for_activation``.
          * KILL sets ``marked_for_kill``; a repeated kill is logged and counted as ignored.
          * TERMINATION / RESUME set / clear ``marked_for_termination``, but only when the signal is
            strictly newer than the last termination/resume signal that was applied. Otherwise the
            signal is logged and counted as ignored.
          * Any other value is logged and counted as ignored.

        The ``mwaa.task_monitor.signal_ignored`` counter is incremented on every call, by 1 when the
        signal was ignored and by 0 otherwise.

        :param signal_type: Type of the signal received (compared against the SignalType members).
        :param signal_timestamp: Timestamp at which the signal was generated.
        :return: None
        """
        ignored_count = 0

        if signal_type == SignalType.ACTIVATION:
            # No "already activated" check on purpose: activation signals are sent redundantly by
            # design, to improve the odds that a newly created worker gets activated by the worker
            # enabler lambda.
            self.waiting_for_activation = False
        elif signal_type == SignalType.KILL:
            if self.marked_for_kill:
                logger.warning("Received kill signal but already marked for kill. Ignoring.")
                ignored_count = 1
            # Setting the flag again on a repeated kill leaves the state unchanged.
            self.marked_for_kill = True
        elif signal_type == SignalType.TERMINATION:
            last_applied = self.last_termination_or_resume_signal_timestamp
            if last_applied is not None and not (last_applied < signal_timestamp):
                # Same age or older than what was already applied: treat as stale.
                logger.warning("Received termination signal but older than the last termination/resume signal. Ignoring.")
                ignored_count = 1
            else:
                self.marked_for_termination = True
                self.last_termination_or_resume_signal_timestamp = signal_timestamp
                # Record (in UTC) when this termination was processed.
                self.last_termination_processing_time = datetime.now(tz=tz.tzutc())
        elif signal_type == SignalType.RESUME:
            last_applied = self.last_termination_or_resume_signal_timestamp
            if last_applied is not None and not (last_applied < signal_timestamp):
                # Same age or older than what was already applied: treat as stale.
                logger.warning("Received resume signal but older than the last termination/resume signal. Ignoring.")
                ignored_count = 1
            else:
                self.marked_for_termination = False
                self.last_termination_or_resume_signal_timestamp = signal_timestamp
                self.last_termination_processing_time = None
        else:
            logger.warning(f"Unknown signal type {signal_type}, ignoring.")
            ignored_count = 1

        self.stats.incr("mwaa.task_monitor.signal_ignored", ignored_count)

        # Work is consumed only when none of the three blocking flags is set.
        can_consume = (
            not self.waiting_for_activation
            and not self.marked_for_kill
            and not self.marked_for_termination
        )
        if can_consume:
            self.resume_task_consumption()
        else:
            self.pause_task_consumption()