public CompletableFuture modelChanged()

in frontend/server/src/main/java/org/pytorch/serve/wlm/WorkLoadManager.java [88:158]


    /**
     * Reconciles the worker threads registered for {@code model} with the model's configured
     * minimum and maximum worker counts.
     *
     * <p>When the model's minimum worker count is 0, its worker list is removed from {@code
     * workers}; if no list was registered the call succeeds immediately. Otherwise the list is
     * looked up (or created empty). If the list holds fewer workers than the minimum, the
     * shortfall is handed to {@code addThreads} together with the returned future; this method
     * does not complete the future itself on that path. If the list is at or above the minimum,
     * workers are removed from the tail of the list until at most the maximum remain. Each removed
     * worker is shut down and, if its process is still alive afterwards, the OS kill command is
     * run against its pid and awaited for up to {@code configManager.getUnregisterModelTimeout()}
     * seconds.
     *
     * <p>A snapshot is saved on the successful paths unless {@code isStartup} or {@code isCleanUp}
     * is set or the model is a workflow model. No snapshot is saved when the method returns early
     * because killing a worker process failed or timed out.
     *
     * @param model the model whose worker set should be adjusted
     * @param isStartup when {@code true}, no snapshot is saved
     * @param isCleanUp when {@code true}, no snapshot is saved
     * @return a future carrying an HTTP status code: {@code HTTP_OK} when scaling down (or having
     *     nothing to do) succeeded, {@code HTTP_INTERNAL_ERROR} when the kill command could not be
     *     started or the wait for it was interrupted, {@code HTTP_CLIENT_TIMEOUT} when the kill
     *     command did not finish within the timeout; when workers are being added the future is
     *     passed to {@code addThreads} and returned as-is
     */
    public CompletableFuture<Integer> modelChanged(
            Model model, boolean isStartup, boolean isCleanUp) {
        // NOTE(review): the monitor is whatever getModelVersionName() returns. Mutual exclusion
        // between callers only holds if that getter returns the same instance for a given model
        // on every call -- confirm against Model.
        synchronized (model.getModelVersionName()) {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            int minWorker = model.getMinWorkers();
            int maxWorker = model.getMaxWorkers();
            List<WorkerThread> threads;
            if (minWorker == 0) {
                threads = workers.remove(model.getModelVersionName());
                if (threads == null) {
                    // Nothing registered for this model: nothing to scale.
                    future.complete(HttpURLConnection.HTTP_OK);
                    if (!isStartup && !isCleanUp && !model.isWorkflowModel()) {
                        SnapshotManager.getInstance().saveSnapshot();
                    }
                    return future;
                }
            } else {
                threads =
                        workers.computeIfAbsent(
                                model.getModelVersionName(), k -> new ArrayList<>());
            }

            int currentWorkers = threads.size();
            if (currentWorkers < minWorker) {
                // Scale up: the future is handed to addThreads and not completed here.
                addThreads(threads, model, minWorker - currentWorkers, future);
            } else {
                // Scale down: drop workers from the tail until at most maxWorker remain.
                for (int i = currentWorkers - 1; i >= maxWorker; --i) {
                    WorkerThread thread = threads.remove(i);
                    WorkerLifeCycle lifecycle = thread.getLifeCycle();
                    thread.shutdown();

                    Process workerProcess = lifecycle.getProcess();

                    // Need to check worker process here since thread.shutdown() -> lifecycle.exit()
                    // -> This may nullify process object per destroyForcibly doc.
                    if (workerProcess != null && workerProcess.isAlive()) {
                        boolean workerDestroyed = false;
                        try {
                            String cmd = String.format(OSUtils.getKillCmd(), workerProcess.pid());
                            Process workerKillProcess = Runtime.getRuntime().exec(cmd, null, null);
                            workerDestroyed =
                                    workerKillProcess.waitFor(
                                            configManager.getUnregisterModelTimeout(),
                                            TimeUnit.SECONDS);
                        } catch (InterruptedException | IOException e) {
                            if (e instanceof InterruptedException) {
                                // Restore the interrupt flag so callers can still observe it.
                                Thread.currentThread().interrupt();
                            }
                            // Pass the cause along: this handler also covers an IOException from
                            // starting the kill command, which the message alone does not reveal.
                            logger.warn(
                                    "WorkerThread interrupted during waitFor, possible async resource cleanup.",
                                    e);
                            future.complete(HttpURLConnection.HTTP_INTERNAL_ERROR);
                            return future;
                        }
                        if (!workerDestroyed) {
                            logger.warn(
                                    "WorkerThread timed out while cleaning, please resend request.");
                            future.complete(HttpURLConnection.HTTP_CLIENT_TIMEOUT);
                            return future;
                        }
                    }
                }
                future.complete(HttpURLConnection.HTTP_OK);
            }

            // Both non-error paths persist the snapshot exactly once under the same condition.
            if (!isStartup && !isCleanUp && !model.isWorkflowModel()) {
                SnapshotManager.getInstance().saveSnapshot();
            }
            return future;
        }
    }