int32_t wemq_thread_state_ok()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [4538:4673]


int32_t wemq_thread_state_ok (WemqThreadCtx * pThreadCtx)
{
  ASSERT (pThreadCtx);
  ASSERT (pThreadCtx->m_iState == THREAD_STATE_OK);

  int iRet = -1;

  if ((pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
      && (pThreadCtx->m_ptProxyContext->iFlagForPublish == 0))
  {
    pThreadCtx->m_ptProxyContext->iFlagForPublish = 1;
  }

  iRet = _wemq_thread_do_last_failure_req (pThreadCtx);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu]  CALL DO LAST FAILURE REQ ERROR",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID);
    return iRet;
  }

  //旧连接还在
  if (pThreadCtx->m_iSockFdOld >= 0)
  {

    wemq_thread_do_deal_with_old_connect (pThreadCtx);
  }

  int iMsgNum = 0;
  int iRecv = 0;
  iMsgNum = _wemq_thread_get_data_from_fifo (pThreadCtx);
  if (iMsgNum > 0)
  {
    iRet = _wemq_thread_do_req (pThreadCtx, &pThreadCtx->m_stWemqThreadMsg);
    if (iRet == -2)
    {
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }

  }

  iRecv = _wemq_thread_do_recv_async (pThreadCtx, true);
  if (iRecv > 0)
  {
    LOGRMB (RMB_LOG_DEBUG, "[%s] [Type:%d] [TID:%lu] RECV %d bytes",
            STATE_MAP[pThreadCtx->m_iState],
            pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRecv);
    iRet = _wemq_thread_on_message (pThreadCtx, true);
    if (iRet == WEMQ_MESSAGE_RET_GOODBYE)
    {
      LOGRMB (RMB_LOG_INFO,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] RECV byebye cmd",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      wemq_proxy_goodbye (pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      _wemq_thread_del_fd (pThreadCtx);
      wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
      pThreadCtx->m_iSockFd = -1;
      pThreadCtx->ssl = NULL;
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }
    else if (iRet == WEMQ_MESSAGE_RET_REDIRECT)
    {
      LOGRMB (RMB_LOG_INFO,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] --> [redirect ip:%s|port:%d] RECV redirect cmd",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
              pThreadCtx->m_cRedirectIP, pThreadCtx->m_iRedirectPort);
      //_wemq_thread_del_fd(pThreadCtx);
      //close(pThreadCtx->m_iSockFd);
      //pThreadCtx->m_iSockFd = -1;
      //return _wemq_thread_state_trans(pThreadCtx, pThreadCtx->m_iState, THREAD_STATE_BREAK);
      wemq_proxy_goodbye (pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState, THREAD_STATE_CLOSE);   //停止发消息;
    }
    else if (iRet == WEMQ_MESSAGE_RET_SERVERGOODBYE)    //access端主动离线
    {
      LOGRMB (RMB_LOG_INFO,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] RECV server goodbye cmd",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      //stContextProxy* pContextProxy = pThreadCtx->m_ptProxyContext;

      wemq_proxy_goodbye (pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      //wemq_thread_rr_msg_is_empty(pThreadCtx);
      // _wemq_thread_del_fd(pThreadCtx);
      // close(pThreadCtx->m_iSockFd);
      // pThreadCtx->m_iSockFd = -1;

      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState, THREAD_STATE_CLOSE);   //停止发消息;
    }
    else if (iRet == WEMQ_MESSAGE_RET_CLIENTGOODBYE)    //client端主动离线,收到回包
    {
      LOGRMB (RMB_LOG_INFO,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] RECV client goodbye cmd rsp",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      stContextProxy *pContextProxy = pThreadCtx->m_ptProxyContext;
      pContextProxy->iFlagForGoodBye = 1;
      pthread_cond_signal (&pContextProxy->goodByeCond);
      //
      //pContextProxy->iFlagForRun = 0;  //停止运行状态机            
      return 0;
    }
  }
  else
  {
    //LOGWEMQ(WEMQ_LOG_ERROR, "[%s],[TID:%d],ERROR RECV %d bytes\n", STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_iThreadId, iRecv); 
    if (iRecv == -1)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu]Thread on Message ERROR",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID);
    }
    if (iRecv == -2)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] connect closed by peer",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }
  }
  return 0;
}