in src/transport/TcpRemotingClient.cpp [298:361]
std::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needResponse) {
std::shared_ptr<TcpTransport> tts;
{
// try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
// long time, if could not get m_tcpLock, return NULL
std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
if (!lock.owns_lock()) {
if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
std::shared_ptr<TcpTransport> pTcp;
return pTcp;
}
}
// check for reuse
if (m_tcpTable.find(addr) != m_tcpTable.end()) {
std::shared_ptr<TcpTransport> tcp = m_tcpTable[addr];
if (tcp) {
TcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
if (connectStatus == TCP_CONNECT_STATUS_SUCCESS) {
return tcp;
} else if (connectStatus == TCP_CONNECT_STATUS_WAIT) {
tts = tcp;
} else if (connectStatus == TCP_CONNECT_STATUS_FAILED) {
LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
tcp->disconnect(addr); // avoid coredump when connection with broker was broken
m_tcpTable.erase(addr);
} else {
LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str());
m_tcpTable.erase(addr);
}
}
}
//<!callback;
TcpTransportReadCallback callback = needResponse ? &TcpRemotingClient::static_messageReceived : nullptr;
if (!tts) {
tts = TcpTransport::CreateTransport(this, m_enableSsl, m_sslPropertyFile, callback);
TcpConnectStatus connectStatus = tts->connect(addr, 0); // use non-block
if (connectStatus != TCP_CONNECT_STATUS_WAIT) {
LOG_WARN("can not connect to:%s", addr.c_str());
tts->disconnect(addr);
std::shared_ptr<TcpTransport> pTcp;
return pTcp;
} else {
// even if connecting failed finally, this server transport will be erased by next CreateTransport
m_tcpTable[addr] = tts;
}
}
}
TcpConnectStatus connectStatus = tts->waitTcpConnectEvent(static_cast<int>(m_tcpConnectTimeout));
if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
LOG_WARN("can not connect to server:%s", addr.c_str());
tts->disconnect(addr);
std::shared_ptr<TcpTransport> pTcp;
return pTcp;
} else {
LOG_INFO("connect server with addr:%s success", addr.c_str());
return tts;
}
}