public void channelRead()

in server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java [402:480]


    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        imapCommandsMetric.increment();
        ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
        Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
        Optional.ofNullable(ctx.channel().attr(REQUEST_COUNTER))
            .flatMap(counter -> Optional.ofNullable(counter.get()))
            .ifPresent(AtomicLong::incrementAndGet);

        ImapLinerarizer linearalizer = ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).get();
        synchronized (linearalizer) {
            if (linearalizer.isExecutingRequest.get()) {
                linearalizer.throttled.add(msg);
                return;
            }
            linearalizer.isExecutingRequest.set(true);
        }

        ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel(), session);
        ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
        writer.setFlushCallback(response::flush);
        ImapMessage message = (ImapMessage) msg;

        beforeIDLEUponProcessing(ctx);
        ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response);
        Disposable disposable = reactiveThrottler.throttle(processor.processReactive(message, responseEncoder, session)
                .doOnEach(Throwing.consumer(signal -> {
                    if (session.getState() == ImapSessionState.LOGOUT) {
                        // Make sure we close the channel after all the buffers were flushed out
                        Channel channel = ctx.channel();
                        if (channel.isActive()) {
                            channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
                        }
                    }
                    if (signal.isOnComplete()) {
                        IOException failure = responseEncoder.getFailure();
                        if (failure != null) {
                            try (Closeable mdc = ReactorUtils.retrieveMDCBuilder(signal).build()) {
                                LOGGER.info(failure.getMessage());
                                LOGGER.debug("Failed to write {}", message, failure);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }

                            ctx.fireExceptionCaught(failure);
                        }
                    }
                    Object waitingMessage;
                    synchronized (linearalizer) {
                        linearalizer.isExecutingRequest.set(false);
                        waitingMessage = linearalizer.throttled.poll();
                    }
                    if (signal.isOnComplete() || signal.isOnError()) {
                        afterIDLEUponProcessing(ctx);
                    }
                    if (signal.hasError()) {
                        ctx.fireExceptionCaught(signal.getThrowable());
                    }
                    disposableAttribute.set(null);
                    response.flush();
                    ctx.fireChannelReadComplete();
                    if (signal.isOnComplete() || signal.isOnError()) {
                        if (waitingMessage != null && signal.isOnComplete()) {
                            ctx.channel().eventLoop().execute(
                                () -> channelRead(ctx, waitingMessage));
                        }
                    }
                }))
                .contextWrite(ReactorUtils.context("imap", mdc(session))), message,
                runnable -> ctx.channel().eventLoop().execute(runnable))
            // Manage throttling errors
            .doOnError(ctx::fireExceptionCaught)
            .doFinally(Throwing.consumer(any -> {
                if (message instanceof Closeable) {
                    ((Closeable) message).close();
                }
            }))
            .subscribe();
        disposableAttribute.set(disposable);
    }