in src/transport/TcpTransport.cpp [146:201]
TcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeoutMillis) {
string hostname;
short port;
LOG_DEBUG("connect to [%s].", strServerURL.c_str());
if (!UtilAll::SplitURL(strServerURL, hostname, port)) {
LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
return TCP_CONNECT_STATUS_FAILED;
}
{
std::lock_guard<std::mutex> lock(m_eventLock);
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
try {
sin.sin_addr.s_addr = getInetAddr(hostname);
} catch (const MQClientException& e) {
LOG_INFO("connect to %s failed, %s", strServerURL.c_str(), e.what());
setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
return TCP_CONNECT_STATUS_FAILED;
}
sin.sin_port = htons(port);
m_event.reset(EventLoop::GetDefaultEventLoop()->createBufferEvent(-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE,
m_enableSsl, m_sslPropertyFile));
m_event->setCallback(readNextMessageIntCallback, nullptr, eventCallback, shared_from_this());
m_event->setWatermark(EV_READ, 4, 0);
m_event->enable(EV_READ | EV_WRITE);
setTcpConnectStatus(TCP_CONNECT_STATUS_WAIT);
if (m_event->connect((struct sockaddr*)&sin, sizeof(sin)) < 0) {
LOG_INFO("connect to fd:%d failed", m_event->getfd());
freeBufferEvent();
setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
return TCP_CONNECT_STATUS_FAILED;
}
}
if (timeoutMillis <= 0) {
LOG_INFO("try to connect to fd:%d, addr:%s", m_event->getfd(), hostname.c_str());
return TCP_CONNECT_STATUS_WAIT;
}
TcpConnectStatus connectStatus = waitTcpConnectEvent(timeoutMillis);
if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
LOG_WARN("can not connect to server:%s", strServerURL.c_str());
std::lock_guard<std::mutex> lock(m_eventLock);
freeBufferEvent();
setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
return TCP_CONNECT_STATUS_FAILED;
}
return TCP_CONNECT_STATUS_SUCCESS;
}