in qa/src/main/java/org/apache/brooklyn/qa/longevity/Monitor.java [139:252]
/**
 * Starts periodic monitoring as configured by {@code prefs}.
 * <p>
 * Schedules a task on a new single-threaded {@link ScheduledExecutorService}. The task runs immediately and then
 * every {@code checkPeriodMs} milliseconds. Each run builds a {@link StatusRecorder.Record} from whichever checks
 * are enabled:
 * <ul>
 *   <li>{@code brooklynPid > 0}: whether the pid is running (per {@code MonitorUtils.isPidRunning}), plus memory
 *       usage reported by {@code MonitorUtils.getMemoryUsage};</li>
 *   <li>{@code webUrl != null}: whether the URL is up;</li>
 *   <li>{@code logFile != null}: log lines matching {@code logGrep} (minus the exclusions file), reduced via
 *       {@code getAdditions} against the lines seen on the previous run;</li>
 *   <li>{@code webProcessesRegex != null}: the running web-process pids (excluding this process). When
 *       {@code numWebProcesses} is set, the check also reports whether their count is in range. When
 *       {@code webProcessesCyclingPeriod > 0} as well, it checks that over the period the count reached both
 *       endpoints of that range.</li>
 * </ul>
 * The record is passed to {@code recorder} and {@code listener}. If any check produced a failure message,
 * {@code listener.onFailure} is called. When {@code abortOnError} is set, the JVM then exits with status 1.
 * <p>
 * The executor is neither retained nor shut down by this method; it returns as soon as the task is scheduled.
 *
 * @throws IOException if the log-grep exclusions file cannot be read
 */
