in frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkerThread.java [202:266]
/**
 * Runs this worker until it finishes or fails, then tears it down.
 *
 * <p>A regular worker calls {@code connect()} and then {@code runWorker()}. A server thread
 * instead publishes {@code port} on the model, starts the backend server on that port, marks
 * itself {@code WORKER_MODEL_LOADED}, and blocks until the backend process exits.
 *
 * <p>Whatever the outcome, the {@code finally} block disconnects the backend channel (when one
 * exists), clears the thread reference, reports failure to a pending request (regular workers)
 * or resets the port and force-kills a still-running backend process (server threads), then
 * marks the worker {@code WORKER_STOPPED}, calls {@code lifeCycle.exit()} and {@code retry()}.
 */
public void run() {
    Process process = null;
    Thread thread = Thread.currentThread();
    thread.setName(getWorkerName());
    currentThread.set(thread);
    // Status reported to a pending request if the worker dies; refined below for OOM cases.
    HttpResponseStatus status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    try {
        if (!serverThread) {
            connect();
            runWorker();
        } else {
            // TODO: Move this logic to a separate ServerThread class.
            // This is the server thread; it must not return while the backend process is alive.
            model.setPort(port);
            lifeCycle.startBackendServer(port);
            setState(WorkerState.WORKER_MODEL_LOADED, HttpResponseStatus.OK);
            process = lifeCycle.getProcess();
            process.waitFor();
        }
    } catch (InterruptedException e) {
        // When the worker has been scaled down, interruption is the expected shutdown path.
        if (state == WorkerState.WORKER_SCALED_DOWN) {
            logger.debug("Shutting down the thread .. Scaling down.");
        } else {
            logger.debug(
                    "Backend worker monitoring thread interrupted or backend worker process died.",
                    e);
        }
    } catch (WorkerInitializationException e) {
        logger.error("Backend worker error", e);
    } catch (OutOfMemoryError oom) {
        logger.error("Out of memory error when creating workers", oom);
        status = HttpResponseStatus.INSUFFICIENT_STORAGE;
    } catch (Throwable t) {
        logger.warn("Backend worker thread exception.", t);
    } finally {
        // connect() is never called on the server-thread path, and it may throw before a
        // channel is established, so the channel can be null here. An NPE in this finally
        // block would skip all of the cleanup below (state update, lifeCycle.exit(), retry()).
        if (backendChannel != null) {
            backendChannel.disconnect();
        }
        // WorkerThread is running in thread pool, the thread will be assigned to next
        // Runnable once this worker is finished. If currentThread keep holding the reference
        // of the thread, currentThread.interrupt() might kill next worker.
        currentThread.set(null);
        Integer exitValue = lifeCycle.getExitValue();
        // NOTE(review): 137 = 128 + 9, conventionally a SIGKILL exit (often the OOM killer);
        // treated like the OutOfMemoryError case above — confirm against the backend.
        if (exitValue != null && exitValue == 137) {
            status = HttpResponseStatus.INSUFFICIENT_STORAGE;
        }
        if (!serverThread && req != null) {
            aggregator.sendError(req, "Worker died.", status);
        } else if (serverThread) {
            model.setPort(-1);
            if (process != null && process.isAlive()) {
                process.destroyForcibly();
                try {
                    // Give the killed process a short grace period to exit.
                    process.waitFor(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Interrupt status is not restored here; the remaining cleanup still runs.
                    logger.warn(
                            "WorkerThread interrupted during waitFor, possible asynch resource cleanup.");
                }
            }
        }
        setState(WorkerState.WORKER_STOPPED, status);
        lifeCycle.exit();
        retry();
    }
}