static int32_t _wemq_thread_do_recv_async()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [2421:2878]


/*
 * Receive exactly iWant bytes from the context's current connection into
 * pThreadCtx->m_pRecvBuff, starting at byte offset iOffset.
 *
 * The connection used is whatever pThreadCtx->ssl / pThreadCtx->m_iSockFd
 * designate at the time of the call: SSL_read() when ssl is non-NULL,
 * recv() on m_iSockFd otherwise.
 *
 * EAGAIN is retried (with a 100us sleep between attempts) until 60 seconds
 * have elapsed since this call started; the deadline is not extended when
 * partial data arrives.
 *
 * Returns the number of bytes received (iWant, or 0 when iWant <= 0).
 * On a receive error, a retry timeout, or the peer closing the connection,
 * logs the condition, calls _wemq_thread_reset_sockFd() and returns -2.
 */
static int32_t _wemq_thread_recv_exact (WemqThreadCtx * pThreadCtx,
                                        bool isRecvNewConnect,
                                        int iOffset, int iWant)
{
  struct timeval tv;
  unsigned long ulStartTime = 0;
  unsigned long ulNowTime = 0;
  int iDone = 0;

  gettimeofday (&tv, NULL);
  ulStartTime = tv.tv_sec * 1000000 + tv.tv_usec;

  while (iDone < iWant)
  {
    int iRecv;
    if (NULL != pThreadCtx->ssl)
    {
      iRecv =
        SSL_read (pThreadCtx->ssl,
                  pThreadCtx->m_pRecvBuff + iOffset + iDone, iWant - iDone);
    }
    else
    {
      iRecv =
        recv (pThreadCtx->m_iSockFd,
              pThreadCtx->m_pRecvBuff + iOffset + iDone, iWant - iDone, 0);
    }

    if (iRecv < 0)
    {
      // Capture errno right away so later calls cannot clobber it.
      // NOTE(review): on the SSL path errno is not the documented way to
      // detect a retryable condition (SSL_get_error() is) - confirm whether
      // SSL_ERROR_WANT_READ needs dedicated handling here.
      int iErrno = errno;

      if (iErrno == EAGAIN)
      {
        gettimeofday (&tv, NULL);
        ulNowTime = tv.tv_sec * 1000000 + tv.tv_usec;
        // Keep polling until one minute (in microseconds) has passed.
        if ((ulNowTime - ulStartTime) <= (1 * 60 * 1000000))
        {
          usleep (100);
          continue;
        }
      }

      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv return < 0, errno %d",
              STATE_MAP[pThreadCtx->m_iState],
              pThreadCtx->m_contextType, pThreadCtx->m_threadID,
              pThreadCtx->m_iLocalPort, iErrno);
      _wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
      return -2;
    }

    if (iRecv == 0)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Peer close connect, fd close() ret=%d",
              STATE_MAP[pThreadCtx->m_iState],
              pThreadCtx->m_contextType, pThreadCtx->m_threadID,
              pThreadCtx->m_iLocalPort, iRecv);
      _wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
      return -2;
    }

    iDone += iRecv;
  }

  return iDone;
}

/*
 * Poll the thread's epoll set once (1 ms timeout) and, if the connection
 * selected by isRecvNewConnect is readable, receive and decode one message
 * into pThreadCtx->m_stWeMQMSG.
 *
 * Data is read from the socket in this order, each step through
 * _wemq_thread_recv_exact():
 *   1. TCP_PKG_LEN_BTYES bytes that must equal the magic "WEMQ";
 *   2. TCP_PKG_LEN_BTYES bytes logged as "version" on failure (the value
 *      itself is not inspected);
 *   3. TCP_PKG_LEN_BTYES bytes holding the total length in network order;
 *   4. (total length - TCP_PKG_LEN_BTYES) further bytes, stored right after
 *      the length field in m_pRecvBuff.
 * The buffer is then handed to DecodeWeMQMsg() and the header/body are
 * copied into m_stWeMQMSG as NUL-terminated strings.
 *
 * pThreadCtx        thread context; must not be NULL.
 * isRecvNewConnect  when false, m_iSockFd/ssl are temporarily replaced by
 *                   m_iSockFdOld/sslOld while receiving, so the old
 *                   connection is the one being read.
 *
 * Returns:
 *   -1  epoll_wait() failed, or the decoded message was rejected (header
 *       length 0 or >= MAX_WEMQ_HEADER_LEN, body length >=
 *       MAX_WEMQ_BODY_LEN, or DecodeWeMQMsg() returned < 0);
 *   -2  receiving failed or the magic was wrong; the connection has been
 *       passed to _wemq_thread_reset_sockFd();
 *    0  no event was reported, or none of them was an EPOLLIN on the
 *       selected socket;
 *   >0  number of bytes of the received message held in m_pRecvBuff.
 */
