in frontend/server/src/main/java/org/pytorch/serve/wlm/WorkerThread.java [170:267]
/**
 * Main worker loop: pulls batched requests from the aggregator, flushes them to the
 * backend worker process over the Netty channel, and relays the backend's replies back
 * through the aggregator. The loop ends when the worker stops running or a failure is
 * thrown; cleanup and retry scheduling happen in the finally block.
 */
public void run() {
    int responseTimeout = model.getResponseTimeout();
    Thread thread = Thread.currentThread();
    thread.setName(getWorkerName());
    currentThread.set(thread);
    BaseModelRequest req = null;
    // Status reported on abnormal exit; overwritten below on specific failures.
    int status = HttpURLConnection.HTTP_INTERNAL_ERROR;
    try {
        connect();
        while (isRunning()) {
            req = aggregator.getRequest(workerId, state);
            long wtStartTime = System.currentTimeMillis();
            logger.info("Flushing req. to backend at: {}", wtStartTime);
            backendChannel.writeAndFlush(req).sync();
            long begin = System.currentTimeMillis();
            // Wait up to responseTimeout seconds for the backend reply; null means timeout.
            ModelWorkerResponse reply = replies.poll(responseTimeout, TimeUnit.SECONDS);
            long duration = System.currentTimeMillis() - begin;
            logger.info("Backend response time: {}", duration);
            if (reply != null) {
                aggregator.sendResponse(reply);
            } else {
                int val = model.incrFailedInfReqs();
                logger.error("Number of consecutive unsuccessful inference {}", val);
                throw new WorkerInitializationException(
                        "Backend worker did not respond in given time");
            }
            switch (req.getCommand()) {
                case PREDICT:
                    // Successful inference resets the consecutive-failure counter.
                    model.resetFailedInfReqs();
                    break;
                case LOAD:
                    if (reply.getCode() == 200) {
                        setState(WorkerState.WORKER_MODEL_LOADED, HttpURLConnection.HTTP_OK);
                        backoffIdx = 0;
                    } else {
                        setState(WorkerState.WORKER_ERROR, reply.getCode());
                        status = reply.getCode();
                    }
                    break;
                case UNLOAD:
                case STATS:
                default:
                    break;
            }
            // Clear the reference so the finally block does not send a spurious
            // "Worker died." error for a request that was already answered.
            req = null;
            String workerThreadTime =
                    String.valueOf(((System.currentTimeMillis() - wtStartTime) - duration));
            loggerTsMetrics.info(
                    "{}",
                    new Metric(
                            "WorkerThreadTime",
                            workerThreadTime,
                            "ms",
                            ConfigManager.getInstance().getHostName(),
                            new Dimension("Level", "Host")));
        }
    } catch (InterruptedException e) {
        // NOTE(review): interrupt status is not restored here; presumably intentional
        // since the thread is on its shutdown path — confirm before changing.
        logger.debug("System state is : {}", state);
        if (state == WorkerState.WORKER_SCALED_DOWN || state == WorkerState.WORKER_STOPPED) {
            logger.debug("Shutting down the thread .. Scaling down.");
        } else {
            logger.debug(
                    "Backend worker monitoring thread interrupted or backend worker process died.",
                    e);
        }
    } catch (WorkerInitializationException e) {
        logger.error("Backend worker error", e);
    } catch (OutOfMemoryError oom) {
        logger.error("Out of memory error when creating workers", oom);
        status = HttpURLConnection.HTTP_ENTITY_TOO_LARGE;
    } catch (Throwable t) {
        logger.warn("Backend worker thread exception.", t);
    } finally {
        // WorkerThread is running in thread pool, the thread will be assigned to next
        // Runnable once this worker is finished. If currentThread keep holding the reference
        // of the thread, currentThread.interrupt() might kill next worker.
        backendChannel.disconnect();
        currentThread.set(null);
        Integer exitValue = lifeCycle.getExitValue();
        // Backend exit code 137 is mapped to 413, same as the OutOfMemoryError path above.
        if (exitValue != null && exitValue == 137) {
            status = HttpURLConnection.HTTP_ENTITY_TOO_LARGE;
        }
        if (req != null) {
            aggregator.sendError(req, "Worker died.", status);
        }
        setState(WorkerState.WORKER_STOPPED, status);
        lifeCycle.exit();
        retry();
    }
}