in daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java [506:655]
private void handle(DaemonConnection connection, BuildRequest buildRequest) {
updateState(Busy);
final BlockingQueue<Message> sendQueue = new PriorityBlockingQueue<>(64, Message.getMessageComparator());
final BlockingQueue<Message> recvQueue = new LinkedBlockingDeque<>();
final BuildEventListener buildEventListener = new ClientDispatcher(sendQueue);
final DaemonInputStream daemonInputStream = new DaemonInputStream(
(projectId, bytesToRead) -> sendQueue.add(Message.requestInput(projectId, bytesToRead)));
InputStream in = System.in;
try {
System.setIn(daemonInputStream);
LOGGER.info("Executing request");
Thread sender = new Thread(() -> {
try {
boolean flushed = true;
while (true) {
Message m;
if (flushed) {
m = sendQueue.poll(keepAliveMs, TimeUnit.MILLISECONDS);
if (m == null) {
m = Message.BareMessage.KEEP_ALIVE_SINGLETON;
}
flushed = false;
} else {
m = sendQueue.poll();
if (m == null) {
connection.flush();
flushed = true;
continue;
}
}
if (m == Message.BareMessage.STOP_SINGLETON) {
connection.flush();
LOGGER.info("No more message to dispatch");
return;
}
LOGGER.info("Dispatch message: {}", m);
connection.dispatch(m);
}
} catch (Throwable t) {
LOGGER.error("Error dispatching events", t);
}
});
sender.start();
Thread receiver = new Thread(() -> {
try {
while (true) {
Message message = connection.receive();
if (message == null) {
break;
}
LOGGER.info("Received message: {}", message);
if (message == Message.BareMessage.CANCEL_BUILD_SINGLETON) {
updateState(Canceled);
return;
} else if (message instanceof Message.InputData) {
daemonInputStream.addInputData(((Message.InputData) message).getData());
} else {
synchronized (recvQueue) {
recvQueue.put(message);
recvQueue.notifyAll();
}
}
}
} catch (DaemonException.RecoverableMessageIOException t) {
updateState(Canceled);
} catch (Throwable t) {
updateState(Broken);
LOGGER.error("Error receiving events", t);
}
});
receiver.start();
try {
Connection.setCurrent(new Connection() {
@Override
public void dispatch(Message message) {
try {
sendQueue.put(message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public <T extends Message> T request(Message request, Class<T> responseType, Predicate<T> matcher) {
try {
synchronized (recvQueue) {
sendQueue.put(request);
LOGGER.info("Waiting for response");
while (true) {
T t = recvQueue.stream()
.filter(responseType::isInstance)
.map(responseType::cast)
.filter(matcher)
.findFirst()
.orElse(null);
if (t != null) {
recvQueue.remove(t);
LOGGER.info("Received response: {}", t);
return t;
}
recvQueue.wait();
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
LoggingOutputStream output = new LoggingOutputStream(s -> sendQueue.add(Message.out(s)));
LoggingOutputStream error = new LoggingOutputStream(s -> sendQueue.add(Message.err(s)));
// Process MAVEN_ARGS environment variable
List<String> args = buildRequest.getArgs();
String mavenArgsEnv = buildRequest.getEnv().get("MAVEN_ARGS");
if (mavenArgsEnv != null && !mavenArgsEnv.isEmpty()) {
args = new ArrayList<>(args);
Arrays.stream(mavenArgsEnv.split(" "))
.filter(s -> !s.trim().isEmpty())
.forEach(args::add);
}
int exitCode = cli.main(
args,
buildRequest.getWorkingDir(),
buildRequest.getProjectDir(),
buildRequest.getEnv(),
buildEventListener,
daemonInputStream,
output,
error);
LOGGER.info("Build finished, finishing message dispatch");
buildEventListener.finish(exitCode);
} catch (Throwable t) {
LOGGER.error("Error while building project", t);
buildEventListener.fail(t);
} finally {
sender.join();
ProjectBuildLogAppender.setProjectId(null);
}
} catch (Throwable t) {
LOGGER.error("Error while building project", t);
} finally {
System.setIn(in);
if (!noDaemon) {
LOGGER.info("Daemon back to idle");
updateState(DaemonState.Idle);
System.gc();
}
}
}