static int32_t _wemq_thread_do_connect()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [4217:4355]


/**
 * Select a proxy endpoint and connect this thread context to it.
 *
 * Endpoint selection:
 *   - When pThreadCtx->m_lRedirect is 0 the endpoint is obtained from
 *     wemq_proxy_get_server(); the call is repeated once per second for as
 *     long as it returns 2.
 *   - Otherwise the redirect flag is cleared and the endpoint is taken from
 *     m_cRedirectIP / m_iRedirectPort.
 *
 * Connection:
 *   Up to pRmbStConfig->iWemqTcpConnectRetryNum attempts are made, through
 *   do_ssl_connect() when pRmbStConfig->tlsOnoff is non-zero and through
 *   wemq_tcp_connect() otherwise.  After a successful connect the socket is
 *   switched to non-blocking mode and handed to _wemq_thread_add_fd().
 *   Every failed attempt -- including the last one -- is followed by a
 *   usleep() of iWemqTcpConnectDelayTime * 1000 microseconds.
 *
 * @param pThreadCtx  thread context; its proxy IP/port, local port, socket
 *                    fd and ssl fields are written by this function.
 * @return  0 on success;
 *         -1 when every attempt failed (wemq_proxy_goodbye() is called for
 *            the endpoint first);
 *         -2 when the configured retry count is not positive, so no attempt
 *            was made.
 */
