in frontend/server/src/main/java/org/pytorch/serve/wlm/WorkLoadManager.java [88:158]
/**
 * Reconciles the worker threads tracked for {@code model} with the model's configured minimum and
 * maximum worker counts.
 *
 * <p>If the minimum is zero, the model's entry is removed from {@code workers}. If the tracked
 * thread count is below the minimum, {@code addThreads} is called for the difference and is handed
 * {@code future} (presumably it completes the future once the workers are up — confirm in
 * addThreads). Otherwise every thread at index {@code >= maxWorkers} is shut down. Any of those
 * workers whose OS process is still alive afterwards is killed with the command from
 * {@code OSUtils.getKillCmd()}.
 *
 * <p>A snapshot is saved unless {@code isStartup} or {@code isCleanUp} is set, or the model is a
 * workflow model. No snapshot is saved when killing a surplus worker process fails.
 *
 * @param model the model whose workers should be adjusted
 * @param isStartup if true, no snapshot is saved (by its name, set during server startup)
 * @param isCleanUp if true, no snapshot is saved (by its name, set during cleanup)
 * @return a future holding an HTTP status code:
 *     <ul>
 *       <li>{@code HTTP_OK} on success;
 *       <li>{@code HTTP_INTERNAL_ERROR} if the kill command could not be started, or if waiting
 *           for it was interrupted;
 *       <li>{@code HTTP_CLIENT_TIMEOUT} if the kill command did not exit within the
 *           unregister-model timeout (seconds).
 *     </ul>
 *     When scaling up, completion is left to {@code addThreads}.
 */
public CompletableFuture<Integer> modelChanged(
        Model model, boolean isStartup, boolean isCleanUp) {
    // NOTE(review): this lock only serializes callers if getModelVersionName() returns the same
    // object on every call for a given model — confirm in Model.
    synchronized (model.getModelVersionName()) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        int minWorker = model.getMinWorkers();
        int maxWorker = model.getMaxWorkers();
        // Short-circuiting keeps isWorkflowModel() unevaluated when a flag already rules it out.
        boolean persistSnapshot = !isStartup && !isCleanUp && !model.isWorkflowModel();
        List<WorkerThread> threads;
        if (minWorker == 0) {
            threads = workers.remove(model.getModelVersionName());
            if (threads == null) {
                // No workers were registered for this model, so there is nothing to stop.
                future.complete(HttpURLConnection.HTTP_OK);
                if (persistSnapshot) {
                    SnapshotManager.getInstance().saveSnapshot();
                }
                return future;
            }
            // NOTE(review): the list has now been removed from `workers`. If maxWorker > 0 here,
            // the threads at index < maxWorker keep running but are no longer tracked. Confirm
            // that callers never pass min == 0 together with max > 0.
        } else {
            threads =
                    workers.computeIfAbsent(
                            model.getModelVersionName(), k -> new ArrayList<>());
        }

        int currentWorkers = threads.size();
        boolean scalingUp = currentWorkers < minWorker;
        if (scalingUp) {
            addThreads(threads, model, minWorker - currentWorkers, future);
        } else {
            // Remove surplus workers from the end, so the lower indices stay valid.
            for (int i = currentWorkers - 1; i >= maxWorker; --i) {
                WorkerThread thread = threads.remove(i);
                WorkerLifeCycle lifecycle = thread.getLifeCycle();
                thread.shutdown();
                // thread.shutdown() -> lifecycle.exit() may leave the process reference null
                // (per destroyForcibly docs), so check it for null and liveness before killing.
                Process workerProcess = lifecycle.getProcess();
                if (workerProcess != null && workerProcess.isAlive()) {
                    String cmd = String.format(OSUtils.getKillCmd(), workerProcess.pid());
                    Process workerKillProcess;
                    try {
                        workerKillProcess = Runtime.getRuntime().exec(cmd, null, null);
                    } catch (IOException e) {
                        logger.warn("Failed to run worker kill command: " + cmd, e);
                        future.complete(HttpURLConnection.HTTP_INTERNAL_ERROR);
                        return future;
                    }
                    boolean workerDestroyed;
                    try {
                        workerDestroyed =
                                workerKillProcess.waitFor(
                                        configManager.getUnregisterModelTimeout(),
                                        TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so code further up the stack still sees it.
                        Thread.currentThread().interrupt();
                        logger.warn(
                                "WorkerThread interrupted during waitFor, possible async resource cleanup.",
                                e);
                        future.complete(HttpURLConnection.HTTP_INTERNAL_ERROR);
                        return future;
                    }
                    if (!workerDestroyed) {
                        // Don't leave the stuck kill command running after giving up on it.
                        workerKillProcess.destroyForcibly();
                        logger.warn(
                                "WorkerThread timed out while cleaning, please resend request.");
                        future.complete(HttpURLConnection.HTTP_CLIENT_TIMEOUT);
                        return future;
                    }
                }
            }
        }

        // Scale-down: save, then complete. Scale-up: addThreads, then save. Same order as before.
        if (persistSnapshot) {
            SnapshotManager.getInstance().saveSnapshot();
        }
        if (!scalingUp) {
            future.complete(HttpURLConnection.HTTP_OK);
        }
        return future;
    }
}