in eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c [1610:1733]
/**
 * Publish an event message through the WeMQ proxy and wait for its ack.
 *
 * The message is validated, initialised, encoded into a thread message and
 * placed on the context proxy's publish FIFO.  The caller then blocks on
 * eventCond until iFlagForEvent leaves its pending value (-1) or
 * pRmbStConfig->accessAckTimeOut milliseconds have elapsed.
 *
 * @param pRmbPub  publisher handle; pRmbPub->pContext and its pContextProxy
 *                 are dereferenced without a NULL check.
 * @param pMsg     message to publish; must not be a service (RR) call.
 * @return  0  the ack reported RMB_CODE_SUSS;
 *         -1  pRmbPub or pMsg is NULL;
 *         -2  rmb_check_msg_valid() rejected the message;
 *         -3  the message is a service call;
 *         -5  rmb_msg_init() failed, the FIFO put failed, or the ack
 *             reported RMB_CODE_AUT_FAIL;
 *         -6  ack timeout, RMB_CODE_OTHER_FAIL, or an unrecognised ack code;
 *         otherwise the non-zero result of rmb_pub_encode_thread_msg().
 */
int rmb_pub_send_msg_to_wemq (StRmbPub * pRmbPub, StRmbMsg * pMsg)
{
  if (pRmbPub == NULL || pMsg == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "pRmbPub or pMsg is null");
    return -1;
  }

  //check dest is set
  if (rmb_check_msg_valid (pMsg) != 0)
  {
    rmb_errno = RMB_ERROR_MSG_MISSING_PART;
    return -2;
  }

  if (pMsg->iEventOrService == (int) RMB_SERVICE_CALL)
  {
    LOGRMB (RMB_LOG_ERROR,
            "rmb pub event interface can't send rr msg,serviceId=%s!\n",
            pMsg->strServiceId);
    rmb_errno = RMB_ERROR_EVENT_INTERFACE_CAN_NOT_SEND_RR_MSG;
    return -3;
  }

  int iRet = rmb_msg_init (pMsg, pRmbStConfig, C_TYPE_WEMQ);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d\n", iRet);
    return -5;
  }

  // Clamp the live time once; an unset or oversized value becomes the max.
  if (pMsg->ulMsgLiveTime == 0
      || pMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME)
  {
    pMsg->ulMsgLiveTime = DEFAULT_MSG_MAX_LIVE_TIME;
  }

  pMsg->cLogicType = EVENT_PKG_IN_WEMQ;
  GetRmbNowLongTime ();
  pMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime;

  StContext *pStContext = pRmbPub->pContext;
  pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG;
  stContextProxy *pContextProxy = pStContext->pContextProxy;

  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_MSG;
  iRet =
    rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pMsg,
                               pMsg->ulMsgLiveTime);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!");
    rmb_send_log_for_error (pContextProxy, RMB_ERROR_ENCODE_FAIL,
                            "wemq_pub_encode_thread_msg error", pMsg);
    return iRet;
  }

  pthread_mutex_lock (&pContextProxy->eventMutex);
  iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg);
  if (iRet <= 0)
  {
    // Release the lock before bailing out, otherwise the next sender
    // on this proxy would block forever.
    pthread_mutex_unlock (&pContextProxy->eventMutex);
    LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!,iRet=%d", iRet);
    rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
    rmb_send_log_for_error (pContextProxy,
                            RMB_ERROR_WORKER_PUT_FIFO_ERROR,
                            "wemq_kfifo_put error", pMsg);
    return -5;
  }

  // Absolute deadline = now + accessAckTimeOut (ms), at millisecond precision.
  struct timeval tv;
  gettimeofday (&tv, NULL);
  struct timespec ts_timeout;
  ts_timeout.tv_sec =
    tv.tv_sec + (tv.tv_usec / 1000 + pRmbStConfig->accessAckTimeOut) / 1000;
  ts_timeout.tv_nsec =
    ((tv.tv_usec / 1000 +
      pRmbStConfig->accessAckTimeOut) % 1000) * 1000 * 1000;

  // -1 marks "ack still pending".
  pContextProxy->iFlagForEvent = -1;
  //reset seq
  pContextProxy->iSeqForEvent = g_iSendReqForEvent;
  LOGRMB (RMB_LOG_DEBUG, "reset seq:%ld, pContextProxy->iSeqForEvent:%ld",
          g_iSendReqForEvent, pContextProxy->iSeqForEvent);

  // Re-check the flag after every wakeup: condition variables may wake
  // spuriously.  The deadline is absolute, so looping never extends the
  // total wait; any non-zero return (timeout or error) ends the wait.
  int iWaitRet = 0;
  while (pContextProxy->iFlagForEvent == -1 && iWaitRet == 0)
  {
    iWaitRet = pthread_cond_timedwait (&pContextProxy->eventCond,
                                       &pContextProxy->eventMutex,
                                       &ts_timeout);
  }

  // Snapshot the result while still holding the lock so the switch below
  // does not race with the thread that writes iFlagForEvent.
  int iEventFlag = pContextProxy->iFlagForEvent;
  pthread_mutex_unlock (&pContextProxy->eventMutex);

  switch (iEventFlag)
  {
    case RMB_CODE_TIME_OUT:
      LOGRMB (RMB_LOG_ERROR, "time out!req=%s", rmb_msg_print (pMsg));
      rmb_errno = RMB_ERROR_SEND_EVENT_MSG_FAIL;
      rmb_send_log_for_error (pContextProxy,
                              RMB_ERROR_SEND_EVENT_MSG_FAIL,
                              "wemq send event msg ack timeout", pMsg);
      return -6;
    case RMB_CODE_SUSS:
      LOGRMB (RMB_LOG_DEBUG, "send msg succ!req=%s\n", rmb_msg_print (pMsg));
      return 0;
    case RMB_CODE_OTHER_FAIL:
      LOGRMB (RMB_LOG_ERROR, "send msg failed!req=%s", rmb_msg_print (pMsg));
      return -6;
    case RMB_CODE_AUT_FAIL:
      LOGRMB (RMB_LOG_ERROR, "Authentication failed!req=%s",
              rmb_msg_print (pMsg));
      return -5;
    default:
      // Never fall off the end of a non-void function: treat any
      // unrecognised ack code as a send failure.
      LOGRMB (RMB_LOG_ERROR, "unexpected ack code:%d!req=%s",
              iEventFlag, rmb_msg_print (pMsg));
      rmb_errno = RMB_ERROR_SEND_EVENT_MSG_FAIL;
      return -6;
  }
}