in eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c [1735:1860]
/**
 * Send a request message through the wemq proxy and block until a reply
 * arrives or the timeout expires (request/reply, "RR", mode).
 *
 * The encoded request is put on the proxy's pubFifo while rrMutex is held;
 * the caller then waits on rrCond. After the wait, iFlagForRR is inspected
 * to decide the outcome (it is expected to be updated by whoever signals
 * rrCond; that code is not part of this function).
 *
 * @param pRmbPub   publisher handle; its pContext/pContextProxy are used.
 * @param pSendMsg  request to send; must not be an RMB_EVENT_CALL message.
 *                  Its live time, logic type and send time are updated here.
 * @param pRevMsg   filled from pContextProxy->mPubRRBuf on success.
 * @param uiTimeOut maximum wait for the reply, in milliseconds.
 *
 * @return  0  reply received and decoded into pRevMsg
 *          1  a dyed message was received (RMB_CODE_DYED_MSG)
 *         -1  pSendMsg is an event message
 *         -3  rmb_msg_init failed
 *         -4  encoding failed, the reply reported RMB_CODE_OTHER_FAIL,
 *             or the wait ended with an unrecognised result flag
 *         -5  the fifo put failed, or the reply reported RMB_CODE_AUT_FAIL
 *         -6  timed out waiting for the reply
 *         (RMB_CHECK_POINT_NULL presumably returns early on a NULL argument;
 *          its return value is defined outside this function.)
 */
static int rmb_pub_send_and_receive_to_wemq (StRmbPub * pRmbPub,
                                             StRmbMsg * pSendMsg,
                                             StRmbMsg * pRevMsg,
                                             unsigned int uiTimeOut)
{
  RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub");
  RMB_CHECK_POINT_NULL (pSendMsg, "pSendMsg");
  RMB_CHECK_POINT_NULL (pRevMsg, "pRevMsg");

  if (pSendMsg->iEventOrService == (int) RMB_EVENT_CALL)
  {
    LOGRMB (RMB_LOG_ERROR, "rr interface can't send event msg!");
    rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG;
    return -1;
  }

  int iRet = rmb_msg_init (pSendMsg, pRmbStConfig, C_TYPE_WEMQ);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d", iRet);
    return -3;
  }

  /* An unset or out-of-range live time falls back to the RR timeout. */
  if (pSendMsg->ulMsgLiveTime == 0
      || pSendMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME)
  {
    pSendMsg->ulMsgLiveTime = uiTimeOut;
  }
  pSendMsg->cLogicType = REQ_PKG_IN_WEMQ;

  /* Presumably refreshes pRmbStConfig->ulNowTtime, which is read next. */
  GetRmbNowLongTime ();
  pSendMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime;

  StContext *pStContext = pRmbPub->pContext;
  pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG;
  stContextProxy *pContextProxy = pStContext->pContextProxy;

  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_REQUEST;
  iRet =
    rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pSendMsg,
                               uiTimeOut);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!");
    rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_ENCODE_FAIL,
                            "wemq_pub_encode_thread_msg error", pSendMsg);
    return -4;
  }

  pthread_mutex_lock (&pContextProxy->rrMutex);

  iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg);
  if (iRet <= 0)
  {
    /* Release the lock before bailing out; returning with rrMutex held
     * would block every subsequent RR call on this proxy. */
    pthread_mutex_unlock (&pContextProxy->rrMutex);
    LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!iRet=%d", iRet);
    rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
    rmb_send_log_for_error (pStContext->pContextProxy,
                            RMB_ERROR_WORKER_PUT_FIFO_ERROR,
                            "wemq_kfifo_put error", pSendMsg);
    return -5;
  }

  /* Absolute deadline = now + uiTimeOut (ms), split into sec / nsec. */
  struct timeval tv;
  gettimeofday (&tv, NULL);
  struct timespec ts_timeout;
  ts_timeout.tv_sec = tv.tv_sec + (tv.tv_usec / 1000 + uiTimeOut) / 1000;
  ts_timeout.tv_nsec = ((tv.tv_usec / 1000 + uiTimeOut) % 1000) * 1000 * 1000;

  /* Reset the result flag and record the unique id of the request we are
   * waiting on, all while rrMutex is still held. */
  unsigned int uiUniqueLen = strlen (pSendMsg->sysHeader.cUniqueId);
  pContextProxy->iFlagForRR = -1;
  /* NOTE(review): the copy is bounded by the source length only; confirm
   * that stUnique.unique_id is at least as large as sysHeader.cUniqueId. */
  strncpy (pContextProxy->stUnique.unique_id, pSendMsg->sysHeader.cUniqueId,
           uiUniqueLen);
  pContextProxy->stUnique.unique_id[uiUniqueLen] = '\0';
  pContextProxy->stUnique.flag = 1;

  pthread_cond_timedwait (&pContextProxy->rrCond, &pContextProxy->rrMutex,
                          &ts_timeout);

  /* Snapshot the result under the lock so the dispatch below does not
   * re-read a field that may be modified once the mutex is released. */
  int iFlagForRR = (int) pContextProxy->iFlagForRR;
  if (iFlagForRR == RMB_CODE_TIME_OUT)
  {
    pContextProxy->stUnique.flag = 0;
  }
  pthread_mutex_unlock (&pContextProxy->rrMutex);

  switch (iFlagForRR)
  {
  case RMB_CODE_TIME_OUT:
    LOGRMB (RMB_LOG_ERROR, "time out!req=%s", rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    rmb_send_log_for_error (pStContext->pContextProxy,
                            RMB_ERROR_SEND_RR_MSG_TIMEOUT,
                            "wemq send rr msg timeout", pSendMsg);
    return -6;
  case RMB_CODE_SUSS:
    trans_json_2_rmb_msg (pRevMsg, pContextProxy->mPubRRBuf,
                          RESPONSE_TO_CLIENT);
    LOGRMB (RMB_LOG_DEBUG, "receive reply succ,buf:%s",
            pContextProxy->mPubRRBuf);
    pRevMsg->cPkgType = RR_TOPIC_PKG;
    return 0;
  case RMB_CODE_OTHER_FAIL:
    LOGRMB (RMB_LOG_ERROR, "receive reply failed!req=%s",
            rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    return -4;
  case RMB_CODE_AUT_FAIL:
    LOGRMB (RMB_LOG_ERROR, "receive reply Authentication failed!req=%s",
            rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    return -5;
  case RMB_CODE_DYED_MSG:
    LOGRMB (RMB_LOG_INFO, "receive dyed msg:%s", pContextProxy->mPubRRBuf);
    return 1;
  default:
    /* Any other value (e.g. the flag was never updated before the wait
     * returned) previously fell off the end of the function without a
     * return value. Report it as a generic failure instead. */
    LOGRMB (RMB_LOG_ERROR, "unexpected rr flag=%d!req=%s", iFlagForRR,
            rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    return -4;
  }
}