in aws_advanced_python_wrapper/host_monitoring_plugin.py [0:0]
def run(self):
    """Run the monitoring loop until the monitor is stopped or disposed of.

    Each pass of the loop does the following:

    1. Drains ``self._new_contexts``. Inactive contexts are dropped, contexts
       whose ``active_monitoring_start_time_ns`` has been reached are moved
       to ``self._active_contexts``, and the remaining ones are re-queued so
       that they are looked at again on a later pass.
    2. If there are no active contexts, either releases this monitor and
       leaves the loop (when it has not been used for at least
       ``self._monitor_disposal_time_ms``) or sleeps for
       ``Monitor._INACTIVE_SLEEP_MS`` and starts the next pass.
    3. Otherwise performs a single host status check and reports its result
       to every active context, re-queuing the contexts that are still
       active and whose host is not reported unavailable.
    4. Sleeps for the smallest ``failure_detection_interval_ms`` among the
       re-queued contexts minus the time spent on the status check. A
       non-positive result is replaced by
       ``Monitor._MIN_HOST_CHECK_TIMEOUT_MS``. The same value becomes the
       timeout of the next status check.

    Any exception raised during a pass is logged and the loop continues,
    except ``InterruptedError``, which ends the loop. On the way out the
    monitor is released from its container, stopped, and its monitoring
    connection (if there is one) is closed on a best-effort basis.
    """
    try:
        self._is_stopped.clear()
        while not self.is_stopped:
            try:
                current_time_ns = perf_counter_ns()
                # Remembers the first context that was put back on the queue so
                # that the drain loop below can tell when it has wrapped around.
                first_added_new_context = None
                # Process new contexts
                while (new_monitor_context := QueueUtils.get(self._new_contexts)) is not None:
                    if first_added_new_context == new_monitor_context:
                        # This context has already been processed.
                        # Add it back to the queue and process it in the next round.
                        self._new_contexts.put(new_monitor_context)
                        break
                    if not new_monitor_context.is_active:
                        # Discard inactive contexts
                        continue
                    if current_time_ns >= new_monitor_context.active_monitoring_start_time_ns:
                        # Submit the context for active monitoring
                        self._active_contexts.put(new_monitor_context)
                        continue
                    # The active monitoring start time has not been hit yet.
                    # Add the context back to the queue and check it later.
                    self._new_contexts.put(new_monitor_context)
                    if first_added_new_context is None:
                        first_added_new_context = new_monitor_context

                if self._active_contexts.empty():
                    # Nothing to monitor: dispose of this monitor once it has been
                    # idle long enough, otherwise wait and look for new contexts again.
                    if (perf_counter_ns() - self._context_last_used_ns) >= self._monitor_disposal_time_ms * 1_000_000:
                        # NOTE(review): release_monitor() is called again in the
                        # ``finally`` clause below - confirm it tolerates a second call.
                        self._monitor_container.release_monitor(self)
                        break
                    self.sleep(Monitor._INACTIVE_SLEEP_MS / 1000)
                    continue

                status_check_start_time_ns = perf_counter_ns()
                self._context_last_used_ns = status_check_start_time_ns
                # One status check per pass; its result is shared by all active contexts.
                status = self._check_host_status(self._host_check_timeout_ms)
                delay_ms = -1  # -1 means "no context was re-queued during this pass"
                first_added_new_context = None
                monitor_context: MonitoringContext
                while (monitor_context := QueueUtils.get(self._active_contexts)) is not None:
                    # ``continue`` and ``break`` below leave the ``with`` block,
                    # which releases the lock before the loop proceeds.
                    with self._lock:
                        if not monitor_context.is_active:
                            # Discard inactive contexts
                            continue
                        if first_added_new_context == monitor_context:
                            # This context has already been processed by this loop.
                            # Add it back to the queue and exit the loop.
                            self._active_contexts.put(monitor_context)
                            break
                        # Process the context
                        monitor_context.update_host_status(
                            self._host_info.url,
                            status_check_start_time_ns,
                            status_check_start_time_ns + status.elapsed_time_ns,
                            status.is_available)
                        if not monitor_context.is_active or monitor_context.is_host_unavailable():
                            continue
                        # The context is still active and the host is still available. Continue monitoring the context.
                        self._active_contexts.put(monitor_context)
                        if first_added_new_context is None:
                            first_added_new_context = monitor_context
                        # Track the smallest detection interval among the re-queued contexts.
                        if delay_ms == -1 or delay_ms > monitor_context.failure_detection_interval_ms:
                            delay_ms = monitor_context.failure_detection_interval_ms

                if delay_ms == -1:
                    delay_ms = Monitor._INACTIVE_SLEEP_MS
                else:
                    # Subtract the time taken for the status check from the delay
                    delay_ms -= (status.elapsed_time_ns / 1_000_000)
                    if delay_ms <= 0:
                        delay_ms = Monitor._MIN_HOST_CHECK_TIMEOUT_MS
                    # Use this delay for all active contexts
                    self._host_check_timeout_ms = delay_ms
                self.sleep(delay_ms / 1000)
            except InterruptedError:
                # Let the outer handler end the loop; a bare ``raise`` keeps the
                # original traceback intact.
                raise
            except Exception as e:
                # Any other failure is logged and the loop moves on to the next pass.
                logger.debug("Monitor.ExceptionInMonitorLoop", self._host_info.host)
                logger.debug(e, exc_info=True)
    except InterruptedError:
        logger.warning("Monitor.InterruptedException", self._host_info.host)
    except Exception as e:
        logger.debug("Monitor.StoppingMonitorUnhandledException", self._host_info.host)
        logger.debug(e, exc_info=True)
    finally:
        self._monitor_container.release_monitor(self)
        self.stop()
        if self._monitoring_conn is not None:
            try:
                self._monitoring_conn.close()
            except Exception:
                # Best-effort close: a failure here must not prevent shutdown.
                pass
        # NOTE(review): stop() was already called above; presumably it is
        # idempotent - confirm whether this second call is still needed.
        self.stop()