in images/airflow/2.10.3/python/mwaa/celery/task_monitor.py [0:0]
def _process_signal(self, signal_type, signal_timestamp):
"""
This method is used to process a signal. It will update the state of work consumption accordingly.
:param signal_type: Type of the signal received.
:param signal_timestamp: Timestamp at which the signal was generated.
:return:
"""
signal_ignored = 0
match signal_type:
case SignalType.ACTIVATION:
# Skipping checking if activation is already present because activation signals are by
# design sent in redundant fashion to improve success of a newly created worker from getting
# activated by the worker enabler lambda.
self.waiting_for_activation = False
case SignalType.KILL:
if self.marked_for_kill:
logger.warning("Received kill signal but already marked for kill. Ignoring.")
signal_ignored = 1
self.marked_for_kill = True
case SignalType.TERMINATION:
if (self.last_termination_or_resume_signal_timestamp is None or
self.last_termination_or_resume_signal_timestamp < signal_timestamp):
self.marked_for_termination = True
self.last_termination_or_resume_signal_timestamp = signal_timestamp
self.last_termination_processing_time = datetime.now(tz=tz.tzutc())
else:
logger.warning("Received termination signal but older than the last termination/resume signal. Ignoring.")
signal_ignored = 1
case SignalType.RESUME:
if (self.last_termination_or_resume_signal_timestamp is None or
self.last_termination_or_resume_signal_timestamp < signal_timestamp):
self.marked_for_termination = False
self.last_termination_or_resume_signal_timestamp = signal_timestamp
self.last_termination_processing_time = None
else:
logger.warning("Received resume signal but older than the last termination/resume signal. Ignoring.")
signal_ignored = 1
case _:
logger.warning(f"Unknown signal type {signal_type}, ignoring.")
signal_ignored = 1
self.stats.incr(f"mwaa.task_monitor.signal_ignored", signal_ignored)
should_consume_work = not (self.waiting_for_activation or self.marked_for_kill or self.marked_for_termination)
self.resume_task_consumption() if should_consume_work else self.pause_task_consumption()