in eventmesh-sdks/eventmesh-sdk-c/src/rmb_sub.c [1143:1254]
int wemq_sub_add_start_to_access (StRmbSub * pStRmbSub)
{
if (pStRmbSub == NULL || pStRmbSub->pStContext == NULL)
{
LOGRMB (RMB_LOG_ERROR, "pStRmbSub or pStRmbSub->pStContext is null");
return -1;
}
stContextProxy *pContextProxy = pStRmbSub->pStContext->pContextProxy;
if (pContextProxy == NULL)
{
LOGRMB (RMB_LOG_ERROR, "pStRmbSub->pStContext->pContextProxy is null");
return -1;
}
StWemqThreadMsg stThreadMsg;
memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
stThreadMsg.m_iCmd = THREAD_MSG_CMD_START;
WEMQJSON *jsonHeader = json_object_new_object ();
if (jsonHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object failed");
return -2;
}
//add command
json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
json_object_new_string (LISTEN_REQUEST));
//add seq
json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
json_object_new_int (0));
//add code
json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
json_object_new_int (0));
const char *header_str = json_object_get_string (jsonHeader);
if (header_str == NULL)
{
LOGRMB (RMB_LOG_ERROR, "header is null");
json_object_put (jsonHeader);
return -2;
}
stThreadMsg.m_iHeaderLen = strlen (header_str);
LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%u, %s",
stThreadMsg.m_iHeaderLen, header_str);
stThreadMsg.m_pHeader =
(char *) malloc ((stThreadMsg.m_iHeaderLen + 1) * sizeof (char));
if (stThreadMsg.m_pHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR, "malloc for stThreadMsg.m_pHeader failed");
json_object_put (jsonHeader);
return -2;
}
memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';
json_object_put (jsonHeader);
stThreadMsg.m_iBodyLen = 0;
stThreadMsg.m_pBody = NULL;
int iRet = 0;
struct timeval nowTimeVal;
struct timespec timeout;
pthread_mutex_lock (&pContextProxy->regMutex);
iRet = wemq_kfifo_put (&pContextProxy->subFifo, stThreadMsg);
if (iRet <= 0)
{
LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put for listen comman error");
rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
return -3;
}
gettimeofday (&nowTimeVal, NULL);
timeout.tv_sec =
nowTimeVal.tv_sec + (nowTimeVal.tv_usec / 1000 +
pRmbStConfig->iNormalTimeout) / 1000;
timeout.tv_nsec =
((nowTimeVal.tv_usec / 1000 +
pRmbStConfig->iNormalTimeout) % 1000) * 1000 * 1000;
pContextProxy->iResultForReg = -1;
pContextProxy->iFlagForReg = 0;
if (pContextProxy->iFlagForReg == 0)
{
pthread_cond_timedwait (&pContextProxy->regCond, &pContextProxy->regMutex,
&timeout);
}
pthread_mutex_unlock (&pContextProxy->regMutex);
switch (pContextProxy->iResultForReg)
{
case RMB_CODE_TIME_OUT:
LOGRMB (RMB_LOG_ERROR, "send start command timeout");
return -6;
case RMB_CODE_SUSS:
LOGRMB (RMB_LOG_INFO, "send start command to access succ");
return 0;
case RMB_CODE_OTHER_FAIL:
LOGRMB (RMB_LOG_ERROR, "send start command failed,iRet=%d",
pContextProxy->iResultForReg);
return -4;
case RMB_CODE_AUT_FAIL:
LOGRMB (RMB_LOG_ERROR, "send start command authentication failed,iRet=%d",
pContextProxy->iResultForReg);
return -5;
default:
return 0;
}
}