in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [3318:3460]
/**
 * Send the registration ("hello world") message stored in
 * pThreadCtx->m_stHelloWord, wait synchronously for the reply and
 * validate the reply's JSON header.
 *
 * The frame is built in pThreadCtx->m_pSendBuff as:
 *   total length | header length | header bytes | body bytes
 *
 * On success the two heartbeat timestamps (stTimeLast, stTimeLastRecv)
 * are refreshed.
 *
 * @param pThreadCtx  thread context owning the send buffer, the hello
 *                    message and the receive state.
 * @return  0  registration accepted
 *          1  reply header could not be parsed as JSON
 *         -1  synchronous send failed, or reply message was "auth exception"
 *         -2  synchronous receive failed, or reply code was RMB_CODE_OTHER_FAIL
 *         -3  reply code was RMB_CODE_AUT_FAIL
 */
static int32_t _wemq_thread_do_cmd_send_msg_reg (WemqThreadCtx * pThreadCtx)
{
  StWemqThreadMsg *pStWemqThreadMsg = &pThreadCtx->m_stHelloWord;
  int iRet = -1;
  /* NOTE(review): the extra 8 presumably covers the two length fields
   * written by ENCODE_INT below -- confirm against the macro definitions.
   * The capacity of m_pSendBuff is not visible here; callers must ensure
   * the hello message fits. */
  int iTotalLen =
    pStWemqThreadMsg->m_iHeaderLen + pStWemqThreadMsg->m_iBodyLen + 8;
  char *buf = pThreadCtx->m_pSendBuff;

  ENCODE_INT (buf, iTotalLen);
  ENCODE_INT (buf, pStWemqThreadMsg->m_iHeaderLen);
  ENCODE_DWSTR_MEMCPY (buf, pStWemqThreadMsg->m_pHeader,
                       pStWemqThreadMsg->m_iHeaderLen);
  ENCODE_DWSTR_MEMCPY (buf, pStWemqThreadMsg->m_pBody,
                       pStWemqThreadMsg->m_iBodyLen);

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] |wemq_thread2accesss| Send header:%s, body:%s, totalLen:%d, headerLen:%d",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pStWemqThreadMsg->m_pHeader, pStWemqThreadMsg->m_pBody, iTotalLen,
          pStWemqThreadMsg->m_iHeaderLen);

  iRet =
    _wemq_thread_do_send_sync (pThreadCtx, pThreadCtx->m_pSendBuff, iTotalLen,
                               pStWemqThreadMsg->m_iHeaderLen);
  if (iRet != 0)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] _wemq_thread_do_send_sync error!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
      return -1;
    }

  iRet = _wemq_thread_do_recv_sync (pThreadCtx);
  if (iRet != 0)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] _wemq_thread_do_recv_sync error: %d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, iRet);
      return -2;
    }

  StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
  WEMQJSON *jsonHeader = NULL;
  WEMQJSON *jsonTmp = NULL;
  int usCmd = -1;
  int serRet = -1;
  int seq = -1;
  long lTime = 0;               /* not named "time": avoids shadowing time() */
  /* Zero-initialised so the strcmp below is well defined even when the
   * reply carries no message field. */
  char cMessage[100] = { 0 };

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] |access2wemq_thread|Recv: %s",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
          pWemqHeader->cStrJsonHeader);

  jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
  if (jsonHeader == NULL)
    {
      /* Incomplete message: JSON parsing failed. */
      LOGRMB (RMB_LOG_ERROR, "[Type:%d] [TID:%lu] json_tokener_parse error: %s",
              pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pWemqHeader->cStrJsonHeader);
      return 1;
    }

  /* Each field is optional; a missing key leaves the default set above. */
  if (json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp)
      && jsonTmp != NULL)
    {
      /* Parsed for parity with other handlers; not used further here. */
      usCmd = json_object_get_int (jsonTmp);
    }
  (void) usCmd;

  if (json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp)
      && jsonTmp != NULL)
    {
      serRet = json_object_get_int (jsonTmp);
    }

  if (json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp)
      && jsonTmp != NULL)
    {
      seq = json_object_get_int (jsonTmp);
    }

  if (json_object_object_get_ex (jsonHeader, MSG_HEAD_TIME_LINT, &jsonTmp)
      && jsonTmp != NULL)
    {
      lTime = (long) json_object_get_int64 (jsonTmp);
    }

  if (json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp)
      && jsonTmp != NULL)
    {
      const char *pcMsg = json_object_get_string (jsonTmp);
      if (pcMsg != NULL)
        {
          /* Copy at most size-1 bytes into the zeroed buffer so the
           * result is always NUL-terminated. */
          strncpy (cMessage, pcMsg, sizeof (cMessage) - 1);
        }
    }

  /* All needed values have been copied out; release the parsed tree
   * before any of the returns below. */
  json_object_put (jsonHeader);

  if (strcmp (cMessage, "auth exception") == 0)
    {
      /* wemq user/passwd rejected. The password is deliberately not
       * written to the log. */
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Authentication error!user:%s",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, lTime,
              pRmbStConfig->cWemqUser);
      return -1;
    }

  if (serRet == RMB_CODE_OTHER_FAIL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] register proxy error:%d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, lTime,
              serRet);
      return -2;
    }

  if (serRet == RMB_CODE_AUT_FAIL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] register proxy Authentication error:%d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, lTime,
              serRet);
      return -3;
    }

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] register proxy success!",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, lTime);

  /* After a successful hello world the next heartbeat can be postponed:
   * restart both activity timers from now. */
  gettimeofday (&pThreadCtx->stTimeLast, NULL);
  gettimeofday (&pThreadCtx->stTimeLastRecv, NULL);
  return 0;
}