static int32_t _wemq_thread_do_recv_sync()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [2257:2419]


static int32_t _wemq_thread_do_recv_sync (WemqThreadCtx * pThreadCtx)
{
  ASSERT (pThreadCtx);
  int timeout = pRmbStConfig->iWemqTcpSocketTimeout;

  unsigned int uiTimeOutTimes = 0;
  unsigned int uiClosedByPeerTimes = 0;
  while (1)
  {
    uint32_t iRecvLen;
    int iRet =
      wemq_tcp_recv (pThreadCtx->m_iSockFd, pThreadCtx->m_pRecvBuff,
                     &iRecvLen, timeout, pThreadCtx->ssl);
    if (iRet == 0)
    {
      uiTimeOutTimes += 1;
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP recv timeout!timeout_times=%u, fd=%d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
              uiTimeOutTimes, pThreadCtx->m_iSockFd);
      if (uiTimeOutTimes >= 12)
      {
        LOGRMB (RMB_LOG_ERROR,
                "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP continuity recv timeout times=%d, so close connect",
                STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
                uiTimeOutTimes);
        uiTimeOutTimes = 0;
        _wemq_thread_del_fd (pThreadCtx);
        wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
        pThreadCtx->m_iSockFd = -1;
        pThreadCtx->ssl = NULL;
        wemq_proxy_to_black_list (pThreadCtx->m_cProxyIP,
                                  pThreadCtx->m_uiProxyPort);
        return -2;
      }
      continue;
    }
    else if (iRet == -2)
    {
      uiClosedByPeerTimes += 1;
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP conncet closed by peer(%d)",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, errno);
      if (uiClosedByPeerTimes >= 3)
      {
        LOGRMB (RMB_LOG_ERROR,
                "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] TCP conncet continuity closed by peer times=%d, so close connect",
                STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
                uiClosedByPeerTimes);
        uiClosedByPeerTimes = 0;
        _wemq_thread_del_fd (pThreadCtx);
        wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
        pThreadCtx->m_iSockFd = -1;
        pThreadCtx->ssl = NULL;
        wemq_proxy_to_black_list (pThreadCtx->m_cProxyIP,
                                  pThreadCtx->m_uiProxyPort);
        return -2;
      }
      continue;
    }
    else if (iRet == -3)
    {
      // msg is not full
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] msg is not full",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      _wemq_thread_del_fd (pThreadCtx);
      wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
      pThreadCtx->m_iSockFd = -1;
      pThreadCtx->ssl = NULL;

      return 1;
    }
    else if (iRet < 0)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] wemq_tcp_recv error",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      return -1;
    }

    LOGRMB (RMB_LOG_DEBUG,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv complete len %d",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, iRet);

    //memset(&pThreadCtx->m_stWeMQMSG, 0, sizeof(pThreadCtx->m_stWeMQMSG));
    memset (&pThreadCtx->m_stWeMQMSG, 0, (sizeof (int) * 2));
    DecodeWeMQMsg (&pThreadCtx->m_stWeMQMSG, pThreadCtx->m_pRecvBuff, iRet);
    if (pThreadCtx->m_stWeMQMSG.uiHeaderLen == 0
        || pThreadCtx->m_stWeMQMSG.uiHeaderLen >= MAX_WEMQ_HEADER_LEN)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv header len %u is 0 or too long",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
              pThreadCtx->m_stWeMQMSG.uiHeaderLen);
      return -1;
    }
    if (pThreadCtx->m_stWeMQMSG.uiHeaderLen > 0)
    {
      memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonHeader,
              pThreadCtx->m_pRecvBuff + 8,
              pThreadCtx->m_stWeMQMSG.uiHeaderLen);
      pThreadCtx->m_stWeMQMSG.cStrJsonHeader[pThreadCtx->m_stWeMQMSG.
                                             uiHeaderLen] = '\0';
    }
    unsigned int uiTmpBodyLen =
      pThreadCtx->m_stWeMQMSG.uiTotalLen -
      pThreadCtx->m_stWeMQMSG.uiHeaderLen - 8;
    if (uiTmpBodyLen >= MAX_WEMQ_BODY_LEN)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv body len %d is too long",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
              uiTmpBodyLen);
      return -1;
    }
    if (uiTmpBodyLen == 0)
    {
      LOGRMB (RMB_LOG_DEBUG,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] recv body len is 0",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
    }
    //if (pThreadCtx->m_stWeMQMSG.uiTotalLen - pThreadCtx->m_stWeMQMSG.uiHeaderLen - 8 > 0)
    if (uiTmpBodyLen > 0)
    {
      memcpy (pThreadCtx->m_stWeMQMSG.cStrJsonBody,
              pThreadCtx->m_pRecvBuff + 8 +
              pThreadCtx->m_stWeMQMSG.uiHeaderLen,
              pThreadCtx->m_stWeMQMSG.uiTotalLen -
              pThreadCtx->m_stWeMQMSG.uiHeaderLen - 8);
      pThreadCtx->m_stWeMQMSG.cStrJsonBody[uiTmpBodyLen] = '\0';
    }
    LOGRMB (RMB_LOG_DEBUG,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d]  Decode Wemq Header complete,total len %d, header len %d,header %s",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_stWeMQMSG.uiTotalLen,
            pThreadCtx->m_stWeMQMSG.uiHeaderLen,
            pThreadCtx->m_stWeMQMSG.cStrJsonHeader);
    return 0;
  }
  return -1;
}