private void start()

in brooklyn-library/qa/src/main/java/org/apache/brooklyn/qa/longevity/Monitor.java [139:252]


    /**
     * Starts the periodic monitoring task.
     * <p>
     * A new single-threaded scheduled executor runs the checks at a fixed rate
     * (initial delay 0, period {@code checkPeriodMs} milliseconds). Each run performs
     * whichever checks are enabled in {@code prefs} (brooklyn pid, web URL, log grep,
     * web-process count and cycling), stores the results in a {@link StatusRecorder.Record},
     * passes the record to {@code recorder} and {@code listener}, and reports any
     * accumulated failure message via {@code listener.onFailure}. If
     * {@code prefs.abortOnError} is set, a failure terminates the JVM with exit code 1.
     * <p>
     * The executor is neither retained nor shut down by this method.
     *
     * @throws IOException if the setup performed before scheduling fails, for example
     *         when reading {@code prefs.logGrepExclusionsFile}
     */
    private void start() throws IOException {
        LOG.info("Monitoring: "+prefs);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // State carried across runs of the periodic task.
        final AtomicReference<List<String>> previousLogLines = new AtomicReference<List<String>>(Collections.<String>emptyList());
        final TimeWindowedList<Integer> numWebProcessesHistory = new TimeWindowedList<Integer>(
                ImmutableMap.of("timePeriod", Duration.seconds(prefs.webProcessesCyclingPeriod), "minExpiredVals", 1));
        final Set<String> logGrepExclusions = ImmutableSet.copyOf(Files.readLines(prefs.logGrepExclusionsFile, Charsets.UTF_8));
        
        executor.scheduleAtFixedRate(new Runnable() {
                @Override public void run() {
                    StatusRecorder.Record record = new StatusRecorder.Record();
                    // Each failed check appends one newline-terminated line here.
                    StringBuilder failureMsg = new StringBuilder();
                    try {
                        // Check 1: is the brooklyn process alive, and what is its memory usage?
                        if (prefs.brooklynPid > 0) {
                            boolean pidRunning = MonitorUtils.isPidRunning(prefs.brooklynPid, "java");
                            MonitorUtils.MemoryUsage memoryUsage = MonitorUtils.getMemoryUsage(prefs.brooklynPid, ".*brooklyn.*", 1000);
                            record.put("pidRunning", pidRunning);
                            record.put("totalMemoryBytes", memoryUsage.getTotalMemoryBytes());
                            record.put("totalMemoryInstances", memoryUsage.getTotalInstances());
                            record.put("instanceCounts", memoryUsage.getInstanceCounts());
                            
                            if (!pidRunning) {
                                failureMsg.append("pid "+prefs.brooklynPid+" is not running"+"\n");
                            }
                        }
                        // Check 2: is the web URL reachable?
                        if (prefs.webUrl != null) {
                            boolean webUrlUp = MonitorUtils.isUrlUp(prefs.webUrl);
                            record.put("webUrlUp", webUrlUp);
                            
                            if (!webUrlUp) {
                                failureMsg.append("web URL "+prefs.webUrl+" is not available"+"\n");
                            }
                        }
                        // Check 3: report only log matches that were not present in the previous run.
                        if (prefs.logFile != null) {
                            List<String> logLines = MonitorUtils.searchLog(prefs.logFile, prefs.logGrep, logGrepExclusions);
                            List<String> newLogLines = getAdditions(previousLogLines.get(), logLines);
                            previousLogLines.set(logLines);
                            record.put("logLines", newLogLines);
                            
                            if (!newLogLines.isEmpty()) {
                                failureMsg.append("Log contains warnings/errors: "+newLogLines+"\n");
                            }
                        }
                        // Check 4: number of web processes, and whether that number cycles over time.
                        if (prefs.webProcessesRegex != null) {
                            List<Integer> pids = MonitorUtils.getRunningPids(prefs.webProcessesRegex, "--webProcesses");
                            // The cast selects List.remove(Object) (remove by value) rather than remove(int index).
                            pids.remove((Object)MonitorUtils.findOwnPid());
                            
                            record.put("webPids", pids);
                            record.put("numWebPids", pids.size());
                            numWebProcessesHistory.add(pids.size());
                            
                            if (prefs.numWebProcesses != null) {
                                boolean numWebPidsInRange = prefs.numWebProcesses.apply(pids.size());
                                record.put("numWebPidsInRange", numWebPidsInRange);
                                
                                if (!numWebPidsInRange) {
                                    failureMsg.append("num web processes out-of-range: pids="+pids+"; size="+pids.size()+"; expected="+prefs.numWebProcesses+"\n");
                                }
                            
                                if (prefs.webProcessesCyclingPeriod > 0) {
                                    List<TimestampedValue<Integer>> values = numWebProcessesHistory.getValues();
                                    // Read the timestamps only when the window is non-empty, so an empty
                                    // history falls through to the "insufficient vals" branch instead of
                                    // throwing IndexOutOfBoundsException.
                                    int numVals = values.size();
                                    long startTime = (numVals > 0) ? values.get(0).getTimestamp() : 0;
                                    long endTime = (numVals > 0) ? values.get(numVals-1).getTimestamp() : 0;
                                    long valuesTimeRange = endTime - startTime;
                                    if (numVals > 0 && valuesTimeRange > SECONDS.toMillis(prefs.webProcessesCyclingPeriod)) {
                                        // -1 marks "no value seen yet" for both min and max.
                                        int min = -1;
                                        int max = -1;
                                        for (TimestampedValue<Integer> val : values) {
                                            min = (min < 0) ? val.getValue() : Math.min(val.getValue(), min);
                                            max = Math.max(val.getValue(), max);
                                        }
                                        record.put("minWebSizeInPeriod", min);
                                        record.put("maxWebSizeInPeriod", max);
                                        
                                        // Over a full period the count must have touched both ends of the expected range.
                                        if (min > prefs.numWebProcesses.lowerEndpoint() || max < prefs.numWebProcesses.upperEndpoint()) {
                                            failureMsg.append("num web processes not increasing/decreasing correctly: " +
                                                    "pids="+pids+"; size="+pids.size()+"; cyclePeriod="+prefs.webProcessesCyclingPeriod+
                                                    "; expectedRange="+prefs.numWebProcesses+"; min="+min+"; max="+max+"; history="+values+"\n");
                                        }
                                    } else {
                                        LOG.info("Insufficient vals in time-window to determine cycling behaviour over period ("+prefs.webProcessesCyclingPeriod+"secs): "+
                                                "numVals="+numVals+"; startTime="+startTime+"; endTime="+endTime+"; periodCovered="+(endTime-startTime)/1000);
                                    }
                                }
                            }
                        }
                        
                    } catch (Throwable t) {
                        LOG.error("Error during periodic checks", t);
                        // NOTE: an exception escaping a task scheduled with scheduleAtFixedRate
                        // suppresses all subsequent executions, so this ends the monitoring.
                        throw Throwables.propagate(t);
                    }
                    
                    try {
                        recorder.record(record);
                        listener.onRecord(record);
                        
                        if (failureMsg.length() > 0) {
                            listener.onFailure(record, failureMsg.toString());
                            
                            if (prefs.abortOnError) {
                                LOG.error("Aborting on error: "+failureMsg);
                                System.exit(1);
                            }
                        }
                        
                    } catch (Throwable t) {
                        LOG.warn("Error recording monitor info ("+record+")", t);
                        // NOTE: as above, rethrowing here stops any further scheduled runs.
                        throw Throwables.propagate(t);
                    }
                }
            }, 0, checkPeriodMs, TimeUnit.MILLISECONDS);
    }