in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [4808:5197]
int32_t wemq_thread_state_reconnect (WemqThreadCtx * pThreadCtx)
{
ASSERT (pThreadCtx);
//TODO RECONNECT;
//sleep(1);
struct timeval tv;
gettimeofday (&tv, NULL);
long now_time = tv.tv_sec * 1000000 + tv.tv_usec;
srand ((unsigned int) now_time);
//随机sleep 30~50ms ,usleep 单位是纳秒
int sleep_time = rand () % 20 + 30;
usleep (sleep_time * 1000);
// hello msg
{
int iRet = -1;
iRet = _wemq_thread_do_cmd_send_msg_reg (pThreadCtx);
if (iRet > 0)
{
return -1;
}
else if (iRet < 0)
{
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
/**
* 程序首次起来时,如果选择的第一个ip连接失败而第二个ip连接成功且hello world指令发送成功,则应该通知前端连接成功
*/
if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
{
if (pThreadCtx->m_ptProxyContext->iFlagForPublish == 0)
{
pThreadCtx->m_ptProxyContext->iFlagForPublish = 1;
}
pthread_mutex_lock (&pThreadCtx->m_ptProxyContext->pubMutex);
if (pThreadCtx->m_ptProxyContext->iFlagForPub == 0)
{
pThreadCtx->m_ptProxyContext->iFlagForPub = 1;
pthread_cond_signal (&pThreadCtx->m_ptProxyContext->pubCond);
}
pthread_mutex_unlock (&pThreadCtx->m_ptProxyContext->pubMutex);
}
else if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_SUB)
{
pthread_mutex_lock (&pThreadCtx->m_ptProxyContext->subMutex);
if (pThreadCtx->m_ptProxyContext->iFlagForSub == 0)
{
pThreadCtx->m_ptProxyContext->iFlagForSub = 1;
pthread_cond_signal (&pThreadCtx->m_ptProxyContext->subCond);
}
pthread_mutex_unlock (&pThreadCtx->m_ptProxyContext->subMutex);
}
int flag = 0;
//regist topic list
StWemqTopicProp *ptTopicProp = NULL;
if (pThreadCtx->m_ptTopicList != NULL)
{
ptTopicProp = pThreadCtx->m_ptTopicList->next;
}
WEMQJSON *jsonTopicList = json_object_new_array ();
char cBroadcastDcn[10] = "000";
while (ptTopicProp != NULL)
{
flag = 1;
char cTopic[200];
//char serviceOrEvent = (*(ptTopicProp->cServiceId + 3) == '0') ? 'e' : 's';
char serviceOrEvent = (*(ptTopicProp->cServiceId + 3) == '0') ? 's' : 'e';
snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c",
pRmbStConfig->cConsumerDcn, serviceOrEvent,
ptTopicProp->cServiceId, ptTopicProp->cScenario,
*(ptTopicProp->cServiceId + 3));
json_object_array_add (jsonTopicList, json_object_new_string (cTopic));
//自动监听广播topic
if (serviceOrEvent == 'e')
{
memset (cTopic, 0x00, sizeof (cTopic));
snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c", cBroadcastDcn,
serviceOrEvent, ptTopicProp->cServiceId,
ptTopicProp->cScenario, *(ptTopicProp->cServiceId + 3));
json_object_array_add (jsonTopicList, json_object_new_string (cTopic));
}
ptTopicProp = ptTopicProp->next;
}
StWemqThreadMsg stThreadMsg;
memset (&stThreadMsg, 0, sizeof (StWemqThreadMsg));
stThreadMsg.m_iCmd = THREAD_MSG_CMD_ADD_LISTEN;
WEMQJSON *jsonHeader = json_object_new_object ();
if (jsonHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonHeader failed");
return -1;
}
json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
json_object_new_string (SUBSCRIBE_REQUEST));
json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
json_object_new_int (0));
json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
json_object_new_int (0));
WEMQJSON *jsonBody = json_object_new_object ();
if (jsonBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonBody failed");
json_object_put (jsonHeader);
json_object_put (jsonTopicList);
return -1;
}
json_object_object_add (jsonBody, MSG_BODY_TOPIC_LIST_JSON, jsonTopicList);
const char *header_str = json_object_get_string (jsonHeader);
if (header_str == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_get_string for header is null");
json_object_put (jsonBody);
json_object_put (jsonHeader);
return -2;
}
stThreadMsg.m_iHeaderLen = strlen (header_str);
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] Gen thread msg header succ, len %d, %s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, stThreadMsg.m_iHeaderLen, header_str);
stThreadMsg.m_pHeader =
(char *) malloc (stThreadMsg.m_iHeaderLen * sizeof (char) + 1);
if (stThreadMsg.m_pHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "malloc for header failed");
json_object_put (jsonBody);
json_object_put (jsonHeader);
return -1;
}
memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';
json_object_put (jsonHeader);
const char *body_str = json_object_get_string (jsonBody);
if (body_str == NULL)
{
json_object_put (jsonBody);
return -1;
}
stThreadMsg.m_iBodyLen = strlen (body_str);
stThreadMsg.m_pBody =
(char *) malloc (stThreadMsg.m_iBodyLen * sizeof (char) + 1);
if (stThreadMsg.m_pBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "malloc for hello body failed");
json_object_put (jsonBody);
return -1;
}
memcpy (stThreadMsg.m_pBody, body_str, stThreadMsg.m_iBodyLen);
stThreadMsg.m_pBody[stThreadMsg.m_iBodyLen] = '\0';
json_object_put (jsonBody);
int iRet = _wemq_thread_do_cmd_add_listen_msg (pThreadCtx, &stThreadMsg);
if (iRet == -2)
{
free (stThreadMsg.m_pHeader);
free (stThreadMsg.m_pBody);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
iRet = _wemq_thread_do_recv_sync (pThreadCtx);
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header ERROR",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
jsonHeader = NULL;
WEMQJSON *jsonTmp = NULL;
char *usCmd;
char *msg;
int serRet = -1;
int seq = -1;
long time = 0;
jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
if (jsonHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] [LocalPort:%d] json_tokener_parse error: %s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort,
pWemqHeader->
cStrJsonHeader) return _wemq_thread_state_trans (pThreadCtx,
pThreadCtx->
m_iState,
THREAD_STATE_BREAK);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp);
if (jsonTmp != NULL)
{
usCmd = json_object_get_string (jsonTmp);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp);
if (jsonTmp != NULL)
{
serRet = json_object_get_int (jsonTmp);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp);
if (jsonTmp != NULL)
{
seq = json_object_get_int (jsonTmp);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp);
if (jsonTmp != NULL)
{
msg = json_object_get_string (jsonTmp);
}
json_object_put (jsonHeader);
if ((serRet == 0) && strcmp (usCmd, SUBSCRIBE_RESPONSE) == 0)
{
LOGRMB (RMB_LOG_INFO,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [msg:%s] [cmd:%s] register proxy success",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, msg,
usCmd);
}
else
{
if (strcmp (usCmd, SUBSCRIBE_RESPONSE) == 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [msg:%s] [cmd:%s] [ret:%d] register proxy failed, iRet=%d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, msg,
usCmd, serRet);
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [msg:%s] [cmd:%s]register proxy failed, unknown cmd",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, msg,
usCmd);
}
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
//send start command
if (flag == 1)
{
StWemqThreadMsg stThreadMsg;
memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
stThreadMsg.m_iCmd = THREAD_MSG_CMD_START;
WEMQJSON *jsonHeader = json_object_new_object ();
if (jsonHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object failed");
return -2;
}
//add command
json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
json_object_new_string (LISTEN_REQUEST));
//add seq
json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
json_object_new_int (0));
//add code
json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
json_object_new_int (0));
const char *header_str = json_object_get_string (jsonHeader);
if (header_str == NULL)
{
LOGRMB (RMB_LOG_ERROR, "header is null");
json_object_put (jsonHeader);
return -2;
}
stThreadMsg.m_iHeaderLen = strlen (header_str);
LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%u, %s",
stThreadMsg.m_iHeaderLen, header_str) stThreadMsg.m_pHeader =
(char *) malloc ((stThreadMsg.m_iHeaderLen + 1) * sizeof (char));
if (stThreadMsg.m_pHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "malloc for stThreadMsg.m_pHeader failed");
json_object_put (jsonHeader);
return -2;
}
memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';
json_object_put (jsonHeader);
stThreadMsg.m_iBodyLen = 0;
stThreadMsg.m_pBody = NULL;
int iRet = _wemq_thread_do_cmd_start_msg (pThreadCtx, &stThreadMsg);
if (iRet == -2)
{
_wemq_thread_clear_thread_msg (&stThreadMsg);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
iRet = _wemq_thread_do_recv_sync (pThreadCtx);
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] Decode Wemq Header ERROR",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
jsonHeader = NULL;
WEMQJSON *jsonTmp = NULL;
char *usCmd;
char *msg;
int serRet = -1;
int seq = -1;
long time = 0;
jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
if (jsonHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] [LocalPort:%d] json_tokener_parse error:%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pThreadCtx->m_iLocalPort, pWemqHeader->cStrJsonHeader);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp);
if (jsonTmp != NULL)
{
usCmd = json_object_get_string (jsonTmp);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp);
if (jsonTmp != NULL)
{
serRet = json_object_get_int (jsonTmp);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp);
if (jsonTmp != NULL)
{
seq = json_object_get_int (jsonTmp);
}
json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp);
if (jsonTmp != NULL)
{
msg = json_object_get_string (jsonTmp);
}
json_object_put (jsonHeader);
if (serRet != 0 || (strcmp (usCmd, LISTEN_RESPONSE) != 0))
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [Seq:%d] [msg:%s] [CMD:%d] reconnect send start listen error:%d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, seq, msg,
usCmd, serRet);
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_BREAK);
}
}
return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
THREAD_STATE_OK);
}