static int32_t _wemq_thread_do_connect (WemqThreadCtx * pThreadCtx)
{
  // Add a random 0~9 jitter to the configured connect timeout (the original
  // comment gives the unit as ms).
  struct timeval tv;
  gettimeofday (&tv, NULL);
  // Build the seed in unsigned arithmetic: the product of seconds and
  // 1000000 does not fit a 32-bit signed long, and signed overflow is
  // undefined.  Unsigned wrap-around yields the same truncated value.
  unsigned int seed =
    (unsigned int) tv.tv_sec * 1000000u + (unsigned int) tv.tv_usec;
  srand (seed);
  int random_time = rand () % 10;
  int retry = pRmbStConfig->iWemqTcpConnectRetryNum;
  // The delay is multiplied by 1000 because usleep() takes microseconds.
  int sleep_time = pRmbStConfig->iWemqTcpConnectDelayTime * 1000;
  // Only the plain TCP path below consumes this timeout.
  int timeout = pRmbStConfig->iWemqTcpConnectTimeout + random_time;

  // Forget the previous endpoint before choosing a new one.
  pThreadCtx->m_cProxyIP[0] = '\0';
  pThreadCtx->m_uiProxyPort = 0;
  pThreadCtx->m_iLocalPort = 0;

  if (pThreadCtx->m_lRedirect == 0)
  {
    // No redirect pending: ask for a server, retrying every second while
    // the lookup reports 2.
    int iRet = 0;
    do
    {
      iRet =
        wemq_proxy_get_server (pThreadCtx->m_cProxyIP,
                               sizeof (pThreadCtx->m_cProxyIP),
                               &pThreadCtx->m_uiProxyPort);
      if (iRet == 2)
      {
        LOGRMB (RMB_LOG_ERROR, "get proxy ip/port failed");
        sleep (1);
      }
    }
    while (iRet == 2);
  }
  else
  {
    // A redirect is consumed exactly once: clear the flag, then use the
    // redirect endpoint for this connect.
    pThreadCtx->m_lRedirect = 0;
    snprintf (pThreadCtx->m_cProxyIP, sizeof (pThreadCtx->m_cProxyIP), "%s",
              pThreadCtx->m_cRedirectIP);
    pThreadCtx->m_uiProxyPort = pThreadCtx->m_iRedirectPort;
  }

  while (retry > 0)
  {
    if (0 != pRmbStConfig->tlsOnoff)
    {
      // TLS path: do_ssl_connect() is given the whole context; on success
      // the code below reads the socket from pThreadCtx->m_iSockFd.
      int err = do_ssl_connect (pThreadCtx);
      if (0 != err)
      {
        LOGRMB (RMB_LOG_ERROR,
                "[%s] [Type:%d] [TID:%lu] [retry: %d|host:%s|port:%u] tls Connect Error!",
                STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                pThreadCtx->m_threadID, retry, pThreadCtx->m_cProxyIP,
                pThreadCtx->m_uiProxyPort);
      }
      else
      {
        wemq_getsockename (pThreadCtx->m_iSockFd, NULL, 0,
                           &pThreadCtx->m_iLocalPort);
        if (_wemq_thread_set_fd_nonblock (pThreadCtx, pThreadCtx->m_iSockFd)
            != 0)
        {
          // Could not switch to non-blocking: drop this connection and let
          // the retry loop try again.
          LOGRMB (RMB_LOG_ERROR,
                  "wemq thread set pThreadCtx->m_iSockFd=%d to nonblock failed",
                  pThreadCtx->m_iSockFd);
          wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
          pThreadCtx->m_iSockFd = -1;
          pThreadCtx->ssl = NULL;
        }
        else
        {
          LOGRMB (RMB_LOG_INFO,
                  "[%s] [Type:%d] [TID:%lu] [host:%s|port:%u|local_port:%d] tls connect to proxy, fd=%d!",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_cProxyIP,
                  pThreadCtx->m_uiProxyPort, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_iSockFd);
          // Register the connected fd with epoll.
          _wemq_thread_add_fd (pThreadCtx);
          return 0;
        }
      }
    }
    else
    {
      // Plain TCP path.
      int iSockFd =
        wemq_tcp_connect (pThreadCtx->m_cProxyIP,
                          (uint16_t) pThreadCtx->m_uiProxyPort, timeout);
      // NOTE(review): descriptor 0 is treated as a failure here; confirm
      // against wemq_tcp_connect()'s return contract.
      if (iSockFd > 0)
      {
        pThreadCtx->m_iSockFd = iSockFd;
        wemq_getsockename (iSockFd, NULL, 0, &pThreadCtx->m_iLocalPort);
        if (_wemq_thread_set_fd_nonblock (pThreadCtx, pThreadCtx->m_iSockFd)
            != 0)
        {
          // Could not switch to non-blocking: drop this connection and let
          // the retry loop try again.
          LOGRMB (RMB_LOG_ERROR,
                  "wemq thread set pThreadCtx->m_iSockFd=%d to nonblock failed",
                  pThreadCtx->m_iSockFd);
          wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
          pThreadCtx->m_iSockFd = -1;
          pThreadCtx->ssl = NULL;
        }
        else
        {
          LOGRMB (RMB_LOG_INFO,
                  "[%s] [Type:%d] [TID:%lu] [host:%s|port:%u|local_port:%d] connect to proxy!",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_cProxyIP,
                  pThreadCtx->m_uiProxyPort, pThreadCtx->m_iLocalPort);
          // Register the connected fd with epoll.
          _wemq_thread_add_fd (pThreadCtx);
          return 0;
        }
      }
    }

    // Reaching this point means the attempt failed on either path.
    LOGRMB (RMB_LOG_DEBUG,
            "[%s] [Type:%d] [TID:%lu] [retry: %d|host:%s|port:%u] Connect Error!",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, retry, pThreadCtx->m_cProxyIP,
            pThreadCtx->m_uiProxyPort);
    retry--;
    // The delay is intentionally kept ahead of the exhaustion check, so it
    // also elapses after the final failed attempt before -1 is returned.
    usleep (sleep_time);
    if (retry == 0)
    {
      // Out of attempts: report the endpoint via wemq_proxy_goodbye() and
      // give up.
      wemq_proxy_goodbye (pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [host:%s|port:%u] Connect Failed!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_cProxyIP,
              pThreadCtx->m_uiProxyPort);
      return -1;
    }

  }
  // Only reachable when the configured retry count was not positive.
  return -2;
}