def _get_next_unprocessed_signal()

in images/airflow/2.9.2/python/mwaa/celery/task_monitor.py [0:0]


    def _get_next_unprocessed_signal(self) -> (str, SignalData):
        signal_search_start_timestamp = math.ceil((datetime.now(tz=tz.tzutc()) - SIGNAL_SEARCH_TIME_RANGE).timestamp())
        signal_filenames = os.listdir(MWAA_SIGNALS_DIRECTORY) if os.path.exists(MWAA_SIGNALS_DIRECTORY) else []
        sorted_filenames = sorted(signal_filenames)
        for signal_filename in sorted_filenames:
            # In case of issues, signals can arrive late and out of order. So, we scan all unprocessed signals in a search time range.
            # Only a handful of signals are expected for each worker, so this repeated processing should be very light.
            signal_file_path = os.path.join(MWAA_SIGNALS_DIRECTORY, signal_filename)
            file_timestamp = os.path.getctime(signal_file_path)
            if file_timestamp > signal_search_start_timestamp:
                try:
                    with open(signal_file_path, "r") as file_data:
                        signal_data = SignalData.from_json_string(file_data.read())
                        if signal_data and not signal_data.processed:
                            self.stats.incr(f"mwaa.task_monitor.signal_read_error", 0)
                            return signal_file_path, signal_data
                except Exception as e:
                    logger.error(f"File {signal_file_path} could not be read, signal will be ignored: {e}")
                    self.stats.incr(f"mwaa.task_monitor.signal_read_error", 1)
        return None, None