public void run()

in frontend/server/src/main/java/com/amazonaws/ml/mms/metrics/MetricCollector.java [43:133]


    /**
     * Runs one metric collection pass.
     *
     * <p>Launches {@code mms/metrics/metric_collector.py} with the configured Python executable,
     * using the model server home as working directory. The worker map is handed to {@code
     * writeWorkerPids}, which writes to the child process' standard input. The child's standard
     * output is then read in two phases:
     *
     * <ol>
     *   <li>lines up to the first empty line are parsed with {@code Metric.parse}; successfully
     *       parsed metrics are logged and stored in the list registered with the {@code
     *       MetricManager};
     *   <li>remaining lines of the form {@code pid:value} update the memory figure of the worker
     *       registered under that pid.
     * </ol>
     *
     * <p>Anything the child writes to standard error is logged at error level from a separate
     * thread. An {@link IOException} raised while starting or talking to the child is logged and
     * not rethrown.
     */
    public void run() {
        try {
            // Collect System level Metrics
            String[] args = {
                configManager.getPythonExecutable(), "mms/metrics/metric_collector.py"
            };
            File workingDir = new File(configManager.getModelServerHome());
            String workingPath = workingDir.getAbsolutePath();
            // When the server home lives inside a site-packages directory it is not appended
            // to PYTHONPATH.
            boolean inSitePackages = workingPath.contains("site-package");

            // Never concatenate a null environment value: that would produce the literal
            // "PYTHONPATH=null" and put a bogus "null" entry on the child's module path.
            String pythonPath = System.getenv("PYTHONPATH");
            StringBuilder pythonEnv = new StringBuilder("PYTHONPATH=");
            if (pythonPath != null && !pythonPath.isEmpty()) {
                pythonEnv.append(pythonPath);
                if (!inSitePackages) {
                    pythonEnv.append(File.pathSeparatorChar).append(workingPath);
                }
            } else if (!inSitePackages) {
                pythonEnv.append(workingPath);
            }

            // sbin added for macs for python sysctl pythonpath
            String pathPrefix = "PATH=";
            StringBuilder path = new StringBuilder(pathPrefix);
            String systemPath = System.getenv("PATH");
            if (systemPath != null) {
                path.append(systemPath);
            }
            String osName = System.getProperty("os.name");
            if (osName.startsWith("Mac OS X")) {
                // Only emit a separator when there is something in front of it.
                if (path.length() > pathPrefix.length()) {
                    path.append(File.pathSeparatorChar);
                }
                path.append("/sbin/");
            }

            String[] env = {pythonEnv.toString(), path.toString()};
            final Process p = Runtime.getRuntime().exec(args, env, workingDir);

            ModelManager modelManager = ModelManager.getInstance();
            Map<Integer, WorkerThread> workerMap = modelManager.getWorkers();
            // Closing the stream afterwards signals end-of-input to the child.
            try (OutputStream os = p.getOutputStream()) {
                writeWorkerPids(workerMap, os);
            }

            // Drain stderr on its own thread so the child cannot block on a full stderr pipe
            // while this thread is reading stdout.
            new Thread(
                            () -> {
                                try {
                                    String error =
                                            IOUtils.toString(
                                                    p.getErrorStream(), StandardCharsets.UTF_8);
                                    if (!error.isEmpty()) {
                                        logger.error(error);
                                    }
                                } catch (IOException e) {
                                    logger.error("", e);
                                }
                            })
                    .start();

            MetricManager metricManager = MetricManager.getInstance();
            try (BufferedReader reader =
                    new BufferedReader(
                            new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
                List<Metric> metricsSystem = new ArrayList<>();
                metricManager.setMetrics(metricsSystem);

                // An empty line separates the system metrics from the per-process section.
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.isEmpty()) {
                        break;
                    }
                    Metric metric = Metric.parse(line);
                    if (metric == null) {
                        logger.warn("Parse metrics failed: " + line);
                    } else {
                        loggerMetrics.info("{}", metric);
                        metricsSystem.add(metric);
                    }
                }

                // Collect process level metrics
                while ((line = reader.readLine()) != null) {
                    String[] tokens = line.split(":");
                    if (tokens.length != 2) {
                        continue;
                    }
                    try {
                        Integer pid = Integer.valueOf(tokens[0]);
                        long memory = Long.parseLong(tokens[1]);
                        WorkerThread worker = workerMap.get(pid);
                        if (worker == null) {
                            // The reported pid is not (or no longer) in the worker map; skip it
                            // instead of failing the whole pass with a NullPointerException.
                            logger.warn("No worker registered for pid, ignoring metrics: " + line);
                            continue;
                        }
                        worker.setMemory(memory);
                    } catch (NumberFormatException e) {
                        logger.warn("Failed to parse memory utilization metrics: " + line);
                    }
                }
            }
        } catch (IOException e) {
            logger.error("", e);
        }
    }