in frontend/server/src/main/java/org/pytorch/serve/wlm/WorkerThread.java [281:363]
/**
 * Starts (unless in debug mode) and connects to the backend worker on {@code port}, then queues
 * the initial LOAD job for this worker.
 *
 * <p>The Netty pipeline is {@code ENCODER}, a {@link ModelResponseDecoder} bounded by the
 * configured max response size, and a {@code WorkerHandler}. Two listeners are attached to the
 * connected channel:
 *
 * <ul>
 *   <li>on close: releases the init latch, logs the disconnect, and interrupts whatever thread
 *       is currently stored in {@code currentThread} (clearing it);
 *   <li>on the immediately-succeeded future: builds a LOAD {@code RestJob} (adding a {@code gpu}
 *       parameter when {@code gpuId >= 0}), adds it via {@code model.addJob}, and releases the
 *       init latch.
 * </ul>
 *
 * <p>NOTE(review): the latch is released once the LOAD job has been <em>queued</em> (or the
 * channel has closed), not once the backend reports the model as loaded, so the timeout below
 * only bounds that much.
 *
 * @throws WorkerInitializationException if the latch is not released within {@code
 *     WORKER_TIMEOUT} minutes, or if connecting fails with an {@link IOException}
 * @throws InterruptedException if interrupted while connecting or while waiting on the latch
 */
private void connect() throws WorkerInitializationException, InterruptedException {
    // Only spawn the backend process ourselves outside of debug mode
    // (presumably it is launched by hand when debugging — confirm).
    if (!configManager.isDebug()) {
        lifeCycle.startWorker(port);
    }

    String loadModelName = model.getModelName();
    String loadModelVersion = model.getVersion();
    setState(WorkerState.WORKER_STARTED, HttpURLConnection.HTTP_OK);

    // Released by whichever happens first: the LOAD job being queued, or the channel closing.
    final CountDownLatch initLatch = new CountDownLatch(1);
    final int maxResponseSize = configManager.getMaxResponseSize();

    try {
        Connector connector = new Connector(port);

        ChannelInitializer<Channel> pipelineSetup =
                new ChannelInitializer<Channel>() {
                    @Override
                    public void initChannel(Channel newChannel) {
                        ChannelPipeline pipeline = newChannel.pipeline();
                        pipeline.addLast(ENCODER);
                        pipeline.addLast(new ModelResponseDecoder(maxResponseSize));
                        pipeline.addLast(new WorkerHandler());
                    }
                };

        Bootstrap bootstrap =
                new Bootstrap()
                        .group(backendEventGroup)
                        .channel(connector.getClientChannel())
                        .handler(pipelineSetup);

        SocketAddress backendAddress = connector.getSocketAddress();
        logger.info("Connecting to: {}", backendAddress);
        backendChannel = bootstrap.connect(backendAddress).sync().channel();

        ChannelFutureListener onDisconnect =
                closed -> {
                    initLatch.countDown();
                    logger.info("{} Worker disconnected. {}", getWorkerId(), state);
                    // Wake the thread registered in currentThread so it notices the disconnect.
                    Thread runner = currentThread.getAndSet(null);
                    if (runner != null) {
                        runner.interrupt();
                    }
                };

        ChannelFutureListener enqueueLoad =
                ready -> {
                    // TODO:
                    // use gpu, batch size in load model command
                    RequestInput loadInput = new RequestInput(UUID.randomUUID().toString());
                    if (gpuId >= 0) {
                        loadInput.addParameter(new InputParameter("gpu", String.valueOf(gpuId)));
                    }
                    Job loadJob =
                            new RestJob(
                                    null,
                                    loadModelName,
                                    loadModelVersion,
                                    WorkerCommands.LOAD,
                                    loadInput);
                    model.addJob(workerId, loadJob);
                    initLatch.countDown();
                };

        // Registration order matters: the close listener goes on first.
        backendChannel.closeFuture().addListener(onDisconnect);
        backendChannel.newSucceededFuture().addListener(enqueueLoad);

        boolean released = initLatch.await(WORKER_TIMEOUT, TimeUnit.MINUTES);
        if (!released) {
            throw new WorkerInitializationException(
                    "Worker failed to initialize within " + WORKER_TIMEOUT + " mins");
        }
        running.set(true);
    } catch (Throwable failure) {
        // Netty's sync() can rethrow checked exceptions (e.g. connection failures) that are not
        // declared, so IOException cannot be caught directly; detect it here and wrap it.
        // https://github.com/netty/netty/issues/2597
        if (failure instanceof IOException) {
            throw new WorkerInitializationException("Failed to connect to worker.", failure);
        }
        // Precise rethrow: only the checked types the try block can actually throw escape.
        throw failure;
    }
}