in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [4538:4673]
int32_t wemq_thread_state_ok (WemqThreadCtx * pThreadCtx)
{
ASSERT (pThreadCtx);
ASSERT (pThreadCtx->m_iState == THREAD_STATE_OK);
int iRet = -1;
if ((pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
&& (pThreadCtx->m_ptProxyContext->iFlagForPublish == 0))
{
pThreadCtx->m_ptProxyContext->iFlagForPublish = 1;
}
iRet = _wemq_thread_do_last_failure_req (pThreadCtx);
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] CALL DO LAST FAILURE REQ ERROR",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID);
return iRet;
}
//旧连接还在
if (pThreadCtx->m_iSockFdOld >= 0)
{
wemq_thread_do_deal_with_old_connect (pThreadCtx);
}
int iMsgNum = 0;
int iRecv = 0;
iMsgNum = _wemq_thread_get_data_from_fifo (pThreadCtx);
if (iMsgNum > 0)
{
iRet = _wemq_thread_do_req (pThreadCtx, &pThreadCtx->m_stWemqThreadMsg);
if (iRet == -2)
{
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
iRecv = _wemq_thread_do_recv_async (pThreadCtx, true);
if (iRecv > 0)
{
LOGRMB (RMB_LOG_DEBUG, "[%s] [Type:%d] [TID:%lu] RECV %d bytes",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRecv);
iRet = _wemq_thread_on_message (pThreadCtx, true);
if (iRet == WEMQ_MESSAGE_RET_GOODBYE)
{
LOGRMB (RMB_LOG_INFO,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] RECV byebye cmd",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
wemq_proxy_goodbye (pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
_wemq_thread_del_fd (pThreadCtx);
wemq_tcp_close (pThreadCtx->m_iSockFd, pThreadCtx->ssl);
pThreadCtx->m_iSockFd = -1;
pThreadCtx->ssl = NULL;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
else if (iRet == WEMQ_MESSAGE_RET_REDIRECT)
{
LOGRMB (RMB_LOG_INFO,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] --> [redirect ip:%s|port:%d] RECV redirect cmd",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
pThreadCtx->m_cRedirectIP, pThreadCtx->m_iRedirectPort);
//_wemq_thread_del_fd(pThreadCtx);
//close(pThreadCtx->m_iSockFd);
//pThreadCtx->m_iSockFd = -1;
//return _wemq_thread_state_trans(pThreadCtx, pThreadCtx->m_iState, THREAD_STATE_BREAK);
wemq_proxy_goodbye (pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState, THREAD_STATE_CLOSE); //停止发消息;
}
else if (iRet == WEMQ_MESSAGE_RET_SERVERGOODBYE) //access端主动离线
{
LOGRMB (RMB_LOG_INFO,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] RECV server goodbye cmd",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
//stContextProxy* pContextProxy = pThreadCtx->m_ptProxyContext;
wemq_proxy_goodbye (pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
//wemq_thread_rr_msg_is_empty(pThreadCtx);
// _wemq_thread_del_fd(pThreadCtx);
// close(pThreadCtx->m_iSockFd);
// pThreadCtx->m_iSockFd = -1;
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState, THREAD_STATE_CLOSE); //停止发消息;
}
else if (iRet == WEMQ_MESSAGE_RET_CLIENTGOODBYE) //client端主动离线,收到回包
{
LOGRMB (RMB_LOG_INFO,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] RECV client goodbye cmd rsp",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
stContextProxy *pContextProxy = pThreadCtx->m_ptProxyContext;
pContextProxy->iFlagForGoodBye = 1;
pthread_cond_signal (&pContextProxy->goodByeCond);
//
//pContextProxy->iFlagForRun = 0; //停止运行状态机
return 0;
}
}
else
{
//LOGWEMQ(WEMQ_LOG_ERROR, "[%s],[TID:%d],ERROR RECV %d bytes\n", STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_iThreadId, iRecv);
if (iRecv == -1)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu]Thread on Message ERROR",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID);
}
if (iRecv == -2)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] connect closed by peer",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return 0;
}