in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [2257:2419]
/*
 * Tear down the current proxy connection after a fatal receive condition.
 *
 * Calls _wemq_thread_del_fd() for the context, closes the socket (and SSL
 * handle) with wemq_tcp_close(), then resets m_iSockFd to -1 and ssl to NULL
 * so the context no longer refers to the closed connection.  When iBlackList
 * is non-zero the proxy ip/port is additionally handed to
 * wemq_proxy_to_black_list().
 */
static void _wemq_thread_recv_sync_close (WemqThreadCtx * pThreadCtx,
                                          int iBlackList)
{
  _wemq_thread_del_fd (pThreadCtx);
  wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
  pThreadCtx->m_iSockFd = -1;
  pThreadCtx->ssl = NULL;
  if (iBlackList)
    {
      wemq_proxy_to_black_list (pThreadCtx->m_cProxyIP,
                                pThreadCtx->m_uiProxyPort);
    }
}

/*
 * Synchronously receive one message from the proxy and decode it into
 * pThreadCtx->m_stWeMQMSG (lengths, JSON header and JSON body, both
 * NUL-terminated).
 *
 * wemq_tcp_recv() is retried while it reports a timeout (0) or
 * "closed by peer" (-2); each condition has its own counter and the
 * connection is given up once a counter reaches its limit.
 *
 * Returns:
 *    0  a message was received and decoded.
 *    1  wemq_tcp_recv() returned -3 (message not full); connection closed.
 *   -1  any other negative wemq_tcp_recv() result, or the decoded header or
 *       body length is out of range; the connection is left untouched.
 *   -2  12 timeouts or 3 "closed by peer" results were seen; the connection
 *       is closed and the proxy is put on the black list.
 */
static int32_t _wemq_thread_do_recv_sync (WemqThreadCtx * pThreadCtx)
{
  ASSERT (pThreadCtx);

  /* Retry limits before the connection is abandoned. */
  const unsigned int uiMaxTimeOutTimes = 12;
  const unsigned int uiMaxClosedByPeerTimes = 3;

  int timeout = pRmbStConfig->iWemqTcpSocketTimeout;
  unsigned int uiTimeOutTimes = 0;
  unsigned int uiClosedByPeerTimes = 0;
  uint32_t iRecvLen = 0;        /* out-parameter of wemq_tcp_recv(), not used here */
  int iRet;

  /* Phase 1: receive, retrying on timeout / closed-by-peer. */
  for (;;)
    {
      iRet =
        wemq_tcp_recv (pThreadCtx->m_iSockFd, pThreadCtx->m_pRecvBuff,
                       &iRecvLen, timeout, pThreadCtx->ssl);
      if (iRet > 0)
        {
          break;                /* data received, go decode it */
        }

      if (iRet == 0)
        {
          uiTimeOutTimes += 1;
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP recv timeout!timeout_times=%u, fd=%d",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
                  uiTimeOutTimes, pThreadCtx->m_iSockFd);
          if (uiTimeOutTimes >= uiMaxTimeOutTimes)
            {
              LOGRMB (RMB_LOG_ERROR,
                      "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP continuity recv timeout times=%u, so close connect",
                      STATE_MAP[pThreadCtx->m_iState],
                      pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                      pThreadCtx->m_iLocalPort, pThreadCtx->m_cProxyIP,
                      pThreadCtx->m_uiProxyPort, uiTimeOutTimes);
              _wemq_thread_recv_sync_close (pThreadCtx, 1);
              return -2;
            }
          continue;
        }

      if (iRet == -2)
        {
          uiClosedByPeerTimes += 1;
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP conncet closed by peer(%d)",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, errno);
          if (uiClosedByPeerTimes >= uiMaxClosedByPeerTimes)
            {
              LOGRMB (RMB_LOG_ERROR,
                      "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP conncet continuity closed by peer times=%u, so close connect",
                      STATE_MAP[pThreadCtx->m_iState],
                      pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                      pThreadCtx->m_iLocalPort, pThreadCtx->m_cProxyIP,
                      pThreadCtx->m_uiProxyPort, uiClosedByPeerTimes);
              _wemq_thread_recv_sync_close (pThreadCtx, 1);
              return -2;
            }
          continue;
        }

      if (iRet == -3)
        {
          /* msg is not full: drop the connection but do not black-list the proxy */
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] msg is not full",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
          _wemq_thread_recv_sync_close (pThreadCtx, 0);
          return 1;
        }

      /* Any other negative result: report it, the socket is left as is. */
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] wemq_tcp_recv error",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      return -1;
    }

  /* Phase 2: decode the received buffer. */
  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv complete len %d",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, iRet);

  /*
   * Only the first two ints of the message struct are cleared, not the whole
   * struct; the header/body buffers are re-terminated explicitly below.
   * NOTE(review): presumably these two ints are uiTotalLen and uiHeaderLen,
   * which DecodeWeMQMsg() is expected to fill in -- confirm against the
   * struct definition.
   */
  memset (&pThreadCtx->m_stWeMQMSG, 0, (sizeof (int) * 2));
  DecodeWeMQMsg (&pThreadCtx->m_stWeMQMSG, pThreadCtx->m_pRecvBuff, iRet);

  unsigned int uiHeaderLen = pThreadCtx->m_stWeMQMSG.uiHeaderLen;
  if (uiHeaderLen == 0 || uiHeaderLen >= MAX_WEMQ_HEADER_LEN)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv header len %u is 0 or too long",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
              uiHeaderLen);
      return -1;
    }

  /*
   * The header length is now known to be in [1, MAX_WEMQ_HEADER_LEN - 1].
   * The JSON header starts 8 bytes into the receive buffer.
   * NOTE(review): the lengths are not cross-checked against the number of
   * bytes actually received (iRet); confirm wemq_tcp_recv() guarantees a
   * complete frame before adding such a check.
   */
  memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonHeader,
          pThreadCtx->m_pRecvBuff + 8, uiHeaderLen);
  pThreadCtx->m_stWeMQMSG.cStrJsonHeader[uiHeaderLen] = '\0';

  /*
   * Unsigned arithmetic: if the total length is smaller than header + 8 the
   * subtraction wraps to a huge value, which the upper-bound check below
   * rejects.
   */
  unsigned int uiTmpBodyLen =
    pThreadCtx->m_stWeMQMSG.uiTotalLen - uiHeaderLen - 8;
  if (uiTmpBodyLen >= MAX_WEMQ_BODY_LEN)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv body len %u is too long",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
              uiTmpBodyLen);
      return -1;
    }

  if (uiTmpBodyLen == 0)
    {
      LOGRMB (RMB_LOG_DEBUG,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv body len is 0",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
    }
  else
    {
      memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonBody,
              pThreadCtx->m_pRecvBuff + 8 + uiHeaderLen, uiTmpBodyLen);
    }
  /*
   * Always terminate the body, including the empty case: the body buffer is
   * not cleared between messages, so without this an empty body would leave
   * the previous message's text visible.
   */
  pThreadCtx->m_stWeMQMSG.cStrJsonBody[uiTmpBodyLen] = '\0';

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header complete,total len %u, header len %u,header %s",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pThreadCtx->m_stWeMQMSG.uiTotalLen,
          pThreadCtx->m_stWeMQMSG.uiHeaderLen,
          pThreadCtx->m_stWeMQMSG.cStrJsonHeader);
  return 0;
}