in daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java [501:636]
/**
 * Serves one build request over the given client connection.
 *
 * <p>The method marks the daemon as {@code Busy}, then starts two helper threads:
 * <ul>
 *   <li>a <em>sender</em> that drains {@code sendQueue} to the connection, emitting a keep-alive
 *       message whenever nothing was queued for {@code keepAliveMs} milliseconds, and</li>
 *   <li>a <em>receiver</em> that reads messages from the connection, handles cancel requests and
 *       input data itself and stores every other message in {@code recvQueue}.</li>
 * </ul>
 * It then installs a {@link Connection} backed by these queues, redirects {@code System.in},
 * {@code System.out} and {@code System.err}, runs the build through {@code cli.main(...)} and
 * reports the result through the build event listener. Before returning it waits for the sender
 * thread to exit; the receiver thread is not joined.
 *
 * <p>Throwables raised while serving the request are logged instead of being propagated.
 *
 * @param connection   the client connection used to dispatch and receive messages
 * @param buildRequest the request providing the arguments, working directory, project directory
 *                     and environment passed to the build
 */
private void handle(DaemonConnection connection, BuildRequest buildRequest) {
    updateState(Busy);
    final BlockingQueue<Message> sendQueue = new PriorityBlockingQueue<>(64, Message.getMessageComparator());
    final BlockingQueue<Message> recvQueue = new LinkedBlockingDeque<>();
    final BuildEventListener buildEventListener = new ClientDispatcher(sendQueue);
    final DaemonInputStream daemonInputStream =
            new DaemonInputStream(projectId -> sendQueue.add(Message.requestInput(projectId)));
    try (ProjectBuildLogAppender logAppender = new ProjectBuildLogAppender(buildEventListener)) {
        LOGGER.info("Executing request");

        Thread sender = new Thread(() -> {
            try {
                // "flushed" is true when nothing is pending on the connection since the last flush.
                boolean flushed = true;
                while (true) {
                    Message m;
                    if (flushed) {
                        // Nothing pending: block for the next message, or fall back to a
                        // keep-alive once keepAliveMs elapsed without any message.
                        m = sendQueue.poll(keepAliveMs, TimeUnit.MILLISECONDS);
                        if (m == null) {
                            m = Message.BareMessage.KEEP_ALIVE_SINGLETON;
                        }
                        flushed = false;
                    } else {
                        // Messages were dispatched since the last flush: keep draining without
                        // blocking and flush as soon as the queue runs empty.
                        m = sendQueue.poll();
                        if (m == null) {
                            connection.flush();
                            flushed = true;
                            continue;
                        }
                    }
                    if (m == Message.BareMessage.STOP_SINGLETON) {
                        // STOP is a local marker: flush what was dispatched and end the thread.
                        connection.flush();
                        LOGGER.info("No more message to dispatch");
                        return;
                    }
                    LOGGER.info("Dispatch message: {}", m);
                    connection.dispatch(m);
                }
            } catch (Throwable t) {
                LOGGER.error("Error dispatching events", t);
            }
        });
        sender.start();

        Thread receiver = new Thread(() -> {
            try {
                while (true) {
                    Message message = connection.receive();
                    if (message == null) {
                        break;
                    }
                    LOGGER.info("Received message: {}", message);
                    if (message == Message.BareMessage.CANCEL_BUILD_SINGLETON) {
                        updateState(Canceled);
                        return;
                    } else if (message instanceof Message.InputData) {
                        daemonInputStream.addInputData(((Message.InputData) message).getData());
                    } else {
                        // recvQueue doubles as the monitor that request() waits on.
                        synchronized (recvQueue) {
                            recvQueue.put(message);
                            recvQueue.notifyAll();
                        }
                    }
                }
            } catch (DaemonException.RecoverableMessageIOException t) {
                updateState(Canceled);
            } catch (Throwable t) {
                updateState(Broken);
                LOGGER.error("Error receiving events", t);
            }
        });
        receiver.start();

        try {
            Connection.setCurrent(new Connection() {
                @Override
                public void dispatch(Message message) {
                    try {
                        sendQueue.put(message);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to callers further up the stack.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }

                @Override
                public <T extends Message> T request(Message request, Class<T> responseType, Predicate<T> matcher) {
                    try {
                        // The request is queued while holding the monitor, and the receiver
                        // only adds messages under the same monitor, so a response cannot be
                        // added between the scan below and the call to wait().
                        synchronized (recvQueue) {
                            sendQueue.put(request);
                            LOGGER.info("Waiting for response");
                            while (true) {
                                T t = recvQueue.stream()
                                        .filter(responseType::isInstance)
                                        .map(responseType::cast)
                                        .filter(matcher)
                                        .findFirst()
                                        .orElse(null);
                                if (t != null) {
                                    recvQueue.remove(t);
                                    LOGGER.info("Received response: {}", t);
                                    return t;
                                }
                                recvQueue.wait();
                            }
                        }
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to callers further up the stack.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            });
            System.setIn(daemonInputStream);
            System.setOut(new LoggingOutputStream(s -> sendQueue.add(Message.out(s))).printStream());
            System.setErr(new LoggingOutputStream(s -> sendQueue.add(Message.err(s))).printStream());
            int exitCode = cli.main(
                    buildRequest.getArgs(),
                    buildRequest.getWorkingDir(),
                    buildRequest.getProjectDir(),
                    buildRequest.getEnv(),
                    buildEventListener);
            LOGGER.info("Build finished, finishing message dispatch");
            buildEventListener.finish(exitCode);
        } catch (Throwable t) {
            LOGGER.error("Error while building project", t);
            buildEventListener.fail(t);
        } finally {
            try {
                // Wait for the sender thread to exit (it returns on STOP_SINGLETON or on error).
                sender.join();
            } catch (InterruptedException e) {
                // Restore the interrupt status instead of losing it in the generic handler.
                Thread.currentThread().interrupt();
                LOGGER.error("Error while building project", e);
            } finally {
                // Must run even when the join above is interrupted.
                ProjectBuildLogAppender.setProjectId(null);
            }
        }
    } catch (Throwable t) {
        LOGGER.error("Error while building project", t);
    } finally {
        if (!noDaemon) {
            LOGGER.info("Daemon back to idle");
            updateState(DaemonState.Idle);
            System.gc();
        }
    }
}