private void start() throws IOException {
    LOG.info("Monitoring: "+prefs);
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // Log lines seen on the previous run, used to report only lines that are new since then.
    final AtomicReference<List<String>> previousLogLines = new AtomicReference<List<String>>(Collections.<String>emptyList());
    // History of web-process counts, windowed by the cycling period.
    // NOTE(review): minExpiredVals=1 presumably keeps one value older than the window, so the covered span can
    // exceed the period (required for the cycling check below to ever run) - confirm against TimeWindowedList.
    final TimeWindowedList<Integer> numWebProcessesHistory = new TimeWindowedList<Integer>(
            ImmutableMap.of("timePeriod", Duration.seconds(prefs.webProcessesCyclingPeriod), "minExpiredVals", 1));
    final Set<String> logGrepExclusions = ImmutableSet.copyOf(Files.readLines(prefs.logGrepExclusionsFile, Charsets.UTF_8));
    executor.scheduleAtFixedRate(new Runnable() {
        @Override public void run() {
            StatusRecorder.Record record = new StatusRecorder.Record();
            // Accumulates one newline-terminated line per failed check.
            StringBuilder failureMsg = new StringBuilder();
            try {
                if (prefs.brooklynPid > 0) {
                    boolean pidRunning = MonitorUtils.isPidRunning(prefs.brooklynPid, "java");
                    MonitorUtils.MemoryUsage memoryUsage = MonitorUtils.getMemoryUsage(prefs.brooklynPid, ".*brooklyn.*", 1000);
                    record.put("pidRunning", pidRunning);
                    record.put("totalMemoryBytes", memoryUsage.getTotalMemoryBytes());
                    record.put("totalMemoryInstances", memoryUsage.getTotalInstances());
                    record.put("instanceCounts", memoryUsage.getInstanceCounts());
                    if (!pidRunning) {
                        failureMsg.append("pid "+prefs.brooklynPid+" is not running"+"\n");
                    }
                }
                if (prefs.webUrl != null) {
                    boolean webUrlUp = MonitorUtils.isUrlUp(prefs.webUrl);
                    record.put("webUrlUp", webUrlUp);
                    if (!webUrlUp) {
                        failureMsg.append("web URL "+prefs.webUrl+" is not available"+"\n");
                    }
                }
                if (prefs.logFile != null) {
                    List<String> logLines = MonitorUtils.searchLog(prefs.logFile, prefs.logGrep, logGrepExclusions);
                    List<String> newLogLines = getAdditions(previousLogLines.get(), logLines);
                    previousLogLines.set(logLines);
                    record.put("logLines", newLogLines);
                    if (newLogLines.size() > 0) {
                        failureMsg.append("Log contains warnings/errors: "+newLogLines+"\n");
                    }
                }
                if (prefs.webProcessesRegex != null) {
                    List<Integer> pids = MonitorUtils.getRunningPids(prefs.webProcessesRegex, "--webProcesses");
                    // Exclude this monitor's own process; the cast selects remove(Object), not remove(int index).
                    pids.remove((Object)MonitorUtils.findOwnPid());
                    record.put("webPids", pids);
                    record.put("numWebPids", pids.size());
                    numWebProcessesHistory.add(pids.size());
                    if (prefs.numWebProcesses != null) {
                        boolean numWebPidsInRange = prefs.numWebProcesses.apply(pids.size());
                        record.put("numWebPidsInRange", numWebPidsInRange);
                        if (!numWebPidsInRange) {
                            failureMsg.append("num web processes out-of-range: pids="+pids+"; size="+pids.size()+"; expected="+prefs.numWebProcesses+"\n");
                        }
                        if (prefs.webProcessesCyclingPeriod > 0) {
                            List<TimestampedValue<Integer>> values = numWebProcessesHistory.getValues();
                            int numVals = values.size();
                            // Only index into the history when it is non-empty.
                            long startTime = (numVals > 0) ? values.get(0).getTimestamp() : 0;
                            long endTime = (numVals > 0) ? values.get(numVals-1).getTimestamp() : 0;
                            if (numVals > 0 && (endTime - startTime) > SECONDS.toMillis(prefs.webProcessesCyclingPeriod)) {
                                int min = -1;
                                int max = -1;
                                for (TimestampedValue<Integer> val : values) {
                                    min = (min < 0) ? val.getValue() : Math.min(val.getValue(), min);
                                    max = Math.max(val.getValue(), max);
                                }
                                record.put("minWebSizeInPeriod", min);
                                record.put("maxWebSizeInPeriod", max);
                                // Over a full period the count must have reached both ends of the expected range.
                                if (min > prefs.numWebProcesses.lowerEndpoint() || max < prefs.numWebProcesses.upperEndpoint()) {
                                    failureMsg.append("num web processes not increasing/decreasing correctly: " +
                                            "pids="+pids+"; size="+pids.size()+"; cyclePeriod="+prefs.webProcessesCyclingPeriod+
                                            "; expectedRange="+prefs.numWebProcesses+"; min="+min+"; max="+max+"; history="+values+"\n");
                                }
                            } else {
                                LOG.info("Insufficient vals in time-window to determine cycling behaviour over period ("+prefs.webProcessesCyclingPeriod+"secs): "+
                                        "numVals="+numVals+"; startTime="+startTime+"; endTime="+endTime+"; periodCovered="+(endTime-startTime)/1000);
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                LOG.error("Error during periodic checks", t);
                // NOTE(review): an exception escaping a scheduleAtFixedRate task suppresses all subsequent
                // executions, and the returned future is discarded, so monitoring silently stops after this.
                throw Throwables.propagate(t);
            }
            try {
                recorder.record(record);
                listener.onRecord(record);
                if (failureMsg.length() > 0) {
                    listener.onFailure(record, failureMsg.toString());
                    if (prefs.abortOnError) {
                        LOG.error("Aborting on error: "+failureMsg);
                        System.exit(1);
                    }
                }
            } catch (Throwable t) {
                LOG.warn("Error recording monitor info ("+record+")", t);
                throw Throwables.propagate(t);
            }
        }
    }, 0, checkPeriodMs, TimeUnit.MILLISECONDS);
}