in frontend/server/src/main/java/org/pytorch/serve/workflow/WorkflowManager.java [282:362]
/**
 * Unregisters the model behind every node of a workflow and waits for all attempts to finish.
 *
 * <p>Each node's model is unregistered on a small fixed-size thread pool. Every attempt is
 * turned into a {@code ModelRegistrationResult}; the per-node outcomes are collected and, if
 * any of them is not {@code HTTP_OK}, a single {@code InternalServerException} listing the
 * failure messages is thrown after all attempts have completed.
 *
 * <p>A "model not found" / "model version not found" failure is reported as an error only when
 * {@code successNodes} is {@code null} or contains the model name; otherwise it is recorded
 * with {@code HTTP_OK} and ignored. (Presumably {@code successNodes} holds the nodes that were
 * registered successfully, so a missing model is only unexpected for those -- confirm against
 * the callers.)
 *
 * @param workflowName name of the workflow, used only in the failure message
 * @param nodes workflow nodes whose models are to be unregistered
 * @param successNodes model names for which a missing model counts as a failure; may be
 *     {@code null}, in which case a missing model is always a failure
 * @throws InterruptedException if interrupted while waiting for an unregister task to complete
 * @throws ExecutionException if an unregister task terminated with an uncaught exception
 */
public void unregisterModels(
        String workflowName, Map<String, Node> nodes, ArrayList<String> successNodes)
        throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    try {
        CompletionService<ModelRegistrationResult> executorCompletionService =
                new ExecutorCompletionService<>(executorService);

        int submitted = 0;
        for (Node node : nodes.values()) {
            WorkflowModel wfm = node.getWorkflowModel();
            executorCompletionService.submit(
                    () -> {
                        String modelName = wfm.getName();
                        StatusResponse status = new StatusResponse();
                        try {
                            ApiUtils.unregisterModel(modelName, null);
                            status.setHttpResponseCode(HttpURLConnection.HTTP_OK);
                            status.setStatus(
                                    String.format(
                                            "Unregisterd workflow node %s", modelName));
                        } catch (ModelNotFoundException | ModelVersionNotFoundException e) {
                            if (successNodes == null || successNodes.contains(modelName)) {
                                status.setHttpResponseCode(
                                        HttpURLConnection.HTTP_INTERNAL_ERROR);
                                status.setStatus(
                                        String.format(
                                                "Error while unregistering workflow node %s",
                                                modelName));
                                status.setE(e);
                                logger.error(
                                        "Model '" + modelName + "' failed to unregister.", e);
                            } else {
                                // The model is missing but was not expected to be present,
                                // so the failure is recorded and treated as success.
                                status.setHttpResponseCode(HttpURLConnection.HTTP_OK);
                                status.setStatus(
                                        String.format(
                                                "Error while unregistering workflow node %s but can be ignored.",
                                                modelName));
                                status.setE(e);
                            }
                        } catch (Exception e) {
                            status.setHttpResponseCode(HttpURLConnection.HTTP_INTERNAL_ERROR);
                            status.setStatus(
                                    String.format(
                                            "Error while unregistering workflow node %s",
                                            modelName));
                            status.setE(e);
                            // Log here: the exception thrown to the caller only carries the
                            // status text, so the cause would otherwise be lost.
                            logger.error(
                                    "Model '" + modelName + "' failed to unregister.", e);
                        }
                        return new ModelRegistrationResult(modelName, status);
                    });
            submitted++;
        }

        // Drain results in completion order; every submitted task yields exactly one result.
        List<String> failedMessages = new ArrayList<>();
        for (int i = 0; i < submitted; i++) {
            ModelRegistrationResult result = executorCompletionService.take().get();
            if (result.getResponse().getHttpResponseCode() != HttpURLConnection.HTTP_OK) {
                failedMessages.add(result.getResponse().getStatus());
            }
        }

        if (!failedMessages.isEmpty()) {
            throw new InternalServerException(
                    "Error while unregistering the workflow "
                            + workflowName
                            + ". Details: "
                            + String.join(", ", failedMessages));
        }
    } finally {
        // Always release the pool threads, including when failures are reported or the
        // wait is interrupted.
        executorService.shutdown();
    }
}