static int32_t _wemq_thread_do_recv_async (WemqThreadCtx * pThreadCtx,
                                           bool isRecvNewConnect)
{
  ASSERT (pThreadCtx);

  int nfds =
    epoll_wait (pThreadCtx->m_iEpollFd, pThreadCtx->m_ptEvents, MAX_EVENTS,
                1);
  if (nfds == -1)
  {
    LOGRMB (RMB_LOG_ERROR, "[%s] [Type:%d] [TID:%lu] epoll wait error:%d!",
            STATE_MAP[pThreadCtx->m_iState],
            pThreadCtx->m_contextType, pThreadCtx->m_threadID, errno);
    return -1;
  }
  if (nfds == 0)
  {
    return 0;
  }

  int iTmp = 0;
  int iRecvd = 0;

  for (iTmp = 0; iTmp < nfds; ++iTmp)
  {
    // This swap must happen here, on every iteration: if both the new and
    // the old connection have data pending, the current fd is switched back
    // to the new connection's fd further down, which would otherwise make
    // the next iteration process the same connection again.
    if (!isRecvNewConnect)
    {
      pThreadCtx->m_iSockFdNew = pThreadCtx->m_iSockFd;
      pThreadCtx->sslNew = pThreadCtx->ssl;
      pThreadCtx->m_iSockFd = pThreadCtx->m_iSockFdOld;
      pThreadCtx->ssl = pThreadCtx->sslOld;
    }

    if (pThreadCtx->m_ptEvents[iTmp].data.fd == pThreadCtx->m_iSockFd)
    {
      if (EPOLLIN == (pThreadCtx->m_ptEvents[iTmp].events & EPOLLIN))
      {
        int32_t iGot;

        LOGRMB (RMB_LOG_DEBUG,
                "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] EPOLLIN EVENT",
                STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);

        // Step 1: protocol magic.
        iGot =
          _wemq_thread_recv_exact (pThreadCtx, isRecvNewConnect, 0,
                                   TCP_PKG_LEN_BTYES);
        if (iGot < 0)
        {
          return iGot;
        }
        iRecvd = iGot;

        // The received bytes carry no NUL terminator, so compare by length
        // instead of relying on whatever stale byte follows them.
        if (iRecvd != 4 || memcmp (pThreadCtx->m_pRecvBuff, "WEMQ", 4) != 0)
        {
          // Terminate the buffer so the "%s" below cannot read past the
          // bytes just received (the buffer also holds the rest of the
          // message later on, so this offset is within it).
          pThreadCtx->m_pRecvBuff[iRecvd] = '\0';
          if (!isRecvNewConnect)
          {
            LOGRMB (RMB_LOG_ERROR, "IP: [old proxy ip:%s|old port:%d]",
                    pThreadCtx->m_cProxyIPOld, pThreadCtx->m_uiProxyPortOld);
          }
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%d] recv header error, buf: %s, len: %d",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
                  pThreadCtx->m_pRecvBuff, iRecvd);
          _wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
          return -2;
        }

        // Step 2: version field (overwrites the magic in the buffer).
        iGot =
          _wemq_thread_recv_exact (pThreadCtx, isRecvNewConnect, 0,
                                   TCP_PKG_LEN_BTYES);
        if (iGot < 0)
        {
          return iGot;
        }
        iRecvd = iGot;

        if (iRecvd != 4)
        {
          pThreadCtx->m_pRecvBuff[iRecvd] = '\0';
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%d] recv version error, version=%s",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
                  pThreadCtx->m_pRecvBuff);
          _wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
          return -2;
        }

        // Step 3: total length field (overwrites the version in the buffer).
        iGot =
          _wemq_thread_recv_exact (pThreadCtx, isRecvNewConnect, 0,
                                   TCP_PKG_LEN_BTYES);
        if (iGot < 0)
        {
          return iGot;
        }
        iRecvd = iGot;

        if (iRecvd != TCP_PKG_LEN_BTYES)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] get msg length failed",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
          _wemq_thread_reset_sockFd (pThreadCtx, isRecvNewConnect);
          return -2;
        }

        // Step 4: the remainder of the message, appended after the length
        // field. The length is copied out with memcpy to avoid reading the
        // byte buffer through a uint32_t pointer.
        // NOTE(review): the peer-supplied length is not checked against the
        // capacity of m_pRecvBuff (not visible here) - confirm an upper
        // bound is enforced, otherwise an oversized value overruns the
        // buffer.
        uint32_t uiNetLen = 0;
        memcpy (&uiNetLen, pThreadCtx->m_pRecvBuff, sizeof (uiNetLen));
        int iRemind = ntohl (uiNetLen);
        iRemind -= TCP_PKG_LEN_BTYES;

        iGot =
          _wemq_thread_recv_exact (pThreadCtx, isRecvNewConnect, iRecvd,
                                   iRemind);
        if (iGot < 0)
        {
          return iGot;
        }
        iRecvd += iGot;

        LOGRMB (RMB_LOG_DEBUG,
                "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] recv complete len %d",
                STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, iRecvd);

        // Receiving is done: put the new connection back before any of the
        // decode-failure returns below.
        if (!isRecvNewConnect)
        {
          pThreadCtx->m_iSockFd = pThreadCtx->m_iSockFdNew;
          pThreadCtx->ssl = pThreadCtx->sslNew;
        }

        // Only the first two int-sized fields of the message struct are
        // cleared before decoding.
        memset (&pThreadCtx->m_stWeMQMSG, 0, (sizeof (int) * 2));
        int iRet =
          DecodeWeMQMsg (&pThreadCtx->m_stWeMQMSG, pThreadCtx->m_pRecvBuff,
                         iRecvd);

        if (pThreadCtx->m_stWeMQMSG.uiHeaderLen == 0
            || pThreadCtx->m_stWeMQMSG.uiHeaderLen >= MAX_WEMQ_HEADER_LEN)
        {
          LOGRMB (RMB_LOG_DEBUG,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header complete, header len %d is 0 or too long",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_stWeMQMSG.uiHeaderLen);
          return -1;
        }

        // Header length is known to be in (0, MAX_WEMQ_HEADER_LEN) here.
        // The header starts 8 bytes into the buffer.
        memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonHeader,
                pThreadCtx->m_pRecvBuff + 8,
                pThreadCtx->m_stWeMQMSG.uiHeaderLen);
        pThreadCtx->m_stWeMQMSG.cStrJsonHeader[pThreadCtx->m_stWeMQMSG.
                                               uiHeaderLen] = '\0';
        LOGRMB (RMB_LOG_DEBUG, "cStrJsonHeader: %s",
                pThreadCtx->m_stWeMQMSG.cStrJsonHeader);

        // Unsigned arithmetic: if header length + 8 exceeds the total
        // length this wraps to a huge value and is rejected by the
        // MAX_WEMQ_BODY_LEN check just below.
        unsigned int uiTmpBodyLen =
          pThreadCtx->m_stWeMQMSG.uiTotalLen -
          pThreadCtx->m_stWeMQMSG.uiHeaderLen - 8;
        if (uiTmpBodyLen >= MAX_WEMQ_BODY_LEN)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d]  Decode Wemq complete,body len %d is too long",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  uiTmpBodyLen);
          return -1;
        }
        if (uiTmpBodyLen == 0)
        {
          LOGRMB (RMB_LOG_DEBUG,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d]  Decode Wemq complete,body len is 0",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
        }
        if (uiTmpBodyLen > 0)
        {
          // set message source
          // NOTE(review): this byte is immediately overwritten by the
          // memcpy below, which also starts at index 0 - confirm whether
          // the marker was meant to survive.
          pThreadCtx->m_stWeMQMSG.cStrJsonBody[0] = RMB_MSG_FROM_WEMQ;
          memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonBody,
                  pThreadCtx->m_pRecvBuff + 8 +
                  pThreadCtx->m_stWeMQMSG.uiHeaderLen, uiTmpBodyLen);
          pThreadCtx->m_stWeMQMSG.cStrJsonBody[uiTmpBodyLen] = '\0';
        }
        LOGRMB (RMB_LOG_DEBUG,
                "[%s] [Type:%d] [TID:%lu] [LocalPort:%d]  Decode Wemq Header complete,total len %d, header len %d,header %s, body %s",
                STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                pThreadCtx->m_stWeMQMSG.uiTotalLen,
                pThreadCtx->m_stWeMQMSG.uiHeaderLen,
                pThreadCtx->m_stWeMQMSG.cStrJsonHeader,
                pThreadCtx->m_stWeMQMSG.cStrJsonBody);
        if (iRet < 0)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header ERROR",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
          return -1;
        }
      }
      else
      {
        // The selected socket reported something other than readable data;
        // it is only logged here.
        LOGRMB (RMB_LOG_ERROR,
                "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] epoll events %d",
                STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                pThreadCtx->m_ptEvents[iTmp].events);
      }

    }

    // Undo the per-iteration swap so the context points at the new
    // connection again.
    if (!isRecvNewConnect)
    {
      pThreadCtx->m_iSockFd = pThreadCtx->m_iSockFdNew;
      pThreadCtx->ssl = pThreadCtx->sslNew;
    }
  }
  return iRecvd;
}