static int32_t _wemq_thread_do_req()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [3852:4128]


/*
 * Dispatch one queued thread request to the handler selected by its m_iCmd,
 * then apply the completion policy shared by every command to the handler's
 * return code:
 *
 *   0      - the message is cleared and flagged as handled; 0 is returned.
 *   -2     - the error is logged, the message is cleared and flagged as
 *            handled, and the thread is moved to THREAD_STATE_BREAK; the
 *            result of _wemq_thread_state_trans() is returned.
 *   other  - the error is logged and the code is returned unchanged. The
 *            message is neither cleared nor flagged as handled, except for
 *            THREAD_MSG_CMD_SEND_MSG_ACK, whose message is always dropped.
 *
 * An unknown command is logged, cleared, flagged as handled, and -1 is
 * returned.
 *
 * pThreadCtx        thread context; its state, ids and handled flag are used.
 * pStWemqThreadMsg  request to process; may be cleared by this call.
 */
static int32_t _wemq_thread_do_req (WemqThreadCtx * pThreadCtx,
                                    StWemqThreadMsg * pStWemqThreadMsg)
{
  int iRet = -1;
  int iIsStartCmd = 0;          /* selects the START wording of the error log */
  int iDropOnFailure = 0;       /* message is discarded even on a plain error */

  /* Step 1: run the handler that belongs to the command. */
  switch (pStWemqThreadMsg->m_iCmd)
  {
  case THREAD_MSG_CMD_ADD_LISTEN:
    iRet = _wemq_thread_do_cmd_add_listen_msg (pThreadCtx, pStWemqThreadMsg);
    break;

  case THREAD_MSG_CMD_START:
    iRet = _wemq_thread_do_cmd_start_msg (pThreadCtx, pStWemqThreadMsg);
    iIsStartCmd = 1;
    break;

  case THREAD_MSG_CMD_SEND_MSG:
    iRet = _wemq_thread_do_cmd_send_msg (pThreadCtx, pStWemqThreadMsg);
    break;

  case THREAD_MSG_CMD_SEND_LOG:
    iRet = _wemq_thread_do_cmd_send_log (pThreadCtx, pStWemqThreadMsg);
    break;

  case THREAD_MSG_CMD_SEND_CLIENT_GOODBYE:
    iRet =
      _wemq_thread_do_cmd_client_goodbye_msg (pThreadCtx, pStWemqThreadMsg);
    break;

  case THREAD_MSG_CMD_SEND_REQUEST:
    LOGRMB (RMB_LOG_DEBUG,
            "[%s] [Type:%d] [TID:%lu] Thread REQ CMD THREAD_MSG_CMD_SEND_REQUEST",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID);
    iRet = _wemq_thread_do_cmd_send_request (pThreadCtx, pStWemqThreadMsg);
    break;

  case THREAD_MSG_CMD_SEND_REQUEST_ASYNC:
    /* Names the async command explicitly so it can be told apart from the
       synchronous request in the debug log. */
    LOGRMB (RMB_LOG_DEBUG,
            "[%s] [Type:%d] [TID:%lu] Thread REQ CMD THREAD_MSG_CMD_SEND_REQUEST_ASYNC",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID);
    iRet =
      _wemq_thread_do_cmd_send_async_request (pThreadCtx, pStWemqThreadMsg);
    break;

  case THREAD_MSG_CMD_SEND_REPLY:
    iRet = _wemq_thread_do_cmd_send_reply (pThreadCtx, pStWemqThreadMsg);
    break;

  case THREAD_MSG_CMD_SEND_MSG_ACK:
    LOGRMB (RMB_LOG_DEBUG, "[%s] [Type:%d] [TID:%lu] Do App ACK",
            STATE_MAP[pThreadCtx->m_iState],
            pThreadCtx->m_contextType, pThreadCtx->m_threadID);
    iRet = _wemq_thread_do_cmd_send_msg_ack (pThreadCtx, pStWemqThreadMsg);
    iDropOnFailure = 1;
    break;

  default:
    /* Unknown command: discard it and report failure (iRet is still -1). */
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] do req error no such type req cmd %d!",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pStWemqThreadMsg->m_iCmd);
    _wemq_thread_clear_thread_msg (pStWemqThreadMsg);
    pThreadCtx->m_iWemqThreadMsgHandled = 1;
    return iRet;
  }

  /* Step 2: success - the request is consumed. */
  if (iRet == 0)
  {
    _wemq_thread_clear_thread_msg (pStWemqThreadMsg);
    pThreadCtx->m_iWemqThreadMsgHandled = 1;
    return iRet;
  }

  /* Step 3: failure. The error is logged before the message is cleared
     because the log reads m_iCmd from the message; whether
     _wemq_thread_clear_thread_msg() resets that field is not visible here. */
  if (iIsStartCmd)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread START CMD ERROR %s",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            _wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
  }
  else
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            _wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
  }

  if (iRet == -2)
  {
    /* -2 from a handler: drop the request and move the thread to
       THREAD_STATE_BREAK; the transition result becomes the return value. */
    _wemq_thread_clear_thread_msg (pStWemqThreadMsg);
    pThreadCtx->m_iWemqThreadMsgHandled = 1;
    return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                     THREAD_STATE_BREAK);
  }

  if (iDropOnFailure)
  {
    /* A failed ACK is never kept: the message is discarded and the original
       error code is still returned to the caller. */
    _wemq_thread_clear_thread_msg (pStWemqThreadMsg);
    pThreadCtx->m_iWemqThreadMsgHandled = 1;
    LOGRMB (RMB_LOG_ERROR,
            "data processing ERR cause _wemq_thread_do_cmd_send_msg_ack failed.");
  }

  /* Any other failure leaves the message in place and not flagged as
     handled - presumably so the caller can retry it; confirm with caller. */
  return iRet;
}