static int rmb_pub_send_and_receive_to_wemq()

in eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c [1735:1860]


/*
 * Send a request/reply (RR) message through the wemq worker queue and block
 * until a reply is recorded on the context proxy or the wait expires.
 *
 * The request is encoded into a StWemqThreadMsg, pushed onto
 * pContextProxy->pubFifo, and the caller then waits on
 * pContextProxy->rrCond (protected by pContextProxy->rrMutex) for the
 * result flag pContextProxy->iFlagForRR to be updated.
 *
 * Parameters:
 *   pRmbPub   - publisher handle; its pContext / pContextProxy are used.
 *   pSendMsg  - request message. It is modified: re-initialised through
 *               rmb_msg_init, ulMsgLiveTime is replaced by uiTimeOut when it
 *               is 0 or above DEFAULT_MSG_MAX_LIVE_TIME, and cLogicType and
 *               sysHeader.ulSendTime are stamped.
 *   pRevMsg   - receives the decoded reply on success.
 *   uiTimeOut - maximum wait, in milliseconds (it is added to the current
 *               time expressed in milliseconds to build the deadline).
 *
 * Returns:
 *    0  reply received and decoded into pRevMsg
 *    1  reply flagged RMB_CODE_DYED_MSG
 *   -1  pSendMsg is an event message (RR interface cannot send it)
 *   -3  rmb_msg_init failed
 *   -4  encoding the thread message failed, the reply was flagged
 *       RMB_CODE_OTHER_FAIL, or the result flag held an unrecognised value
 *   -5  wemq_kfifo_put failed, or the reply was flagged RMB_CODE_AUT_FAIL
 *   -6  no reply before the deadline
 * rmb_errno is set on most failure paths (see the individual branches).
 *
 * NOTE(review): RMB_CHECK_POINT_NULL presumably returns early when its
 * argument is NULL - confirm against the macro definition.
 */
