in frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkerThread.java [140:199]
/**
 * Runs the request/response loop against the backend worker for as long as
 * {@code isRunning()} returns true.
 *
 * <p>Each iteration obtains the next request from the aggregator, writes it to the backend
 * channel, and waits up to the model's response timeout (interpreted in minutes) for a reply.
 * The reply is forwarded to the aggregator and then handled according to the request command:
 * a PREDICT reply resets the model's failed-inference counter; a LOAD reply with code 200
 * records the backend PID, opens the worker's stdout/stderr files under
 * {@code java.io.tmpdir}, attaches them to the life cycle and resets the backoff index; any
 * other LOAD reply code moves the worker to {@code WORKER_ERROR}.
 *
 * @throws WorkerInitializationException if the backend does not reply within the timeout, or
 *     if a successful LOAD reply does not contain a {@code [PID]:} marker
 * @throws InterruptedException if interrupted while flushing the request or waiting for the
 *     reply
 * @throws FileNotFoundException if the worker's stdout/stderr files cannot be opened
 */
private void runWorker()
        throws WorkerInitializationException, InterruptedException, FileNotFoundException {
    int responseTimeout = model.getResponseTimeout();
    while (isRunning()) {
        req = aggregator.getRequest(backendChannel.id().asLongText(), state);
        backendChannel.writeAndFlush(req).sync();
        long begin = System.currentTimeMillis();
        // The model's response timeout is applied in minutes.
        ModelWorkerResponse reply = replies.poll(responseTimeout, TimeUnit.MINUTES);
        long duration = System.currentTimeMillis() - begin;
        logger.info("Backend response time: {}", duration);

        if (reply == null) {
            // No reply arrived within the timeout: count the failure and abort the loop.
            int val = model.incrFailedInfReqs();
            logger.error("Number of consecutive unsuccessful inference {}", val);
            throw new WorkerInitializationException(
                    "Backend worker did not respond in given time");
        }
        aggregator.sendResponse(reply);

        switch (req.getCommand()) {
            case PREDICT:
                model.resetFailedInfReqs();
                break;
            case LOAD:
                if (reply.getCode() == 200) {
                    // Validate the reply before touching any state or opening files, so a
                    // malformed reply never marks the worker as loaded.
                    int pid = parseBackendPid(reply.getMessage());
                    String tmpdir = System.getProperty("java.io.tmpdir");
                    String channelId = backendChannel.id().asLongText();
                    // Only open the log files when the load succeeded; they are not used
                    // on the failure path.
                    out = new RandomAccessFile(tmpdir + '/' + channelId + "-stdout", "rw");
                    err = new RandomAccessFile(tmpdir + '/' + channelId + "-stderr", "rw");
                    setState(WorkerState.WORKER_MODEL_LOADED, HttpResponseStatus.OK);
                    lifeCycle.setPid(pid);
                    lifeCycle.attachIOStreams(
                            threadName,
                            Channels.newInputStream(out.getChannel()),
                            Channels.newInputStream(err.getChannel()));
                    backoffIdx = 0;
                } else {
                    setState(
                            WorkerState.WORKER_ERROR,
                            HttpResponseStatus.valueOf(reply.getCode()));
                }
                break;
            case UNLOAD:
            case STATS:
            default:
                break;
        }
        req = null;
    }
}

/**
 * Extracts the backend process id that follows the {@code [PID]:} marker in a LOAD reply.
 *
 * @param message the reply message; may be null
 * @return the integer parsed from the text after the marker, ignoring surrounding whitespace
 * @throws WorkerInitializationException if the message is null or has no {@code [PID]:} marker
 * @throws NumberFormatException if the text after the marker is not a valid integer
 */
private static int parseBackendPid(String message) throws WorkerInitializationException {
    String marker = "[PID]:";
    int markerIdx = message == null ? -1 : message.indexOf(marker);
    if (markerIdx < 0) {
        throw new WorkerInitializationException(
                "Backend worker load response does not contain a PID: " + message);
    }
    return Integer.parseInt(message.substring(markerIdx + marker.length()).trim());
}