in impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java [379:424]
/**
 * Writes {@code request} to {@code channel} and waits for the matching response.
 *
 * <p>A {@link ResponseFuture} keyed by the request id is registered in {@code ackTables} for the
 * duration of the call and is always removed again before this method returns or throws.
 *
 * @param remoteAddr    address recorded on the response future and used in the failure log message
 * @param channel       channel the request is written to
 * @param request       command to send; its {@code requestID()} is the correlation key
 * @param timeoutMillis timeout handed to the response future and to {@code waitResponse}
 * @return the response command; never {@code null}
 * @throws RemotingTimeoutException if no response was obtained and no write failure was recorded
 * @throws RemotingAccessException  if the channel write failed (wraps the write's cause)
 */
private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request,
    final long timeoutMillis) {
    final int requestID = request.requestID();
    try {
        final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis);
        responseFuture.setRequestCommand(request);
        responseFuture.setRemoteAddr(remoteAddr);
        this.ackTables.put(requestID, responseFuture);

        ChannelFutureListener listener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                }
                // Record the cause BEFORE flipping the flag: the caller thread reads the flag first
                // and then the cause, so it must not be able to see "send failed" without a cause.
                // NOTE(review): relies on ResponseFuture publishing these fields safely across
                // threads (e.g. volatile) - confirm.
                responseFuture.setCause(new RemotingAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause()));
                responseFuture.setSendRequestOK(false);
                ackTables.remove(requestID);
                // Presumably releases the thread blocked in waitResponse - verify in ResponseFuture.
                responseFuture.putResponse(null);
                LOG.warn("Send request command to {} failed !", remoteAddr);
            }
        };
        this.writeAndFlush(channel, request, listener);

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null != responseCommand) {
            return responseCommand;
        }

        // No response. Report a timeout when the write succeeded, and also when no failure cause
        // has been recorded: `throw null` would otherwise surface as a NullPointerException.
        if (responseFuture.isSendRequestOK() || null == responseFuture.getCause()) {
            responseFuture.setCause(new RemotingTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis));
        }
        throw responseFuture.getCause();
    } finally {
        // Always drop the pending entry, whichever way the call ended.
        this.ackTables.remove(requestID);
    }
}