in impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java [68:124]
/**
 * Obtains an active channel for the given remote address, connecting if necessary.
 *
 * <p>Under {@code lockChannelTables} the method looks up {@code addr} in
 * {@code channelTables}:
 * <ul>
 *   <li>if the cached entry is active, its channel is returned immediately;</li>
 *   <li>if the cached entry's connect future is not done yet, that pending attempt is reused;</li>
 *   <li>otherwise (no entry, or a finished but inactive one) the stale entry is removed and a
 *       new asynchronous connect is started and stored in the table.</li>
 * </ul>
 * After the lock is released, the method waits up to
 * {@code clientConfig.getConnectTimeoutMillis()} for the connect future. On failure or timeout
 * it calls {@code closeChannel(addr, ...)} for the entry.
 *
 * @param addr remote address in {@code host:port} form
 * @return the active channel, or {@code null} if the table lock could not be acquired in time,
 *         the thread was interrupted while waiting for the lock, or the connection attempt
 *         failed or timed out
 */
private Channel createChannel(final String addr) {
    RemotingChannelFuture cw = null;
    try {
        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                boolean createNewConnection;
                cw = this.channelTables.get(addr);
                if (cw != null) {
                    if (cw.isActive()) {
                        // Fast path: an active connection is already cached.
                        return cw.getChannel();
                    } else if (!cw.getChannelFuture().isDone()) {
                        // A connect attempt is still in flight; wait on it below instead of dialing again.
                        createNewConnection = false;
                    } else {
                        // Finished but not active: drop the stale entry and reconnect.
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                } else {
                    createNewConnection = true;
                }
                if (createNewConnection) {
                    // A malformed addr (missing or non-numeric port) throws here and is logged by the catch below.
                    String[] s = addr.split(":");
                    SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.parseInt(s[1]));
                    ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
                    LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                    cw = new RemotingChannelFuture(channelFuture);
                    this.channelTables.put(addr, cw);
                }
            } catch (Exception e) {
                LOG.error("createChannel: create channel exception", e);
            } finally {
                this.lockChannelTables.unlock();
            }
        } else {
            LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can observe the
        // cancellation request instead of having it silently swallowed here.
        Thread.currentThread().interrupt();
        LOG.warn("createChannel: interrupted while waiting for the channel table lock", e);
    }
    if (cw != null) {
        // The wait happens outside the table lock so other addresses are not blocked meanwhile.
        ChannelFuture channelFuture = cw.getChannelFuture();
        if (channelFuture.awaitUninterruptibly(this.clientConfig.getConnectTimeoutMillis())) {
            if (cw.isActive()) {
                LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                return cw.getChannel();
            } else {
                LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
                this.closeChannel(addr, cw.getChannel());
            }
        } else {
            LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getConnectTimeoutMillis(),
                channelFuture.toString());
            this.closeChannel(addr, cw.getChannel());
        }
    }
    return null;
}