int32_t wemq_thread_run()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [5343:5450]


int32_t wemq_thread_run (WemqThreadCtx * pThreadCtx)
{
//      ASSERT (pThreadCtx);
  if (pThreadCtx == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_thread_run:pThreadCtx is null");
    return -1;
  }
  int iRet = 0;
  int iRunFlag = 1;
  int countDown = 0;
  while (iRunFlag)
//      while (pThreadCtx->m_ptProxyContext->iFlagForRun)
  {
    _wemq_thread_send_heart_beat (pThreadCtx);
    switch (pThreadCtx->m_iState)
    {
    case THREAD_STATE_INIT:
      {
        _wemq_thread_check_init (pThreadCtx);
        iRet = wemq_thread_state_init (pThreadCtx);
        if (iRet < 0)
        {
          LOGRMB (RMB_LOG_ERROR, "wemq_thread_state_init failed,iRet=%d",
                  iRet);
          return -1;
        }
        break;
      }
    case THREAD_STATE_CONNECT:
      {
        _wemq_thread_check_connect (pThreadCtx);
        wemq_thread_state_connect (pThreadCtx);
        break;
      }
    case THREAD_STATE_REGI:
      {
        _wemq_thread_check_regi (pThreadCtx);
        wemq_thread_state_regi (pThreadCtx);
        break;
      }
    case THREAD_STATE_OK:
      {
        _wemq_thread_check_ok (pThreadCtx);
        wemq_thread_state_ok (pThreadCtx);
        break;
      }
    case THREAD_STATE_CLOSE:
      {
        _wemq_thread_check_close (pThreadCtx);
        wemq_thread_state_close (pThreadCtx);
        break;
      }

    case THREAD_STATE_BREAK:
      {
        _wemq_thread_check_break (pThreadCtx);
        wemq_thread_state_break (pThreadCtx);
        break;
      }
    case THREAD_STATE_RECONNECT:
      {
        _wemq_thread_check_reconnect (pThreadCtx);
        wemq_thread_state_reconnect (pThreadCtx);
        break;
      }
    case THREAD_STATE_DESTROY:
      {
        _wemq_thread_check_destory (pThreadCtx);
        wemq_thread_state_destory (pThreadCtx);
        break;
      }
    default:
      iRunFlag = 0;
      break;
    }
    wemq_thread_clear_timeout_rr_async_request (pThreadCtx);
    if (pThreadCtx->m_ptProxyContext->iFlagForRun == 0)
    {
      if (wemq_thread_fifo_msg_is_empty (pThreadCtx) == 0
          && wemq_rr_all_msg_is_empty (pThreadCtx->m_ptProxyContext) == 0)
      {
        iRunFlag = 0;
      }
      else
      {
        GetRmbNowLongTime ();
        unsigned long timeout = pRmbStConfig->ulExitTimeOut;    //default:30s
        //超过timeout直接退出
        if (pRmbStConfig->ulNowTtime >=
            pThreadCtx->m_ptProxyContext->ulGoodByeTime + timeout)
        {
          iRunFlag = 0;
        }
      }
    }
  }
  LOGRMB (RMB_LOG_INFO, "[%s] [Type:%d] [TID:%lu] THREAD EXIT!!!!",
          STATE_MAP[pThreadCtx->m_iState],
          pThreadCtx->m_contextType, pThreadCtx->m_threadID);

  _wemq_thread_del_fd (pThreadCtx);
  wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
  pThreadCtx->ssl = NULL;
  pThreadCtx->m_iSockFd = -1;

  return 0;
}