in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [2421:2878]
static int32_t _wemq_thread_do_recv_async (WemqThreadCtx * pThreadCtx,
bool isRecvNewConnect)
{
ASSERT (pThreadCtx);
int nfds =
epoll_wait (pThreadCtx->m_iEpollFd, pThreadCtx->m_ptEvents, MAX_EVENTS,
1);
if (nfds == -1)
{
LOGRMB (RMB_LOG_ERROR, "[%s] [Type:%d] [TID:%lu] epoll wait error:%d!",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID, errno);
return -1;
}
if (nfds == 0)
{
return 0;
}
unsigned long ulLastTime = 0;
unsigned long ulNowTime = 0;
struct timeval tv;
int iTmp = 0;
int iRecvd = 0;
int iRemind = TCP_PKG_LEN_BTYES;
for (iTmp = 0; iTmp < nfds; ++iTmp)
{
// 必须在此判断,如果新旧连接都有消息过来,后面会把当前fd改成新连接的fd,导致下次循环又会重新处理
if (!isRecvNewConnect)
{
pThreadCtx->m_iSockFdNew = pThreadCtx->m_iSockFd;
pThreadCtx->sslNew = pThreadCtx->ssl;
pThreadCtx->m_iSockFd = pThreadCtx->m_iSockFdOld;
pThreadCtx->ssl = pThreadCtx->sslOld;
}
if (pThreadCtx->m_ptEvents[iTmp].data.fd == pThreadCtx->m_iSockFd)
{
if (EPOLLIN == (pThreadCtx->m_ptEvents[iTmp].events & EPOLLIN))
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] EPOLLIN EVENT",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
gettimeofday (&tv, NULL);
ulLastTime = tv.tv_sec * 1000000 + tv.tv_usec;
ulNowTime = ulLastTime;
while (iRemind > 0)
{
int iRecv;
if (NULL != pThreadCtx->ssl)
{
iRecv =
SSL_read (pThreadCtx->ssl, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind);
}
else
{
iRecv =
recv (pThreadCtx->m_iSockFd, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind, 0);
}
//if (iRecv < 0 && (errno != EAGAIN))
if (iRecv < 0)
{
if (errno == EAGAIN)
{
gettimeofday (&tv, NULL);
ulNowTime = tv.tv_sec * 1000000 + tv.tv_usec;
if ((ulNowTime - ulLastTime) > (1 * 60 * 1000000))
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
usleep (100);
continue;
}
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Peer close connect, fd close() ret=%d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, iRecv);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv > 0)
{
iRecvd += iRecv;
iRemind -= iRecv;
}
}
if (iRecvd != 4 || strcmp (pThreadCtx->m_pRecvBuff, "WEMQ") != 0)
{
if (!isRecvNewConnect)
{
LOGRMB (RMB_LOG_ERROR, "IP: [old proxy ip:%s|old port:%d]",
pThreadCtx->m_cProxyIPOld, pThreadCtx->m_uiProxyPortOld);
}
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%d] recv header error, buf: %s, len: %d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
pThreadCtx->m_pRecvBuff, iRecvd);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
gettimeofday (&tv, NULL);
ulLastTime = tv.tv_sec * 1000000 + tv.tv_usec;
ulNowTime = ulLastTime;
iRecvd = 0;
iRemind = TCP_PKG_LEN_BTYES;
while (iRemind > 0)
{
int iRecv;
if (NULL != pThreadCtx->ssl)
{
iRecv =
SSL_read (pThreadCtx->ssl, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind);
}
else
{
iRecv =
recv (pThreadCtx->m_iSockFd, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind, 0);
}
//if (iRecv < 0 && (errno != EAGAIN))
if (iRecv < 0)
{
if (errno == EAGAIN)
{
gettimeofday (&tv, NULL);
ulNowTime = tv.tv_sec * 1000000 + tv.tv_usec;
if ((ulNowTime - ulLastTime) > (1 * 60 * 1000000))
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
usleep (100);
continue;
}
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Peer close connect, fd close() ret=%d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, iRecv);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv > 0)
{
iRecvd += iRecv;
iRemind -= iRecv;
}
}
if (iRecvd != 4)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%d] recv version error, version=%s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
pThreadCtx->m_pRecvBuff);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
gettimeofday (&tv, NULL);
ulLastTime = tv.tv_sec * 1000000 + tv.tv_usec;
ulNowTime = ulLastTime;
iRecvd = 0;
iRemind = TCP_PKG_LEN_BTYES;
while (iRemind > 0)
{
int iRecv;
if (NULL != pThreadCtx->ssl)
{
iRecv =
SSL_read (pThreadCtx->ssl, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind);
}
else
{
iRecv =
recv (pThreadCtx->m_iSockFd, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind, 0);
}
//if (iRecv < 0 && (errno != EAGAIN))
if (iRecv < 0)
{
if (errno == EAGAIN)
{
gettimeofday (&tv, NULL);
ulNowTime = tv.tv_sec * 1000000 + tv.tv_usec;
if ((ulNowTime - ulLastTime) > (1 * 60 * 1000000))
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
usleep (100);
continue;
}
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Peer close connect, fd close() ret=%d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, iRecv);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv > 0)
{
iRecvd += iRecv;
iRemind -= iRecv;
}
}
//ASSERT (iRemind == 0);
if (iRemind != 0 || iRecvd != TCP_PKG_LEN_BTYES)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] get msg length failed",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
gettimeofday (&tv, NULL);
ulLastTime = tv.tv_sec * 1000000 + tv.tv_usec;
ulNowTime = ulLastTime;
iRemind = ntohl (*(uint32_t *) pThreadCtx->m_pRecvBuff);
iRemind -= TCP_PKG_LEN_BTYES;
while (iRemind > 0)
{
int iRecv;
if (NULL != pThreadCtx->ssl)
{
iRecv =
SSL_read (pThreadCtx->ssl, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind);
}
else
{
iRecv =
recv (pThreadCtx->m_iSockFd, pThreadCtx->m_pRecvBuff + iRecvd,
iRemind, 0);
}
//if (iRecv < 0 && (errno != EAGAIN))
if (iRecv < 0)
{
if (errno == EAGAIN)
{
gettimeofday (&tv, NULL);
ulNowTime = tv.tv_sec * 1000000 + tv.tv_usec;
if ((ulNowTime - ulLastTime) > (1 * 60 * 1000000))
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
usleep (100);
continue;
}
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, errno);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Peer close connect, fd close() ret=%d",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, iRecv);
_wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
return -2;
}
if (iRecv > 0)
{
iRecvd += iRecv;
iRemind -= iRecv;
}
}
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv complete len %d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, iRecvd);
if (!isRecvNewConnect)
{
pThreadCtx->m_iSockFd = pThreadCtx->m_iSockFdNew;
pThreadCtx->ssl = pThreadCtx->sslNew;
}
//memset(&pThreadCtx->m_stWeMQMSG, 0, sizeof(pThreadCtx->m_stWeMQMSG));
memset (&pThreadCtx->m_stWeMQMSG, 0, (sizeof (int) * 2));
int iRet =
DecodeWeMQMsg (&pThreadCtx->m_stWeMQMSG, pThreadCtx->m_pRecvBuff,
iRecvd);
if (pThreadCtx->m_stWeMQMSG.uiHeaderLen == 0
|| pThreadCtx->m_stWeMQMSG.uiHeaderLen >= MAX_WEMQ_HEADER_LEN)
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header complete, header len %d is 0 or too long",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_stWeMQMSG.uiHeaderLen);
return -1;
}
if (pThreadCtx->m_stWeMQMSG.uiHeaderLen > 0)
{
memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonHeader,
pThreadCtx->m_pRecvBuff + 8,
pThreadCtx->m_stWeMQMSG.uiHeaderLen);
pThreadCtx->m_stWeMQMSG.cStrJsonHeader[pThreadCtx->m_stWeMQMSG.
uiHeaderLen] = '\0';
}
LOGRMB (RMB_LOG_DEBUG, "cStrJsonHeader: %s",
pThreadCtx->m_stWeMQMSG.cStrJsonHeader);
unsigned int uiTmpBodyLen =
pThreadCtx->m_stWeMQMSG.uiTotalLen -
pThreadCtx->m_stWeMQMSG.uiHeaderLen - 8;
if (uiTmpBodyLen >= MAX_WEMQ_BODY_LEN)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq complete,body len %d is too long",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
uiTmpBodyLen);
return -1;
}
if (uiTmpBodyLen == 0)
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq complete,body len is 0",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
}
//if (pThreadCtx->m_stWeMQMSG.uiTotalLen - pThreadCtx->m_stWeMQMSG.uiHeaderLen - 8 > 0)
if (uiTmpBodyLen > 0)
{
//set message source
pThreadCtx->m_stWeMQMSG.cStrJsonBody[0] = RMB_MSG_FROM_WEMQ;
memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonBody,
pThreadCtx->m_pRecvBuff + 8 +
pThreadCtx->m_stWeMQMSG.uiHeaderLen,
pThreadCtx->m_stWeMQMSG.uiTotalLen -
pThreadCtx->m_stWeMQMSG.uiHeaderLen - 8);
pThreadCtx->m_stWeMQMSG.cStrJsonBody[uiTmpBodyLen] = '\0';
}
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header complete,total len %d, header len %d,header %s, body %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_stWeMQMSG.uiTotalLen,
pThreadCtx->m_stWeMQMSG.uiHeaderLen,
pThreadCtx->m_stWeMQMSG.cStrJsonHeader,
pThreadCtx->m_stWeMQMSG.cStrJsonBody);
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header ERROR",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
return -1;
}
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] epoll events %d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_ptEvents[iTmp].events);
}
}
if (!isRecvNewConnect)
{
pThreadCtx->m_iSockFd = pThreadCtx->m_iSockFdNew;
pThreadCtx->ssl = pThreadCtx->sslNew;
}
}
return iRecvd;
}