in frontend/server/src/main/java/com/amazonaws/ml/mms/metrics/MetricCollector.java [43:133]
public void run() {
try {
// Collect System level Metrics
String[] args = new String[2];
args[0] = configManager.getPythonExecutable();
args[1] = "mms/metrics/metric_collector.py";
File workingDir = new File(configManager.getModelServerHome());
String pythonPath = System.getenv("PYTHONPATH");
String pythonEnv;
if ((pythonPath == null || pythonPath.isEmpty())
&& (!workingDir.getAbsolutePath().contains("site-package"))) {
pythonEnv = "PYTHONPATH=" + workingDir.getAbsolutePath();
} else {
pythonEnv = "PYTHONPATH=" + pythonPath;
if (!workingDir.getAbsolutePath().contains("site-package")) {
pythonEnv += File.pathSeparatorChar + workingDir.getAbsolutePath(); // NOPMD
}
}
// sbin added for macs for python sysctl pythonpath
StringBuilder path = new StringBuilder();
path.append("PATH=").append(System.getenv("PATH"));
String osName = System.getProperty("os.name");
if (osName.startsWith("Mac OS X")) {
path.append(File.pathSeparatorChar).append("/sbin/");
}
String[] env = {pythonEnv, path.toString()};
final Process p = Runtime.getRuntime().exec(args, env, workingDir);
ModelManager modelManager = ModelManager.getInstance();
Map<Integer, WorkerThread> workerMap = modelManager.getWorkers();
try (OutputStream os = p.getOutputStream()) {
writeWorkerPids(workerMap, os);
}
new Thread(
() -> {
try {
String error =
IOUtils.toString(
p.getErrorStream(), StandardCharsets.UTF_8);
if (!error.isEmpty()) {
logger.error(error);
}
} catch (IOException e) {
logger.error("", e);
}
})
.start();
MetricManager metricManager = MetricManager.getInstance();
try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
List<Metric> metricsSystem = new ArrayList<>();
metricManager.setMetrics(metricsSystem);
String line;
while ((line = reader.readLine()) != null) {
if (line.isEmpty()) {
break;
}
Metric metric = Metric.parse(line);
if (metric == null) {
logger.warn("Parse metrics failed: " + line);
} else {
loggerMetrics.info("{}", metric);
metricsSystem.add(metric);
}
}
// Collect process level metrics
while ((line = reader.readLine()) != null) {
String[] tokens = line.split(":");
if (tokens.length != 2) {
continue;
}
try {
Integer pid = Integer.valueOf(tokens[0]);
WorkerThread worker = workerMap.get(pid);
worker.setMemory(Long.parseLong(tokens[1]));
} catch (NumberFormatException e) {
logger.warn("Failed to parse memory utilization metrics: " + line);
continue;
}
}
}
} catch (IOException e) {
logger.error("", e);
}
}