int rmb_pub_send_msg_to_wemq()

in eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c [1610:1733]


/**
 * Send an event (non request/reply) message through the WEMQ proxy and wait
 * for the acknowledgement reported via the context proxy.
 *
 * The message is initialised for WEMQ, encoded into a thread message and put
 * on the context proxy's publish FIFO.  The caller then blocks on the proxy's
 * event condition variable until iFlagForEvent leaves its "pending" value
 * (-1) or the deadline derived from pRmbStConfig->accessAckTimeOut (treated
 * as milliseconds by the deadline arithmetic below) is reached.
 *
 * @param pRmbPub  publisher handle; its pContext and pContextProxy are used.
 * @param pMsg     message to send; must pass rmb_check_msg_valid() and must
 *                 not be a service-call (request/reply) message.  Its
 *                 ulMsgLiveTime is clamped to DEFAULT_MSG_MAX_LIVE_TIME.
 *
 * @return  0 on success;
 *         -1 if an argument, the context or the context proxy is NULL;
 *         -2 if rmb_check_msg_valid() rejects the message;
 *         -3 if the message is a service call;
 *         -5 if rmb_msg_init() fails, the FIFO put fails, or the ack reports
 *            an authentication failure;
 *         -6 if the ack reports timeout / other failure, or no recognised
 *            result was produced before the deadline;
 *         otherwise the non-zero value returned by
 *         rmb_pub_encode_thread_msg().
 */
int rmb_pub_send_msg_to_wemq (StRmbPub * pRmbPub, StRmbMsg * pMsg)
{
  if (pRmbPub == NULL || pMsg == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "pRmbPub or pMsg is null");
    return -1;
  }

  //check dest is set
  if (rmb_check_msg_valid (pMsg) != 0)
  {
    rmb_errno = RMB_ERROR_MSG_MISSING_PART;
    return -2;
  }

  if (pMsg->iEventOrService == (int) RMB_SERVICE_CALL)
  {
    LOGRMB (RMB_LOG_ERROR,
            "rmb pub event interface can't send rr msg,serviceId=%s!\n",
            pMsg->strServiceId);
    rmb_errno = RMB_ERROR_EVENT_INTERFACE_CAN_NOT_SEND_RR_MSG;
    return -3;
  }

  // Both pointers are dereferenced unconditionally below; fail cleanly
  // instead of crashing on a publisher that was never fully initialised.
  StContext *pStContext = pRmbPub->pContext;
  if (pStContext == NULL || pStContext->pContextProxy == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "pRmbPub context or context proxy is null");
    return -1;
  }
  stContextProxy *pContextProxy = pStContext->pContextProxy;

  int iRet = 0;

  iRet = rmb_msg_init (pMsg, pRmbStConfig, C_TYPE_WEMQ);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d\n", iRet);
    return -5;
  }

  // An unset or oversized live time falls back to the maximum.
  if (pMsg->ulMsgLiveTime == 0
      || pMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME)
  {
    pMsg->ulMsgLiveTime = DEFAULT_MSG_MAX_LIVE_TIME;
  }

  pMsg->cLogicType = EVENT_PKG_IN_WEMQ;
  GetRmbNowLongTime ();
  pMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime;

  pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG;

  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_MSG;
  iRet =
    rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pMsg,
                               pMsg->ulMsgLiveTime);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!");
    rmb_send_log_for_error (pContextProxy, RMB_ERROR_ENCODE_FAIL,
                            "wemq_pub_encode_thread_msg error", pMsg);
    return iRet;
  }

  pthread_mutex_lock (&pContextProxy->eventMutex);
  iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg);
  if (iRet <= 0)
  {
    // Release the mutex before bailing out; returning with it held would
    // block every later sender on this proxy.
    pthread_mutex_unlock (&pContextProxy->eventMutex);
    LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!,iRet=%d", iRet);
    rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
    rmb_send_log_for_error (pContextProxy,
                            RMB_ERROR_WORKER_PUT_FIFO_ERROR,
                            "wemq_kfifo_put error", pMsg);
    return -5;
  }

  // Absolute deadline = now + accessAckTimeOut, computed in milliseconds and
  // split into seconds / nanoseconds for pthread_cond_timedwait.
  struct timeval tv;
  gettimeofday (&tv, NULL);
  struct timespec ts_timeout;
  ts_timeout.tv_sec =
    tv.tv_sec + (tv.tv_usec / 1000 + pRmbStConfig->accessAckTimeOut) / 1000;
  ts_timeout.tv_nsec =
    ((tv.tv_usec / 1000 +
      pRmbStConfig->accessAckTimeOut) % 1000) * 1000 * 1000;

  // -1 marks "no result yet" for this send.
  pContextProxy->iFlagForEvent = -1;
  //reset seq
  pContextProxy->iSeqForEvent = g_iSendReqForEvent;
  LOGRMB (RMB_LOG_DEBUG, "reset seq:%ld, pContextProxy->iSeqForEvent:%ld",
          g_iSendReqForEvent, pContextProxy->iSeqForEvent);

  // Re-check the predicate after every wakeup so a spurious wakeup is not
  // mistaken for an ack; stop once the deadline passes or the wait fails.
  while (pContextProxy->iFlagForEvent == -1)
  {
    if (pthread_cond_timedwait (&pContextProxy->eventCond,
                                &pContextProxy->eventMutex,
                                &ts_timeout) != 0)
    {
      break;
    }
  }

  // Snapshot the result while still holding the mutex so the switch below
  // does not race with whoever updates the flag.
  int iEventFlag = pContextProxy->iFlagForEvent;
  pthread_mutex_unlock (&pContextProxy->eventMutex);

  switch (iEventFlag)
  {
  case RMB_CODE_TIME_OUT:
    LOGRMB (RMB_LOG_ERROR, "time out!req=%s", rmb_msg_print (pMsg));
    rmb_errno = RMB_ERROR_SEND_EVENT_MSG_FAIL;
    rmb_send_log_for_error (pContextProxy,
                            RMB_ERROR_SEND_EVENT_MSG_FAIL,
                            "wemq send event msg ack timeout", pMsg);
    return -6;
  case RMB_CODE_SUSS:
    LOGRMB (RMB_LOG_DEBUG, "send msg succ!req=%s\n", rmb_msg_print (pMsg));
    return 0;
  case RMB_CODE_OTHER_FAIL:
    LOGRMB (RMB_LOG_ERROR, "send msg failed!req=%s", rmb_msg_print (pMsg));
    return -6;
  case RMB_CODE_AUT_FAIL:
    LOGRMB (RMB_LOG_ERROR, "Authentication failed!req=%s",
            rmb_msg_print (pMsg));
    return -5;
  default:
    // Reached when the wait ended without a recognised result (e.g. the
    // flag is still pending after the deadline).  Previously control fell
    // off the end of the function without returning a value.
    LOGRMB (RMB_LOG_ERROR, "no valid ack result,flag=%d!req=%s", iEventFlag,
            rmb_msg_print (pMsg));
    rmb_errno = RMB_ERROR_SEND_EVENT_MSG_FAIL;
    return -6;
  }
}