def run()

in aws_advanced_python_wrapper/host_monitoring_plugin.py [0:0]


    def run(self):
        """Run the monitoring loop until the monitor is stopped or disposed.

        Each cycle of the loop:

        1. Drains the new-context queue. Inactive contexts are dropped,
           contexts whose ``active_monitoring_start_time_ns`` has been reached
           are moved to the active queue, and the rest are put back to be
           re-examined on the next cycle.
        2. If the active queue is empty, either sleeps for
           ``Monitor._INACTIVE_SLEEP_MS`` or, once the monitor has been unused
           for at least ``_monitor_disposal_time_ms``, leaves the loop.
        3. Otherwise performs a single host status check and applies its
           result to every active context. Contexts that remain active with an
           available host are re-queued for the next cycle.
        4. Sleeps for the smallest ``failure_detection_interval_ms`` among the
           re-queued contexts, minus the time the status check took.

        An ``InterruptedError`` ends the loop and is logged as a warning. Any
        other exception raised inside a cycle is logged at debug level and the
        loop carries on with the next cycle.

        On exit, whatever the reason, the monitor is released from its
        container, stopped, and the monitoring connection (if any) is closed.
        """
        try:
            self._is_stopped.clear()

            while not self.is_stopped:
                try:
                    current_time_ns = perf_counter_ns()
                    first_added_new_context = None

                    # Process new contexts
                    while (new_monitor_context := QueueUtils.get(self._new_contexts)) is not None:
                        if first_added_new_context == new_monitor_context:
                            # This context has already been processed.
                            # Add it back to the queue and process it in the next round.
                            self._new_contexts.put(new_monitor_context)
                            break

                        if not new_monitor_context.is_active:
                            # Discard inactive contexts
                            continue

                        if current_time_ns >= new_monitor_context.active_monitoring_start_time_ns:
                            # Submit the context for active monitoring
                            self._active_contexts.put(new_monitor_context)
                            continue

                        # The active monitoring start time has not been hit yet.
                        # Add the context back to the queue and check it later.
                        self._new_contexts.put(new_monitor_context)
                        if first_added_new_context is None:
                            # Remember the first re-queued context: seeing it again means
                            # every queued context has been visited once this cycle.
                            first_added_new_context = new_monitor_context

                    if self._active_contexts.empty():
                        idle_time_ns = perf_counter_ns() - self._context_last_used_ns
                        if idle_time_ns >= self._monitor_disposal_time_ms * 1_000_000:
                            # Unused for long enough: leave the loop. The finally clause
                            # below releases the monitor, so it is not released here too.
                            break

                        self.sleep(Monitor._INACTIVE_SLEEP_MS / 1000)
                        continue

                    status_check_start_time_ns = perf_counter_ns()
                    self._context_last_used_ns = status_check_start_time_ns
                    status = self._check_host_status(self._host_check_timeout_ms)
                    delay_ms = -1  # -1 means "no context was re-queued this cycle"
                    first_added_new_context = None

                    monitor_context: MonitoringContext
                    while (monitor_context := QueueUtils.get(self._active_contexts)) is not None:
                        with self._lock:
                            if first_added_new_context == monitor_context:
                                # This context has already been processed by this loop, so the
                                # whole queue has been visited once. Always exit here, even if
                                # the context has become inactive in the meantime; otherwise
                                # the remaining contexts would be processed again with the
                                # same status sample and the loop would lose its end marker.
                                if monitor_context.is_active:
                                    self._active_contexts.put(monitor_context)
                                break

                            if not monitor_context.is_active:
                                # Discard inactive contexts
                                continue

                            # Process the context
                            monitor_context.update_host_status(
                                self._host_info.url,
                                status_check_start_time_ns,
                                status_check_start_time_ns + status.elapsed_time_ns,
                                status.is_available)

                            # Re-check is_active after the update before deciding to re-queue.
                            if not monitor_context.is_active or monitor_context.is_host_unavailable():
                                continue

                            # The context is still active and the host is still available. Continue monitoring the context.
                            self._active_contexts.put(monitor_context)
                            if first_added_new_context is None:
                                first_added_new_context = monitor_context

                            # Track the shortest detection interval among re-queued contexts.
                            if delay_ms == -1 or delay_ms > monitor_context.failure_detection_interval_ms:
                                delay_ms = monitor_context.failure_detection_interval_ms

                    if delay_ms == -1:
                        delay_ms = Monitor._INACTIVE_SLEEP_MS
                    else:
                        # Subtract the time taken for the status check from the delay
                        delay_ms -= (status.elapsed_time_ns / 1_000_000)
                        if delay_ms <= 0:
                            delay_ms = Monitor._MIN_HOST_CHECK_TIMEOUT_MS
                        # Use this delay for all active contexts
                        self._host_check_timeout_ms = delay_ms

                    self.sleep(delay_ms / 1000)
                except InterruptedError:
                    # Let the outer handler end the loop.
                    raise
                except Exception as e:
                    logger.debug("Monitor.ExceptionInMonitorLoop", self._host_info.host)
                    logger.debug(e, exc_info=True)
        except InterruptedError:
            logger.warning("Monitor.InterruptedException", self._host_info.host)
        except Exception as e:
            logger.debug("Monitor.StoppingMonitorUnhandledException", self._host_info.host)
            logger.debug(e, exc_info=True)
        finally:
            # Single exit path: release, stop, then close the connection.
            # NOTE(review): assumes stop() needs to run only once per exit - confirm.
            self._monitor_container.release_monitor(self)
            self.stop()
            if self._monitoring_conn is not None:
                try:
                    self._monitoring_conn.close()
                except Exception:
                    # Best-effort close: a failure here must not mask the reason
                    # the monitor is shutting down.
                    pass