in images/airflow/2.10.1/python/mwaa/celery/task_monitor.py [0:0]
def _get_next_unprocessed_signal(self) -> (str, SignalData):
signal_search_start_timestamp = math.ceil((datetime.now(tz=tz.tzutc()) - SIGNAL_SEARCH_TIME_RANGE).timestamp())
signal_filenames = os.listdir(MWAA_SIGNALS_DIRECTORY) if os.path.exists(MWAA_SIGNALS_DIRECTORY) else []
sorted_filenames = sorted(signal_filenames)
for signal_filename in sorted_filenames:
# In case of issues, signals can arrive late and out of order. So, we scan all unprocessed signals in a search time range.
# Only a handful of signals are expected for each worker, so this repeated processing should be very light.
signal_file_path = os.path.join(MWAA_SIGNALS_DIRECTORY, signal_filename)
file_timestamp = os.path.getctime(signal_file_path)
if file_timestamp > signal_search_start_timestamp:
try:
with open(signal_file_path, "r") as file_data:
signal_data = SignalData.from_json_string(file_data.read())
if signal_data and not signal_data.processed:
self.stats.incr(f"mwaa.task_monitor.signal_read_error", 0)
return signal_file_path, signal_data
except Exception as e:
logger.error(f"File {signal_file_path} could not be read, signal will be ignored: {e}")
self.stats.incr(f"mwaa.task_monitor.signal_read_error", 1)
return None, None