in impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java [441:478]
/**
 * Sends {@code request} over {@code channel} without waiting for the response.
 *
 * <p>A permit is taken from {@code semaphoreAsync} with a non-blocking
 * {@code tryAcquire()}. When a permit is obtained, a {@link ResponseFuture} for the
 * request is stored in {@code ackTables} under the request id and the command is
 * written to the channel. When no permit is available, or when the write fails
 * (either synchronously or as reported by the channel future), the failure is handed
 * to {@code requestFail(...)} instead of being thrown to the caller.
 *
 * @param remoteAddr    remote address recorded on the response future and used in log output
 * @param channel       channel the request is written to
 * @param request       command to send; its {@code requestID()} keys the pending-response table
 * @param asyncHandler  handler passed to the created {@link ResponseFuture}
 * @param timeoutMillis timeout value passed to the created {@link ResponseFuture}
 */
private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request,
    final AsyncHandler asyncHandler, final long timeoutMillis) {
    // Non-blocking acquire: when no permit is free the request is failed right away.
    if (!this.semaphoreAsync.tryAcquire()) {
        String info = String.format("No available async semaphore to issue the request request %s", request.toString());
        // No permit was taken, so the future is built without a release handle (null).
        requestFail(new ResponseFuture(request.requestID(), timeoutMillis, asyncHandler, null), new SemaphoreExhaustedException(info));
        LOG.error(info);
        return;
    }

    final int requestID = request.requestID();
    SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
    final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis, asyncHandler, once);
    responseFuture.setRequestCommand(request);
    responseFuture.setRemoteAddr(remoteAddr);
    // Register before writing so the future is already in the table when the write completes.
    this.ackTables.put(requestID, responseFuture);
    try {
        ChannelFutureListener listener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                responseFuture.setSendRequestOK(f.isSuccess());
                if (f.isSuccess()) {
                    return;
                }
                // The write did not succeed: report the failure for this request id.
                requestFail(requestID, new RemotingAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause()));
                // The "{}" placeholder is required for remoteAddr to appear in the log line.
                LOG.warn("Send request command to channel {} failed.", remoteAddr);
            }
        };
        this.writeAndFlush(channel, request, listener);
    } catch (Exception e) {
        // Failure raised synchronously while issuing the write.
        requestFail(requestID, new RemotingAccessException(RemotingUtil.extractRemoteAddress(channel), e));
        LOG.error("Send request command to channel " + channel + " error !", e);
    }
}