in frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkerThread.java [280:362]
/**
 * Connects this worker thread to its backend process and queues the initial model-load job.
 *
 * <p>The method marks the worker as {@code WORKER_STARTED}, builds a Netty {@link Bootstrap}
 * whose pipeline is encoder, response decoder, then {@link WorkerHandler}, and blocks until the
 * connection to the {@link Connector}'s socket address is established. Two listeners are then
 * registered on the channel:
 *
 * <ul>
 *   <li>a close listener that releases the start-up latch, logs the disconnect and interrupts
 *       the thread currently held in {@code currentThread} (if any);
 *   <li>a listener on an already-succeeded future that creates a {@code LOAD} job for the model
 *       (adding a {@code "gpu"} parameter when {@code gpuId >= 0}), registers it with the model
 *       under the channel's long id and releases the start-up latch.
 * </ul>
 *
 * <p>After the latch is released, the channel id is appended to {@code workerId} and the
 * {@code running} flag is set.
 *
 * @throws WorkerInitializationException if this is not a server thread and the model has no
 *     port ({@code -1}), if the latch is not released within {@code WORKER_TIMEOUT} minutes, or
 *     if an {@link IOException} is raised while connecting
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *     connection or for the latch
 * @throws FileNotFoundException declared by the signature; NOTE(review): any {@link IOException}
 *     raised inside the try block is wrapped below, so this likely never escapes - confirm
 *     against {@code Connector}
 */
private void connect()
        throws WorkerInitializationException, InterruptedException, FileNotFoundException {
    if (!this.serverThread && (model.getPort() == -1)) {
        throw new WorkerInitializationException("Backend server is not running");
    }

    String modelName = model.getModelName();
    setState(WorkerState.WORKER_STARTED, HttpResponseStatus.OK);
    // Count of one: released by whichever listener below fires first (load queued or close).
    final CountDownLatch latch = new CountDownLatch(1);
    final int responseBufferSize = ConfigManager.getInstance().getMaxResponseSize();

    try {
        connector = new Connector(port);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(backendEventGroup)
                .channel(connector.getClientChannel())
                .handler(
                        new ChannelInitializer<Channel>() {
                            @Override
                            public void initChannel(Channel ch) {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(ENCODER);
                                pipeline.addLast(new ModelResponseDecoder(responseBufferSize));
                                pipeline.addLast(new WorkerHandler());
                            }
                        });

        SocketAddress address = connector.getSocketAddress();
        logger.info("Connecting to: {}", address);
        // sync() blocks until the connect attempt completes and rethrows its failure cause.
        backendChannel = bootstrap.connect(address).sync().channel();

        backendChannel
                .closeFuture()
                .addListener(
                        (ChannelFutureListener)
                                future -> {
                                    // Release a waiter that is still blocked on start-up.
                                    latch.countDown();
                                    logger.info(
                                            "{} Worker disconnected. {}", getWorkerId(), state);
                                    // Clear the reference so the thread is interrupted only once.
                                    Thread thread = currentThread.getAndSet(null);
                                    if (thread != null) {
                                        thread.interrupt();
                                    }
                                });

        backendChannel
                .newSucceededFuture()
                .addListener(
                        (ChannelFutureListener)
                                future -> {
                                    // TODO: use gpu, batch size in load model command
                                    RequestInput input =
                                            new RequestInput(UUID.randomUUID().toString());
                                    if (gpuId >= 0) {
                                        input.addParameter(
                                                new InputParameter(
                                                        "gpu", String.valueOf(gpuId)));
                                    }

                                    Job job =
                                            new Job(
                                                    null,
                                                    modelName,
                                                    WorkerCommands.LOAD,
                                                    input);
                                    model.addJob(backendChannel.id().asLongText(), job);
                                    latch.countDown();
                                });

        if (!latch.await(WORKER_TIMEOUT, TimeUnit.MINUTES)) {
            throw new WorkerInitializationException(
                    "Worker failed to initialize within " + WORKER_TIMEOUT + " mins");
        }

        workerId = workerId + "-" + backendChannel.id();
        running.set(true);
    } catch (Throwable t) {
        // Netty can raise checked exceptions that are not declared, so IOException cannot be
        // caught directly here; see https://github.com/netty/netty/issues/2597
        if (t instanceof IOException) {
            throw new WorkerInitializationException("Failed to connect to worker.", t);
        }
        throw t;
    }
}