private void handle()

in daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java [506:655]


    /**
     * Runs one build request for a connected client and streams the resulting
     * messages back over {@code connection}.
     *
     * <p>The method switches the daemon state to {@code Busy}, temporarily replaces
     * {@link System#in} with a {@code DaemonInputStream} that asks the client for
     * input through the send queue, and starts two helper threads:
     * <ul>
     *   <li>a <em>sender</em> that drains {@code sendQueue} to the connection, emits a
     *       keep-alive message when nothing was queued for {@code keepAliveMs}
     *       milliseconds, and flushes the connection whenever the queue runs empty;</li>
     *   <li>a <em>receiver</em> that reads client messages, handles build cancellation
     *       and input data itself, and parks every other message in {@code recvQueue}
     *       for {@code Connection.request(...)} to pick up.</li>
     * </ul>
     * It then calls {@code cli.main(...)} with the request arguments (followed by the
     * space-separated tokens of the {@code MAVEN_ARGS} entry of the request
     * environment, if any), reports the exit code or the failure through the
     * {@code BuildEventListener}, and waits for the sender thread to terminate.
     * Finally {@link System#in} is restored and, unless {@code noDaemon} is set, the
     * state goes back to {@code Idle}.
     *
     * <p>No exception escapes this method: every {@link Throwable} is logged.
     *
     * @param connection   the client connection used to send and receive messages
     * @param buildRequest the request carrying arguments, directories and environment
     */
    private void handle(DaemonConnection connection, BuildRequest buildRequest) {
        updateState(Busy);
        // Outgoing messages, ordered by the comparator supplied by Message.
        final BlockingQueue<Message> sendQueue = new PriorityBlockingQueue<>(64, Message.getMessageComparator());
        // Incoming messages that are neither cancel nor input data; consumed by request() below.
        final BlockingQueue<Message> recvQueue = new LinkedBlockingDeque<>();
        final BuildEventListener buildEventListener = new ClientDispatcher(sendQueue);
        final DaemonInputStream daemonInputStream = new DaemonInputStream(
                (projectId, bytesToRead) -> sendQueue.add(Message.requestInput(projectId, bytesToRead)));
        // Remember the original stdin so it can be restored in the outer finally.
        InputStream in = System.in;
        try {
            System.setIn(daemonInputStream);

            LOGGER.info("Executing request");

            Thread sender = new Thread(() -> {
                try {
                    // true while nothing has been dispatched since the last flush
                    boolean flushed = true;
                    while (true) {
                        Message m;
                        if (flushed) {
                            // Nothing pending on the connection: block for the next message and
                            // fall back to a keep-alive if none arrives in time.
                            m = sendQueue.poll(keepAliveMs, TimeUnit.MILLISECONDS);
                            if (m == null) {
                                m = Message.BareMessage.KEEP_ALIVE_SINGLETON;
                            }
                            flushed = false;
                        } else {
                            // Unflushed output pending: take what is immediately available and
                            // flush as soon as the queue runs dry.
                            m = sendQueue.poll();
                            if (m == null) {
                                connection.flush();
                                flushed = true;
                                continue;
                            }
                        }
                        if (m == Message.BareMessage.STOP_SINGLETON) {
                            // Stop marker: flush what was written and end the thread.
                            connection.flush();
                            LOGGER.info("No more message to dispatch");
                            return;
                        }
                        LOGGER.info("Dispatch message: {}", m);
                        connection.dispatch(m);
                    }
                } catch (Throwable t) {
                    LOGGER.error("Error dispatching events", t);
                }
            });
            sender.start();
            Thread receiver = new Thread(() -> {
                try {
                    while (true) {
                        Message message = connection.receive();
                        if (message == null) {
                            break;
                        }
                        LOGGER.info("Received message: {}", message);
                        if (message == Message.BareMessage.CANCEL_BUILD_SINGLETON) {
                            updateState(Canceled);
                            return;
                        } else if (message instanceof Message.InputData) {
                            daemonInputStream.addInputData(((Message.InputData) message).getData());
                        } else {
                            // Publish under the queue's monitor so threads waiting in request()
                            // are woken up and re-scan the queue.
                            synchronized (recvQueue) {
                                recvQueue.put(message);
                                recvQueue.notifyAll();
                            }
                        }
                    }
                } catch (DaemonException.RecoverableMessageIOException t) {
                    updateState(Canceled);
                } catch (Throwable t) {
                    updateState(Broken);
                    LOGGER.error("Error receiving events", t);
                }
            });
            receiver.start();
            try {
                Connection.setCurrent(new Connection() {
                    @Override
                    public void dispatch(Message message) {
                        try {
                            sendQueue.put(message);
                        } catch (InterruptedException e) {
                            // Restore the interrupt status so code further up the stack can
                            // still observe the interruption after the exception is wrapped.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    }

                    @Override
                    public <T extends Message> T request(Message request, Class<T> responseType, Predicate<T> matcher) {
                        try {
                            // The monitor is taken before the request is queued, so a response
                            // published by the receiver cannot slip in between the scan and wait().
                            synchronized (recvQueue) {
                                sendQueue.put(request);
                                LOGGER.info("Waiting for response");
                                // NOTE(review): the wait below has no timeout; if the receiver
                                // thread has already ended, nothing will wake this caller.
                                while (true) {
                                    T t = recvQueue.stream()
                                            .filter(responseType::isInstance)
                                            .map(responseType::cast)
                                            .filter(matcher)
                                            .findFirst()
                                            .orElse(null);
                                    if (t != null) {
                                        recvQueue.remove(t);
                                        LOGGER.info("Received response: {}", t);
                                        return t;
                                    }
                                    recvQueue.wait();
                                }
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt status before wrapping, see dispatch().
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    }
                });
                // Build output and error text is forwarded to the client as messages.
                LoggingOutputStream output = new LoggingOutputStream(s -> sendQueue.add(Message.out(s)));
                LoggingOutputStream error = new LoggingOutputStream(s -> sendQueue.add(Message.err(s)));
                // Process MAVEN_ARGS environment variable
                List<String> args = buildRequest.getArgs();
                String mavenArgsEnv = buildRequest.getEnv().get("MAVEN_ARGS");
                if (mavenArgsEnv != null && !mavenArgsEnv.isEmpty()) {
                    // Copy first so the list owned by the request is not modified.
                    args = new ArrayList<>(args);
                    Arrays.stream(mavenArgsEnv.split(" "))
                            .filter(s -> !s.trim().isEmpty())
                            .forEach(args::add);
                }
                int exitCode = cli.main(
                        args,
                        buildRequest.getWorkingDir(),
                        buildRequest.getProjectDir(),
                        buildRequest.getEnv(),
                        buildEventListener,
                        daemonInputStream,
                        output,
                        error);
                LOGGER.info("Build finished, finishing message dispatch");
                buildEventListener.finish(exitCode);
            } catch (Throwable t) {
                LOGGER.error("Error while building project", t);
                buildEventListener.fail(t);
            } finally {
                try {
                    // Wait for the sender to finish; it ends when it dequeues the stop marker
                    // (presumably queued by finish()/fail() above - not visible in this method).
                    sender.join();
                } finally {
                    // Must run even when join() is interrupted, otherwise the project id set
                    // during the build would stay attached to the log appender.
                    ProjectBuildLogAppender.setProjectId(null);
                }
            }
        } catch (Throwable t) {
            LOGGER.error("Error while building project", t);
        } finally {
            System.setIn(in);
            if (!noDaemon) {
                LOGGER.info("Daemon back to idle");
                updateState(DaemonState.Idle);
                System.gc();
            }
        }
    }