in server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java [402:480]
public void channelRead(ChannelHandlerContext ctx, Object msg) {
imapCommandsMetric.increment();
ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
Optional.ofNullable(ctx.channel().attr(REQUEST_COUNTER))
.flatMap(counter -> Optional.ofNullable(counter.get()))
.ifPresent(AtomicLong::incrementAndGet);
ImapLinerarizer linearalizer = ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).get();
synchronized (linearalizer) {
if (linearalizer.isExecutingRequest.get()) {
linearalizer.throttled.add(msg);
return;
}
linearalizer.isExecutingRequest.set(true);
}
ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel(), session);
ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
writer.setFlushCallback(response::flush);
ImapMessage message = (ImapMessage) msg;
beforeIDLEUponProcessing(ctx);
ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response);
Disposable disposable = reactiveThrottler.throttle(processor.processReactive(message, responseEncoder, session)
.doOnEach(Throwing.consumer(signal -> {
if (session.getState() == ImapSessionState.LOGOUT) {
// Make sure we close the channel after all the buffers were flushed out
Channel channel = ctx.channel();
if (channel.isActive()) {
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
if (signal.isOnComplete()) {
IOException failure = responseEncoder.getFailure();
if (failure != null) {
try (Closeable mdc = ReactorUtils.retrieveMDCBuilder(signal).build()) {
LOGGER.info(failure.getMessage());
LOGGER.debug("Failed to write {}", message, failure);
} catch (IOException e) {
throw new RuntimeException(e);
}
ctx.fireExceptionCaught(failure);
}
}
Object waitingMessage;
synchronized (linearalizer) {
linearalizer.isExecutingRequest.set(false);
waitingMessage = linearalizer.throttled.poll();
}
if (signal.isOnComplete() || signal.isOnError()) {
afterIDLEUponProcessing(ctx);
}
if (signal.hasError()) {
ctx.fireExceptionCaught(signal.getThrowable());
}
disposableAttribute.set(null);
response.flush();
ctx.fireChannelReadComplete();
if (signal.isOnComplete() || signal.isOnError()) {
if (waitingMessage != null && signal.isOnComplete()) {
ctx.channel().eventLoop().execute(
() -> channelRead(ctx, waitingMessage));
}
}
}))
.contextWrite(ReactorUtils.context("imap", mdc(session))), message,
runnable -> ctx.channel().eventLoop().execute(runnable))
// Manage throttling errors
.doOnError(ctx::fireExceptionCaught)
.doFinally(Throwing.consumer(any -> {
if (message instanceof Closeable) {
((Closeable) message).close();
}
}))
.subscribe();
disposableAttribute.set(disposable);
}