in eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c [1862:1990]
int rmb_pub_send_rr_msg_async_to_wemq (StRmbPub * pRmbPub,
StRmbMsg * pSendMsg,
unsigned int uiTimeOut)
{
RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub");
RMB_CHECK_POINT_NULL (pSendMsg, "pSendMsg");
if (pSendMsg->iEventOrService == (int) RMB_EVENT_CALL)
{
LOGRMB (RMB_LOG_ERROR, "async rr interface can't send event msg!");
rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG;
return -1;
}
//pub connect status error
//if (pRmbPub->pContext->pContextProxy->iFlagForPublish == 0) {
// LOGRMB(RMB_LOG_ERROR, "rmb pub not connect to access!!!");
// return -4;
//}
int iRet = 0;
// iRet = rmb_pub_set_destination_Interval(pRmbPub, pSendMsg);
// if (iRet != 0) {
// LOGRMB(RMB_LOG_ERROR, "rmb set destination error!serviceId=%s,sceneId=%s,iRet=%d", pSendMsg->strServiceId, pSendMsg->strScenarioId, iRet);
// return -2;
// }
LOGRMB (RMB_LOG_DEBUG, "pubMsg dest=%d,%s,replyTo=%s",
pSendMsg->dest.iDestType, pSendMsg->dest.cDestName,
pSendMsg->replyTo.cDestName);
iRet = rmb_msg_init (pSendMsg, pRmbStConfig, C_TYPE_WEMQ);
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d", iRet);
return -3;
}
if (uiTimeOut > RR_ASYNC_MSG_MAX_LIVE_TIME)
{
LOGRMB (RMB_LOG_ERROR, "RR sync ttl too large, max value is:%ld",
RR_ASYNC_MSG_MAX_LIVE_TIME);
return -4;
}
if (pSendMsg->ulMsgLiveTime == 0
|| pSendMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME)
{
pSendMsg->ulMsgLiveTime = uiTimeOut;
}
GetRmbNowLongTime ();
pSendMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime;
pSendMsg->replyTo.iDestType = RMB_DEST_TOPIC;
StContext *pStContext = pRmbPub->pContext;
pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG;
stContextProxy *pContextProxy = pStContext->pContextProxy;
StWemqThreadMsg stThreadMsg;
memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_REQUEST_ASYNC;
iRet =
rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pSendMsg, 0);
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!");
rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_ENCODE_FAIL,
"wemq_pub_encode_thread_msg error", pSendMsg);
return iRet;
}
unsigned int uiUniqueLen = strlen (pSendMsg->sysHeader.cUniqueId);
int i = 0;
struct timeval tv_now;
gettimeofday (&tv_now, NULL);
unsigned long ulNowTime = tv_now.tv_sec * 1000 + tv_now.tv_usec / 1000;
int iFlagForList = 0;
for (i = 0;
i <
pContextProxy->pUniqueListForRRAsyncNew.
get_array_size (&pContextProxy->pUniqueListForRRAsyncNew); i++)
{
if (pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag == 0)
{
pthread_mutex_lock (&pContextProxy->rrMutex);
snprintf (pContextProxy->pUniqueListForRRAsyncNew.Data[i].unique_id,
sizeof (pContextProxy->pUniqueListForRRAsyncNew.Data[i].
unique_id), "%s", pSendMsg->sysHeader.cUniqueId);
snprintf (pContextProxy->pUniqueListForRRAsyncNew.Data[i].biz_seq,
sizeof (pContextProxy->pUniqueListForRRAsyncNew.Data[i].
biz_seq), "%s", pSendMsg->sysHeader.cBizSeqNo);
pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag = 1;
pContextProxy->pUniqueListForRRAsyncNew.Data[i].timeStamp = ulNowTime;
pContextProxy->pUniqueListForRRAsyncNew.Data[i].timeout = uiTimeOut;
iFlagForList = 1;
pthread_mutex_unlock (&pContextProxy->rrMutex);
break;
}
}
//已有空间已装满
if (iFlagForList == 0)
{
LOGRMB (RMB_LOG_INFO, "local list for rr async push back");
StUniqueIdList uniqueIdList;
strncpy (uniqueIdList.unique_id, pSendMsg->sysHeader.cUniqueId,
uiUniqueLen);
uniqueIdList.unique_id[uiUniqueLen] = '\0';
uniqueIdList.flag = 1;
uniqueIdList.timeStamp = ulNowTime;
uniqueIdList.timeout = uiTimeOut;
pthread_mutex_lock (&pContextProxy->rrMutex);
pContextProxy->pUniqueListForRRAsyncNew.Input (uniqueIdList,
&pContextProxy->
pUniqueListForRRAsyncNew);
pthread_mutex_unlock (&pContextProxy->rrMutex);
}
iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg);
if (iRet <= 0)
{
LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!iRet=%d", iRet);
rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
rmb_send_log_for_error (pStContext->pContextProxy,
RMB_ERROR_WORKER_PUT_FIFO_ERROR,
"wemq_kfifo_put error", pSendMsg);
return -4;
}
pContextProxy->iFlagForRRAsync = 1;
return 0;
}