public void run()

in streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java [115:156]


  public void run() {
    LOGGER.info("BroadcastMonitorThread running");
    Preconditions.checkNotNull(configuration);
    Preconditions.checkNotNull(configuration.getMonitoringBroadcastIntervalMs());
    while (keepRunning) {
      try {
        List<String> messages = new ArrayList<>();
        Set<ObjectName> beans = server.queryNames(null, null);

        for (ObjectName name : beans) {
          String item = objectMapper.writeValueAsString(name);
          Broadcast broadcast = null;

          if (name.getKeyPropertyList().get("type") != null) {
            if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) {
              broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class);
            } else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) {
              broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class);
            } else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) {
              broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class);
            } else if (name.getKeyPropertyList().get("type").equals("Memory")) {
              broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class);
            }

            if (broadcast != null) {
              messages.add(objectMapper.writeValueAsString(broadcast));
            }
          }
        }

        messagePersister.persistMessages(messages);
        Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
      } catch (InterruptedException ex) {
        LOGGER.debug("Broadcast Monitor Interrupted!");
        Thread.currentThread().interrupt();
        this.keepRunning = false;
      } catch (Exception ex) {
        LOGGER.error("Exception: {}", ex);
        this.keepRunning = false;
      }
    }
  }