in streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java [115:156]
public void run() {
LOGGER.info("BroadcastMonitorThread running");
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(configuration.getMonitoringBroadcastIntervalMs());
while (keepRunning) {
try {
List<String> messages = new ArrayList<>();
Set<ObjectName> beans = server.queryNames(null, null);
for (ObjectName name : beans) {
String item = objectMapper.writeValueAsString(name);
Broadcast broadcast = null;
if (name.getKeyPropertyList().get("type") != null) {
if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) {
broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class);
} else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) {
broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class);
} else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) {
broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class);
} else if (name.getKeyPropertyList().get("type").equals("Memory")) {
broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class);
}
if (broadcast != null) {
messages.add(objectMapper.writeValueAsString(broadcast));
}
}
}
messagePersister.persistMessages(messages);
Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
} catch (InterruptedException ex) {
LOGGER.debug("Broadcast Monitor Interrupted!");
Thread.currentThread().interrupt();
this.keepRunning = false;
} catch (Exception ex) {
LOGGER.error("Exception: {}", ex);
this.keepRunning = false;
}
}
}