static int rmb_pub_send_and_receive_to_wemq (StRmbPub * pRmbPub,
                                             StRmbMsg * pSendMsg,
                                             StRmbMsg * pRevMsg,
                                             unsigned int uiTimeOut)
{
  RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub");
  RMB_CHECK_POINT_NULL (pSendMsg, "pSendMsg");
  RMB_CHECK_POINT_NULL (pRevMsg, "pRevMsg");

  if (pSendMsg->iEventOrService == (int) RMB_EVENT_CALL)
  {
    LOGRMB (RMB_LOG_ERROR, "rr interface can't send event msg!");
    rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG;
    return -1;
  }

  int iRet = rmb_msg_init (pSendMsg, pRmbStConfig, C_TYPE_WEMQ);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d", iRet);
    return -3;
  }

  /* Fall back to the RR timeout when the live time is unset or too large. */
  if (pSendMsg->ulMsgLiveTime == 0
      || pSendMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME)
  {
    pSendMsg->ulMsgLiveTime = uiTimeOut;
  }

  pSendMsg->cLogicType = REQ_PKG_IN_WEMQ;
  /* Called before reading pRmbStConfig->ulNowTtime; presumably refreshes it. */
  GetRmbNowLongTime ();
  pSendMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime;

  StContext *pStContext = pRmbPub->pContext;
  pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG;
  stContextProxy *pContextProxy = pStContext->pContextProxy;

  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_REQUEST;
  iRet =
    rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pSendMsg,
                               uiTimeOut);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!");
    rmb_send_log_for_error (pContextProxy, RMB_ERROR_ENCODE_FAIL,
                            "wemq_pub_encode_thread_msg error", pSendMsg);
    return -4;
  }

  pthread_mutex_lock (&pContextProxy->rrMutex);
  iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg);
  if (iRet <= 0)
  {
    /* Release the RR mutex before bailing out; leaving it locked would
     * deadlock the next request on this context. */
    pthread_mutex_unlock (&pContextProxy->rrMutex);
    LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!iRet=%d", iRet);
    rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
    rmb_send_log_for_error (pContextProxy,
                            RMB_ERROR_WORKER_PUT_FIFO_ERROR,
                            "wemq_kfifo_put error", pSendMsg);
    return -5;
  }

  /* Absolute deadline = now + uiTimeOut milliseconds. */
  struct timeval tv;
  gettimeofday (&tv, NULL);
  unsigned long long ullDeadlineMs =
    (unsigned long long) (tv.tv_usec / 1000) + uiTimeOut;
  struct timespec ts_timeout;
  ts_timeout.tv_sec = tv.tv_sec + (time_t) (ullDeadlineMs / 1000);
  ts_timeout.tv_nsec = (long) (ullDeadlineMs % 1000) * 1000 * 1000;

  /* Value written to iFlagForRR while no result has been recorded yet. */
  const int iFlagPending = -1;
  pContextProxy->iFlagForRR = iFlagPending;

  /* Publish the unique id of the outstanding request on the proxy.
   * NOTE(review): the copy length is the source length; the capacity of
   * stUnique.unique_id is not visible here - confirm it can always hold
   * cUniqueId plus the terminator. */
  unsigned int uiUniqueLen = strlen (pSendMsg->sysHeader.cUniqueId);
  strncpy (pContextProxy->stUnique.unique_id, pSendMsg->sysHeader.cUniqueId,
           uiUniqueLen);
  pContextProxy->stUnique.unique_id[uiUniqueLen] = '\0';
  pContextProxy->stUnique.flag = 1;

  /* Re-check the flag after every wakeup: condition variables may wake
   * spuriously. The deadline is absolute, so the loop ends no later than
   * uiTimeOut ms after the request was queued (or at once on a wait error). */
  int iWaitRet = 0;
  while (pContextProxy->iFlagForRR == iFlagPending && iWaitRet == 0)
  {
    iWaitRet =
      pthread_cond_timedwait (&pContextProxy->rrCond,
                              &pContextProxy->rrMutex, &ts_timeout);
  }

  /* Snapshot the result while the mutex is still held so the dispatch below
   * does not race with a later writer. */
  int iFlag = pContextProxy->iFlagForRR;
  if (iFlag == iFlagPending)
  {
    /* The wait ended (timeout or error) without any result being recorded. */
    iFlag = RMB_CODE_TIME_OUT;
  }
  if (iFlag == RMB_CODE_TIME_OUT)
  {
    /* Nobody is waiting for this unique id any more. */
    pContextProxy->stUnique.flag = 0;
  }
  pthread_mutex_unlock (&pContextProxy->rrMutex);

  switch (iFlag)
  {
  case RMB_CODE_TIME_OUT:
    LOGRMB (RMB_LOG_ERROR, "time out!req=%s", rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    rmb_send_log_for_error (pContextProxy,
                            RMB_ERROR_SEND_RR_MSG_TIMEOUT,
                            "wemq send rr msg timeout", pSendMsg);
    return -6;
  case RMB_CODE_SUSS:
    trans_json_2_rmb_msg (pRevMsg, pContextProxy->mPubRRBuf,
                          RESPONSE_TO_CLIENT);
    LOGRMB (RMB_LOG_DEBUG, "receive reply succ,buf:%s",
            pContextProxy->mPubRRBuf);
    pRevMsg->cPkgType = RR_TOPIC_PKG;
    return 0;
  case RMB_CODE_OTHER_FAIL:
    LOGRMB (RMB_LOG_ERROR, "receive reply failed!req=%s",
            rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    return -4;
  case RMB_CODE_AUT_FAIL:
    LOGRMB (RMB_LOG_ERROR, "receive reply Authentication failed!req=%s",
            rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    return -5;
  case RMB_CODE_DYED_MSG:
    LOGRMB (RMB_LOG_INFO, "receive dyed msg:%s", pContextProxy->mPubRRBuf);
    return 1;
  default:
    /* Unknown result flag: report a generic failure instead of falling off
     * the end of a non-void function. */
    LOGRMB (RMB_LOG_ERROR, "unexpected rr flag!flag=%d,req=%s", iFlag,
            rmb_msg_print (pSendMsg));
    rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT;
    return -4;
  }
}