in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [3852:4128]
static int32_t _wemq_thread_do_req (WemqThreadCtx * pThreadCtx,
StWemqThreadMsg * pStWemqThreadMsg)
{
int iRet = -1;
switch (pStWemqThreadMsg->m_iCmd)
{
case THREAD_MSG_CMD_ADD_LISTEN:
{
iRet =
_wemq_thread_do_cmd_add_listen_msg (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_START:
{
iRet = _wemq_thread_do_cmd_start_msg (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread START CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_SEND_MSG:
{
iRet = _wemq_thread_do_cmd_send_msg (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_SEND_LOG:
{
iRet = _wemq_thread_do_cmd_send_log (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_SEND_CLIENT_GOODBYE:
{
iRet =
_wemq_thread_do_cmd_client_goodbye_msg (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_SEND_REQUEST:
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] Thread REQ CMD THREAD_MSG_CMD_SEND_REQUEST",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID);
iRet = _wemq_thread_do_cmd_send_request (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_SEND_REQUEST_ASYNC:
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] Thread REQ CMD THREAD_MSG_CMD_SEND_REQUEST",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID);
iRet =
_wemq_thread_do_cmd_send_async_request (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_SEND_REPLY:
{
iRet = _wemq_thread_do_cmd_send_reply (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return iRet;
}
case THREAD_MSG_CMD_SEND_MSG_ACK:
{
LOGRMB (RMB_LOG_DEBUG, "[%s] [Type:%d] [TID:%lu] Do App ACK",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID);
/*
iRet = _wemq_thread_do_cmd_send_msg_ack(pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg(pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
*/
iRet = _wemq_thread_do_cmd_send_msg_ack (pThreadCtx, pStWemqThreadMsg);
if (iRet == 0)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Thread REQ CMD ERROR %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
_wemq_thread_get_cmd (pStWemqThreadMsg->m_iCmd));
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
else
{
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
LOGRMB (RMB_LOG_ERROR,
"data processing ERR cause _wemq_thread_do_cmd_send_msg_ack failed.");
}
}
return iRet;
}
default:
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] do req error no such type req cmd %d!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pStWemqThreadMsg->m_iCmd);
_wemq_thread_clear_thread_msg (pStWemqThreadMsg);
pThreadCtx->m_iWemqThreadMsgHandled = 1;
break;
}
return iRet;
}