int rmb_pub_send_rr_msg_async_to_wemq()

in eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c [1862:1990]


/**
 * Queue a request/reply (RR) message for asynchronous sending over WeMQ.
 *
 * The message is initialised and encoded into a StWemqThreadMsg, its unique
 * id is recorded in the context proxy's pending async-RR list (so a later
 * reply can be matched against it), and the encoded thread message is then
 * put on the proxy's publish FIFO.
 *
 * @param pRmbPub   Publisher handle; its pContext / pContextProxy are used.
 * @param pSendMsg  Message to send. Must not be an event message. Its
 *                  ulMsgLiveTime, sysHeader.ulSendTime and replyTo.iDestType
 *                  are updated by this call.
 * @param uiTimeOut Reply timeout; must not exceed RR_ASYNC_MSG_MAX_LIVE_TIME.
 *                  Also used as the message live time when the message has
 *                  none set or it exceeds DEFAULT_MSG_MAX_LIVE_TIME.
 *                  (Units are not visible here - presumably milliseconds,
 *                  matching the millisecond timeStamp stored next to it;
 *                  TODO confirm.)
 *
 * @return 0 on success;
 *         -1 if pSendMsg is an event message (rmb_errno is set);
 *         -3 if rmb_msg_init fails;
 *         -4 if uiTimeOut is too large, or if the FIFO put fails
 *            (rmb_errno is set only in the FIFO case);
 *         otherwise the non-zero code returned by rmb_pub_encode_thread_msg.
 *         NULL arguments are handled by RMB_CHECK_POINT_NULL (presumably an
 *         early error return - verify against the macro definition).
 */
