in impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java [213:241]
/**
 * Dispatches an incoming request command to the processor registered for its command code.
 *
 * <p>When no processor/executor pair is registered for {@code cmd.cmdCode()}, a response with
 * {@code RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED} is written back through {@code ctx}
 * and the method returns. Otherwise a task is built with {@code buildProcessorTask} and submitted
 * to the executor paired with the processor. If that executor rejects the task, the rejection is
 * logged and, unless the request is one-way, a {@code SYSTEM_BUSY} response is written back.
 *
 * @param ctx the handler context whose channel is used to send any response
 * @param cmd the request command to dispatch
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.cmdCode());

    if (processorExecutorPair == null) {
        // NOTE(review): unlike the rejection path below, this branch replies even when the
        // request is one-way -- confirm whether that is intended.
        final RemotingCommand response = commandFactory().createResponse(cmd);
        response.opCode(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
        ctx.writeAndFlush(response);
        LOG.warn("The command code {} is NOT supported!", cmd.cmdCode());
        return;
    }

    RemotingChannel channel = new NettyChannelImpl(ctx.channel());
    Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel);

    // Resolve the executor once; it is needed for both submission and the rejection log.
    final ExecutorService executor = processorExecutorPair.getRight();
    try {
        executor.submit(run);
    } catch (RejectedExecutionException e) {
        // Parameterized logging: same rendered text as before, but the message is only
        // assembled when WARN is enabled, and it matches the logging style used above.
        LOG.warn("Request {} from {} is rejected by server executor {} !", cmd,
            RemotingUtil.extractRemoteAddress(ctx.channel()), executor);

        // One-way requests get no reply, so only the other traffic types are told the server is busy.
        if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
            RemotingCommand response = remotingCommandFactory.createResponse(cmd);
            response.opCode(RemotingSysResponseCode.SYSTEM_BUSY);
            response.remark("SYSTEM_BUSY");
            writeAndFlush(ctx.channel(), response);
        }
    }
}