in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [4217:4355]
/*
 * Connect this thread context to a proxy, over TLS when
 * pRmbStConfig->tlsOnoff is non-zero and over plain TCP otherwise.
 *
 * The target address comes from wemq_proxy_get_server(), or from the
 * redirect fields when pThreadCtx->m_lRedirect is non-zero (the flag is
 * cleared here).  The connection is attempted up to
 * pRmbStConfig->iWemqTcpConnectRetryNum times; after every failed attempt
 * the function sleeps iWemqTcpConnectDelayTime * 1000 microseconds.
 *
 * Returns:
 *    0  connected: the socket was switched to non-blocking mode and passed
 *       to _wemq_thread_add_fd()
 *   -1  all attempts failed; wemq_proxy_goodbye() was called for the address
 *   -2  the configured retry count was not positive, so nothing was attempted
 */
static int32_t _wemq_thread_do_connect (WemqThreadCtx * pThreadCtx)
{
  /* Zero-initialised so the seed is still defined if gettimeofday() fails. */
  struct timeval tv = { 0 };
  gettimeofday (&tv, NULL);

  /*
   * Seed from the current time in microseconds.  The arithmetic is done in
   * unsigned int so it wraps instead of overflowing a signed long on
   * platforms where long is 32 bits; the resulting seed is the same value
   * the truncating cast produced before.
   */
  unsigned int seed =
    (unsigned int) tv.tv_sec * 1000000u + (unsigned int) tv.tv_usec;
  srand (seed);

  /* Random jitter of 0..9 added to the configured connect timeout
     (the original comment gives the unit as ms). */
  int jitter = rand () % 10;
  int retry = pRmbStConfig->iWemqTcpConnectRetryNum;
  int sleep_time = pRmbStConfig->iWemqTcpConnectDelayTime * 1000;
  /* Only the plain TCP path below consumes this timeout. */
  int timeout = pRmbStConfig->iWemqTcpConnectTimeout + jitter;

  /* Forget the previous endpoint before choosing a new one. */
  pThreadCtx->m_cProxyIP[0] = '\0';
  pThreadCtx->m_uiProxyPort = 0;
  pThreadCtx->m_iLocalPort = 0;

  if (pThreadCtx->m_lRedirect != 0)
    {
      /* One-shot redirect: consume the flag and use the redirect address. */
      pThreadCtx->m_lRedirect = 0;
      snprintf (pThreadCtx->m_cProxyIP, sizeof (pThreadCtx->m_cProxyIP), "%s",
                pThreadCtx->m_cRedirectIP);
      pThreadCtx->m_uiProxyPort = pThreadCtx->m_iRedirectPort;
    }
  else
    {
      /* Poll once per second for as long as the lookup returns 2. */
      for (;;)
        {
          int iRet = wemq_proxy_get_server (pThreadCtx->m_cProxyIP,
                                            sizeof (pThreadCtx->m_cProxyIP),
                                            &pThreadCtx->m_uiProxyPort);
          if (iRet != 2)
            {
              break;
            }
          LOGRMB (RMB_LOG_ERROR, "get proxy ip/port failed");
          sleep (1);
        }
    }

  while (retry > 0)
    {
      const int use_tls = (0 != pRmbStConfig->tlsOnoff);
      int connected = 0;

      if (use_tls)
        {
          if (0 != do_ssl_connect (pThreadCtx))
            {
              LOGRMB (RMB_LOG_ERROR,
                      "[%s] [Type:%d] [TID:%lu] [retry: %d|host:%s|port:%u] tls Connect Error!",
                      STATE_MAP[pThreadCtx->m_iState],
                      pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                      retry, pThreadCtx->m_cProxyIP,
                      pThreadCtx->m_uiProxyPort);
            }
          else
            {
              connected = 1;
            }
        }
      else
        {
          int iSockFd = wemq_tcp_connect (pThreadCtx->m_cProxyIP,
                                          (uint16_t) pThreadCtx->m_uiProxyPort,
                                          timeout);
          /* NOTE(review): a descriptor of 0 is treated as failure here;
             confirm wemq_tcp_connect() can never return 0 on success. */
          if (iSockFd > 0)
            {
              pThreadCtx->m_iSockFd = iSockFd;
              connected = 1;
            }
        }

      if (connected)
        {
          /* Record the local port, then go non-blocking before the socket
             is registered with epoll. */
          wemq_getsockename (pThreadCtx->m_iSockFd, NULL, 0,
                             &pThreadCtx->m_iLocalPort);
          if (_wemq_thread_set_fd_nonblock (pThreadCtx, pThreadCtx->m_iSockFd)
              == 0)
            {
              if (use_tls)
                {
                  LOGRMB (RMB_LOG_INFO,
                          "[%s] [Type:%d] [TID:%lu] [host:%s|port:%u|local_port:%d] tls connect to proxy, fd=%d!",
                          STATE_MAP[pThreadCtx->m_iState],
                          pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
                          pThreadCtx->m_iLocalPort, pThreadCtx->m_iSockFd);
                }
              else
                {
                  LOGRMB (RMB_LOG_INFO,
                          "[%s] [Type:%d] [TID:%lu] [host:%s|port:%u|local_port:%d] connect to proxy!",
                          STATE_MAP[pThreadCtx->m_iState],
                          pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
                          pThreadCtx->m_iLocalPort);
                }
              /* add fd to epoll */
              _wemq_thread_add_fd (pThreadCtx);
              return 0;
            }

          /* Could not switch to non-blocking mode: drop this connection and
             let the attempt count as a failure. */
          LOGRMB (RMB_LOG_ERROR,
                  "wemq thread set pThreadCtx->m_iSockFd=%d to nonblock failed",
                  pThreadCtx->m_iSockFd);
          wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
          pThreadCtx->m_iSockFd = -1;
          pThreadCtx->ssl = NULL;
        }

      LOGRMB (RMB_LOG_DEBUG,
              "[%s] [Type:%d] [TID:%lu] [retry: %d|host:%s|port:%u] Connect Error!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, retry, pThreadCtx->m_cProxyIP,
              pThreadCtx->m_uiProxyPort);
      retry--;

      /*
       * A negative configured delay would be converted to a huge unsigned
       * interval by usleep(), so only sleep for positive values.
       * NOTE(review): the delay also runs after the final failed attempt;
       * kept as-is in case callers rely on it to pace reconnects.
       */
      if (sleep_time > 0)
        {
          usleep ((unsigned int) sleep_time);
        }

      if (retry == 0)
        {
          wemq_proxy_goodbye (pThreadCtx->m_cProxyIP,
                              pThreadCtx->m_uiProxyPort);
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [host:%s|port:%u] Connect Failed!",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_cProxyIP,
                  pThreadCtx->m_uiProxyPort);
          return -1;
        }
    }

  /* Reached only when the configured retry count was not positive. */
  return -2;
}