int rmb_pub_send_rr_msg_async_to_wemq (StRmbPub * pRmbPub,
                                       StRmbMsg * pSendMsg,
                                       unsigned int uiTimeOut)
{
  RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub");
  RMB_CHECK_POINT_NULL (pSendMsg, "pSendMsg");

  // The RR interface only carries service calls, never events.
  if (pSendMsg->iEventOrService == (int) RMB_EVENT_CALL)
  {
    LOGRMB (RMB_LOG_ERROR, "async rr interface can't send event msg!");
    rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG;
    return -1;
  }

  // Disabled: pub connect status check
  //if (pRmbPub->pContext->pContextProxy->iFlagForPublish == 0) {
  //      LOGRMB(RMB_LOG_ERROR, "rmb pub not connect to access!!!");
  //      return -4;
  //}

  int iRet = 0;
  // Disabled: destination lookup
//      iRet = rmb_pub_set_destination_Interval(pRmbPub, pSendMsg);
//      if (iRet != 0) {
//              LOGRMB(RMB_LOG_ERROR, "rmb set destination error!serviceId=%s,sceneId=%s,iRet=%d", pSendMsg->strServiceId, pSendMsg->strScenarioId, iRet);
//              return -2;
//      }
  LOGRMB (RMB_LOG_DEBUG, "pubMsg dest=%d,%s,replyTo=%s",
          pSendMsg->dest.iDestType, pSendMsg->dest.cDestName,
          pSendMsg->replyTo.cDestName);

  iRet = rmb_msg_init (pSendMsg, pRmbStConfig, C_TYPE_WEMQ);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d", iRet);
    return -3;
  }

  if (uiTimeOut > RR_ASYNC_MSG_MAX_LIVE_TIME)
  {
    // Cast so the argument always matches the %ld conversion, whatever
    // integer type the macro expands to.
    LOGRMB (RMB_LOG_ERROR, "RR sync ttl too large, max value is:%ld",
            (long) (RR_ASYNC_MSG_MAX_LIVE_TIME));
    return -4;
  }

  // Fall back to the caller's timeout when the message carries no usable
  // live time of its own.
  if (pSendMsg->ulMsgLiveTime == 0
      || pSendMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME)
  {
    pSendMsg->ulMsgLiveTime = uiTimeOut;
  }

  // ulNowTtime is read right after this call; presumably it refreshes it.
  GetRmbNowLongTime ();
  pSendMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime;
  pSendMsg->replyTo.iDestType = RMB_DEST_TOPIC;

  StContext *pStContext = pRmbPub->pContext;
  pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG;
  stContextProxy *pContextProxy = pStContext->pContextProxy;
  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_REQUEST_ASYNC;
  iRet =
    rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pSendMsg, 0);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!");
    rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_ENCODE_FAIL,
                            "wemq_pub_encode_thread_msg error", pSendMsg);
    return iRet;
  }

  // Wall-clock time in milliseconds, stored with the pending-request entry.
  struct timeval tv_now;
  gettimeofday (&tv_now, NULL);
  unsigned long ulNowTime = tv_now.tv_sec * 1000 + tv_now.tv_usec / 1000;

  int i = 0;
  int iFlagForList = 0;

  // Register the request in the pending async-RR list. The mutex is held
  // across the whole search-and-claim (and the fallback append) so that two
  // concurrent senders can never pick the same free slot.
  pthread_mutex_lock (&pContextProxy->rrMutex);
  for (i = 0;
       i <
       pContextProxy->pUniqueListForRRAsyncNew.
       get_array_size (&pContextProxy->pUniqueListForRRAsyncNew); i++)
  {
    if (pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag != 0)
    {
      continue;
    }

    // Reuse the first free slot.
    snprintf (pContextProxy->pUniqueListForRRAsyncNew.Data[i].unique_id,
              sizeof (pContextProxy->pUniqueListForRRAsyncNew.Data[i].
                      unique_id), "%s", pSendMsg->sysHeader.cUniqueId);
    snprintf (pContextProxy->pUniqueListForRRAsyncNew.Data[i].biz_seq,
              sizeof (pContextProxy->pUniqueListForRRAsyncNew.Data[i].
                      biz_seq), "%s", pSendMsg->sysHeader.cBizSeqNo);
    pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag = 1;
    pContextProxy->pUniqueListForRRAsyncNew.Data[i].timeStamp = ulNowTime;
    pContextProxy->pUniqueListForRRAsyncNew.Data[i].timeout = uiTimeOut;
    iFlagForList = 1;
    break;
  }

  // All existing slots are in use: append a new entry instead.
  if (iFlagForList == 0)
  {
    StUniqueIdList uniqueIdList;
    // Zero first so no uninitialised stack bytes are copied into the list,
    // then fill the same fields the slot-reuse path fills. snprintf bounds
    // each copy by the destination size and always NUL-terminates.
    memset (&uniqueIdList, 0x00, sizeof (uniqueIdList));
    snprintf (uniqueIdList.unique_id, sizeof (uniqueIdList.unique_id), "%s",
              pSendMsg->sysHeader.cUniqueId);
    snprintf (uniqueIdList.biz_seq, sizeof (uniqueIdList.biz_seq), "%s",
              pSendMsg->sysHeader.cBizSeqNo);
    uniqueIdList.flag = 1;
    uniqueIdList.timeStamp = ulNowTime;
    uniqueIdList.timeout = uiTimeOut;
    pContextProxy->pUniqueListForRRAsyncNew.Input (uniqueIdList,
                                                   &pContextProxy->
                                                   pUniqueListForRRAsyncNew);
  }
  pthread_mutex_unlock (&pContextProxy->rrMutex);

  // Log outside the critical section to keep the lock hold time short.
  if (iFlagForList == 0)
  {
    LOGRMB (RMB_LOG_INFO, "local list for rr async push back");
  }

  // Hand the encoded message over via the publish FIFO.
  iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg);
  if (iRet <= 0)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!iRet=%d", iRet);
    rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
    rmb_send_log_for_error (pStContext->pContextProxy,
                            RMB_ERROR_WORKER_PUT_FIFO_ERROR,
                            "wemq_kfifo_put error", pSendMsg);
    return -4;
  }

  pContextProxy->iFlagForRRAsync = 1;
  return 0;
}