in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [5343:5450]
/**
 * Drive the per-thread state machine until it is told to stop.
 *
 * Every loop iteration:
 *   1. calls _wemq_thread_send_heart_beat();
 *   2. dispatches on pThreadCtx->m_iState to the matching
 *      _wemq_thread_check_xxx() / wemq_thread_state_xxx() pair;
 *   3. calls wemq_thread_clear_timeout_rr_async_request();
 *   4. if the proxy context's iFlagForRun is 0, decides whether the loop
 *      may end (see the comments in the loop body).
 *
 * After the loop the fd is removed via _wemq_thread_del_fd(), the socket is
 * closed via wemq_tcp_close(), and the ssl / m_iSockFd fields are reset.
 *
 * @param pThreadCtx  thread context to run; must not be NULL.
 * @return 0 when the loop ends and the connection has been closed;
 *         -1 if pThreadCtx is NULL or wemq_thread_state_init() returns a
 *         negative value (in that case the function returns immediately,
 *         without running the close sequence at the bottom).
 */
int32_t wemq_thread_run (WemqThreadCtx * pThreadCtx)
{
  if (pThreadCtx == NULL)
    {
      LOGRMB (RMB_LOG_ERROR, "wemq_thread_run:pThreadCtx is null");
      return -1;
    }

  int iRet = 0;
  int iRunFlag = 1;

  while (iRunFlag)
    {
      _wemq_thread_send_heart_beat (pThreadCtx);

      switch (pThreadCtx->m_iState)
        {
        case THREAD_STATE_INIT:
          {
            _wemq_thread_check_init (pThreadCtx);
            iRet = wemq_thread_state_init (pThreadCtx);
            if (iRet < 0)
              {
                /* Init failure is the only state error that aborts the thread. */
                LOGRMB (RMB_LOG_ERROR, "wemq_thread_state_init failed,iRet=%d",
                        iRet);
                return -1;
              }
            break;
          }
        case THREAD_STATE_CONNECT:
          {
            _wemq_thread_check_connect (pThreadCtx);
            wemq_thread_state_connect (pThreadCtx);
            break;
          }
        case THREAD_STATE_REGI:
          {
            _wemq_thread_check_regi (pThreadCtx);
            wemq_thread_state_regi (pThreadCtx);
            break;
          }
        case THREAD_STATE_OK:
          {
            _wemq_thread_check_ok (pThreadCtx);
            wemq_thread_state_ok (pThreadCtx);
            break;
          }
        case THREAD_STATE_CLOSE:
          {
            _wemq_thread_check_close (pThreadCtx);
            wemq_thread_state_close (pThreadCtx);
            break;
          }
        case THREAD_STATE_BREAK:
          {
            _wemq_thread_check_break (pThreadCtx);
            wemq_thread_state_break (pThreadCtx);
            break;
          }
        case THREAD_STATE_RECONNECT:
          {
            _wemq_thread_check_reconnect (pThreadCtx);
            wemq_thread_state_reconnect (pThreadCtx);
            break;
          }
        case THREAD_STATE_DESTROY:
          {
            _wemq_thread_check_destory (pThreadCtx);
            wemq_thread_state_destory (pThreadCtx);
            break;
          }
        default:
          /* Unrecognised state: finish this iteration, then leave the loop. */
          iRunFlag = 0;
          break;
        }

      wemq_thread_clear_timeout_rr_async_request (pThreadCtx);

      if (pThreadCtx->m_ptProxyContext->iFlagForRun == 0)
        {
          /*
           * Stop has been requested. Leave at once when both queue checks
           * return 0 (presumably "nothing pending" -- confirm against the
           * helpers' return convention); otherwise keep looping until the
           * exit timeout has elapsed.
           */
          if (wemq_thread_fifo_msg_is_empty (pThreadCtx) == 0
              && wemq_rr_all_msg_is_empty (pThreadCtx->m_ptProxyContext) == 0)
            {
              iRunFlag = 0;
            }
          else
            {
              /* Presumably refreshes pRmbStConfig->ulNowTtime -- confirm. */
              GetRmbNowLongTime ();
              unsigned long timeout = pRmbStConfig->ulExitTimeOut;      //default:30s
              /* Exit unconditionally once the timeout has been exceeded. */
              if (pRmbStConfig->ulNowTtime >=
                  pThreadCtx->m_ptProxyContext->ulGoodByeTime + timeout)
                {
                  iRunFlag = 0;
                }
            }
        }
    }

  /*
   * NOTE(review): when the loop ended through the default branch above,
   * m_iState is not one of the handled states; confirm STATE_MAP has an
   * entry for every value m_iState can take.
   */
  LOGRMB (RMB_LOG_INFO, "[%s] [Type:%d] [TID:%lu] THREAD EXIT!!!!",
          STATE_MAP[pThreadCtx->m_iState],
          pThreadCtx->m_contextType, pThreadCtx->m_threadID);

  _wemq_thread_del_fd (pThreadCtx);
  wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
  pThreadCtx->ssl = NULL;
  pThreadCtx->m_iSockFd = -1;
  return 0;
}