in eventmesh-sdks/eventmesh-sdk-c/src/rmb_sub.c [969:1141]
int rmb_sub_add_listen_to_wemq (StRmbSub * pRmbSub,
const st_rmb_queue_info * pQueueInfo,
unsigned int uiQueueSize, const char *cDcn,
const char *cSysId)
{
RMB_CHECK_POINT_NULL (pRmbSub, "pStRmbSub");
RMB_CHECK_POINT_NULL (pRmbSub->pStContext, "pStRmbSub->pStContext");
RMB_CHECK_POINT_NULL (pRmbSub->pStContext->pContextProxy,
"pStRmbSub->pStContext->pContextProxy");
RMB_CHECK_POINT_NULL (cDcn, "cDcn");
RMB_CHECK_POINT_NULL (cSysId, "cSysId");
stContextProxy *pContextProxy = pRmbSub->pStContext->pContextProxy;
StWemqThreadMsg stThreadMsg;
memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
stThreadMsg.m_iCmd = THREAD_MSG_CMD_ADD_LISTEN;
WEMQJSON *jsonTopicList = json_object_new_array ();
st_rmb_queue_info *p = pQueueInfo;
int i;
char cBroadcastDcn[10] = "000";
for (i = 0; i < uiQueueSize; i++)
{
char cTopic[200] = { 0 };
char serviceOrEvent = (*(p->cServiceId + 3) == '0') ? 's' : 'e';
snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c", cDcn, serviceOrEvent,
p->cServiceId, p->cScenarioId, *(p->cServiceId + 3));
json_object_array_add (jsonTopicList, json_object_new_string (cTopic));
//自动监听广播topic
if (serviceOrEvent == 'e')
{
memset (cTopic, 0x00, sizeof (cTopic));
snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c", cBroadcastDcn,
serviceOrEvent, p->cServiceId, p->cScenarioId,
*(p->cServiceId + 3));
json_object_array_add (jsonTopicList, json_object_new_string (cTopic));
}
p += 1;
}
WEMQJSON *jsonHeader = json_object_new_object ();
if (jsonHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonHeader failed");
return -1;
}
json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
json_object_new_string (SUBSCRIBE_REQUEST));
json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
json_object_new_int (0));
json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
json_object_new_int (0));
WEMQJSON *jsonBody = json_object_new_object ();
if (jsonBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonBody failed");
json_object_put (jsonHeader);
json_object_put (jsonTopicList);
return -1;
}
json_object_object_add (jsonBody, MSG_BODY_TOPIC_LIST_JSON, jsonTopicList);
const char *header_str = json_object_get_string (jsonHeader);
if (header_str == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_get_string for header is null");
json_object_put (jsonBody);
json_object_put (jsonHeader);
return -2;
}
stThreadMsg.m_iHeaderLen = strlen (header_str);
LOGRMB (RMB_LOG_INFO, "Gen thread msg header succ, len=%d, %s",
stThreadMsg.m_iHeaderLen, header_str);
stThreadMsg.m_pHeader =
(char *) malloc (stThreadMsg.m_iHeaderLen * sizeof (char) + 1);
if (stThreadMsg.m_pHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "malloc for m_pHeader failed, errno=%d", errno);
json_object_put (jsonBody);
json_object_put (jsonHeader);
return -1;
}
strncpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';
json_object_put (jsonHeader);
const char *body_str = json_object_get_string (jsonBody);
if (body_str == NULL)
{
json_object_put (jsonBody);
return -1;
}
stThreadMsg.m_iBodyLen = strlen (body_str);
stThreadMsg.m_pBody =
(char *) malloc (stThreadMsg.m_iBodyLen * sizeof (char) + 1);
if (stThreadMsg.m_pBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "malloc for hello body failed");
json_object_put (jsonBody);
return -1;
}
memcpy (stThreadMsg.m_pBody, body_str, stThreadMsg.m_iBodyLen);
stThreadMsg.m_pBody[stThreadMsg.m_iBodyLen] = '\0';
json_object_put (jsonBody);
pthread_mutex_lock (&pContextProxy->regMutex);
int iRet = wemq_kfifo_put (&pContextProxy->subFifo, stThreadMsg);
if (iRet <= 0)
{
LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error,iRet=%d!\n", iRet);
rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
return -3;
}
struct timeval nowTimeVal;
gettimeofday (&nowTimeVal, NULL);
struct timespec timeout;
timeout.tv_sec =
nowTimeVal.tv_sec + (nowTimeVal.tv_usec / 1000 +
pRmbStConfig->iNormalTimeout) / 1000;
timeout.tv_nsec =
((nowTimeVal.tv_usec / 1000 +
pRmbStConfig->iNormalTimeout) % 1000) * 1000 * 1000;
pContextProxy->iFlagForReg = 0;
pContextProxy->iResultForReg = -1;
if (pContextProxy->iFlagForReg == 0)
{
pthread_cond_timedwait (&pContextProxy->regCond, &pContextProxy->regMutex,
&timeout);
}
pthread_mutex_unlock (&pContextProxy->regMutex);
if (pContextProxy->iFlagForReg == 1 && pContextProxy->iResultForReg != 0)
{
LOGRMB (RMB_LOG_ERROR, "add listen failed,iRet=%d,dcn=%s",
pContextProxy->iResultForReg, cDcn);
rmb_errno = RMB_ERROR_WORKER_REGISTER_ERROR;
return rmb_errno;
}
if (pContextProxy->iFlagForReg != 1)
{
LOGRMB (RMB_LOG_ERROR, "add listen timeout!dcn=%s", cDcn);
rmb_errno = RMB_ERROR_WORKER_REGISTER_ERROR;
return rmb_errno;
}
LOGRMB (RMB_LOG_DEBUG, "add listen succ!dcn=%s", cDcn);
//cache sub topic;
p = pQueueInfo;
for (i = 0; i < uiQueueSize; i++)
{
StWemqTopicProp stTopicProp;
memset (&stTopicProp, 0, sizeof (stTopicProp));
stTopicProp.flag = 0;
strncpy (stTopicProp.cServiceId, p->cServiceId, strlen (p->cServiceId));
stTopicProp.cServiceId[8] = '\0';
strncpy (stTopicProp.cScenario, p->cScenarioId, strlen (p->cScenarioId));
stTopicProp.cScenario[2] = '\0';
wemq_topic_list_add_node (&pContextProxy->stTopicList, &stTopicProp);
p += 1;
}
return 0;
}