in collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java [81:166]
public void run() {
    // Runs every configured property whose key starts with "command." as a system
    // command, in parallel on a fixed-size thread pool, and posts one event per
    // command that completed successfully. Each event carries the timestamp, the
    // Karaf instance name, the local host address/name and the command output
    // (stored under the property key, as a number when it parses as one).
    // Does nothing when no configuration properties are available.
    if (properties == null) {
        return;
    }

    final String karafName = System.getProperty("karaf.name");
    final String topic = this.topic;

    // Host identification is best effort: events are still produced (with null
    // host fields) when the local host cannot be resolved.
    String resolvedAddress = null;
    String resolvedName = null;
    try {
        InetAddress localHost = InetAddress.getLocalHost();
        resolvedAddress = localHost.getHostAddress();
        resolvedName = localHost.getHostName();
    } catch (Exception e) {
        LOGGER.debug("Unable to resolve local host address/name", e);
    }
    final String hostAddress = resolvedAddress;
    final String hostName = resolvedName;

    // Build one task per "command.*" property; each task returns its Event, or
    // null when the command failed.
    Collection<Callable<Object>> callables = new ArrayList<>();
    Enumeration<String> keys = properties.keys();
    while (keys.hasMoreElements()) {
        String key = keys.nextElement();
        if (!key.startsWith("command.")) {
            continue;
        }
        callables.add(() -> {
            Event event = null;
            try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
                String command = (String) properties.get(key);
                LOGGER.debug("Executing {} ({})", command, key);
                CommandLine cmdLine = CommandLine.parse(command);
                DefaultExecutor executor = new DefaultExecutor();
                PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
                executor.setStreamHandler(streamHandler);

                HashMap<String, Object> data = new HashMap<>();
                data.put("timestamp", System.currentTimeMillis());
                data.put("type", "system");
                data.put("karafName", karafName);
                data.put("hostAddress", hostAddress);
                data.put("hostName", hostName);

                executor.execute(cmdLine);
                outputStream.flush();

                // trim() also removes any trailing line break of the command output
                String output = outputStream.toString().trim();

                // Store the output as a number when possible (Double when it
                // contains a '.', Integer otherwise), else as the raw string.
                try {
                    if (output.contains(".")) {
                        data.put(key, Double.parseDouble(output));
                    } else {
                        data.put(key, Integer.parseInt(output));
                    }
                } catch (NumberFormatException e) {
                    data.put(key, output);
                }

                streamHandler.stop();
                event = new Event(topic + key.replace(".", "_"), data);
            } catch (Exception e) {
                // A failing command must not prevent the other commands from reporting
                LOGGER.warn("Command {} execution failed", key, e);
            }
            return event;
        });
    }

    ExecutorService executorService = Executors.newFixedThreadPool(this.threadNumber);
    try {
        LOGGER.debug("Start invoking system commands...");
        // invokeAll blocks until every task has completed
        List<Future<Object>> results = executorService.invokeAll(callables);
        for (Future<Object> result : results) {
            try {
                Event event = Event.class.cast(result.get());
                if (event != null) {
                    dispatcher.postEvent(event);
                }
            } catch (ExecutionException e) {
                LOGGER.warn("Thread executor for the collector system failed", e);
            } catch (InterruptedException e) {
                LOGGER.warn("Thread executor for the collector system failed", e);
                // Preserve the interruption for the calling thread
                Thread.currentThread().interrupt();
            }
        }
    } catch (InterruptedException e) {
        LOGGER.warn("Thread executor for the collector system failed", e);
        // Preserve the interruption for the calling thread
        Thread.currentThread().interrupt();
    } finally {
        // Always release the pool threads, even when posting an event throws
        executorService.shutdown();
    }
    LOGGER.debug("Invoking system commands done");
}