int rmb_sub_add_listen_to_wemq()

in eventmesh-sdks/eventmesh-sdk-c/src/rmb_sub.c [969:1141]


/*
 * Build a SUBSCRIBE_REQUEST thread message for the given queues, push it
 * onto the context proxy's subFifo, then wait (up to
 * pRmbStConfig->iNormalTimeout milliseconds) on regCond for the result.
 * On success every queue is recorded in pContextProxy->stTopicList.
 *
 * For each queue one topic "<dcn>-<s|e>-<serviceId>-<scenarioId>-<flag>" is
 * subscribed, where <flag> is the 4th character of the service id and the
 * type letter is 's' when that character is '0', otherwise 'e'.  For 'e'
 * topics the same topic under the broadcast DCN "000" is subscribed too.
 *
 * Parameters:
 *   pRmbSub      subscriber handle; its pStContext->pContextProxy is used.
 *   pQueueInfo   array of uiQueueSize queue descriptors.
 *   uiQueueSize  number of entries in pQueueInfo.
 *   cDcn         DCN used as the topic prefix.
 *   cSysId       only checked for NULL; not otherwise used here.
 *
 * Returns:
 *   0   on success;
 *   -1  on invalid argument, JSON object or memory allocation failure;
 *   -2  if the header could not be serialised;
 *   -3  if the message could not be put into the fifo (rmb_errno set);
 *   RMB_ERROR_WORKER_REGISTER_ERROR if the request failed or timed out
 *       (rmb_errno set to the same value).
 */
int rmb_sub_add_listen_to_wemq (StRmbSub * pRmbSub,
                                const st_rmb_queue_info * pQueueInfo,
                                unsigned int uiQueueSize, const char *cDcn,
                                const char *cSysId)
{
  RMB_CHECK_POINT_NULL (pRmbSub, "pStRmbSub");
  RMB_CHECK_POINT_NULL (pRmbSub->pStContext, "pStRmbSub->pStContext");
  RMB_CHECK_POINT_NULL (pRmbSub->pStContext->pContextProxy,
                        "pStRmbSub->pStContext->pContextProxy");
  RMB_CHECK_POINT_NULL (cDcn, "cDcn");
  RMB_CHECK_POINT_NULL (cSysId, "cSysId");

  /* An empty list never dereferences pQueueInfo, so only reject NULL when
     there is something to iterate over. */
  if (uiQueueSize > 0 && pQueueInfo == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "pQueueInfo is null");
    return -1;
  }

  stContextProxy *pContextProxy = pRmbSub->pStContext->pContextProxy;

  /* Everything the failure path may have to release is declared up front
     so that a single cleanup label can handle every error exit. */
  WEMQJSON *jsonTopicList = NULL;
  WEMQJSON *jsonHeader = NULL;
  WEMQJSON *jsonBody = NULL;
  const st_rmb_queue_info *p = NULL;
  const char *cBroadcastDcn = "000";
  const char *header_str = NULL;
  const char *body_str = NULL;
  unsigned int i;
  int iRet = -1;
  int iPutRet = 0;
  int iWaitRet = 0;
  int iFlagForReg = 0;
  int iResultForReg = -1;

  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_ADD_LISTEN;

  jsonTopicList = json_object_new_array ();
  if (jsonTopicList == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_array for jsonTopicList failed");
    goto fail;
  }

  for (i = 0, p = pQueueInfo; i < uiQueueSize; i++, p++)
  {
    char cTopic[200] = { 0 };
    char cTypeFlag = *(p->cServiceId + 3);
    char serviceOrEvent = (cTypeFlag == '0') ? 's' : 'e';

    snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c", cDcn, serviceOrEvent,
              p->cServiceId, p->cScenarioId, cTypeFlag);
    json_object_array_add (jsonTopicList, json_object_new_string (cTopic));

    /* Event topics are additionally listened to on the broadcast DCN. */
    if (serviceOrEvent == 'e')
    {
      memset (cTopic, 0x00, sizeof (cTopic));
      snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c", cBroadcastDcn,
                serviceOrEvent, p->cServiceId, p->cScenarioId, cTypeFlag);
      json_object_array_add (jsonTopicList, json_object_new_string (cTopic));
    }
  }

  jsonHeader = json_object_new_object ();
  if (jsonHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonHeader failed");
    goto fail;
  }

  json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
                          json_object_new_string (SUBSCRIBE_REQUEST));
  json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
                          json_object_new_int (0));
  json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
                          json_object_new_int (0));

  jsonBody = json_object_new_object ();
  if (jsonBody == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonBody failed");
    goto fail;
  }

  /* jsonBody now owns the topic list; clear the local handle so that the
     cleanup path does not release it a second time. */
  json_object_object_add (jsonBody, MSG_BODY_TOPIC_LIST_JSON, jsonTopicList);
  jsonTopicList = NULL;

  header_str = json_object_get_string (jsonHeader);
  if (header_str == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_get_string for header is null");
    iRet = -2;
    goto fail;
  }
  stThreadMsg.m_iHeaderLen = strlen (header_str);

  LOGRMB (RMB_LOG_INFO, "Gen thread msg header succ, len=%d, %s",
          stThreadMsg.m_iHeaderLen, header_str);
  stThreadMsg.m_pHeader =
    (char *) malloc (stThreadMsg.m_iHeaderLen * sizeof (char) + 1);
  if (stThreadMsg.m_pHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "malloc for m_pHeader failed, errno=%d", errno);
    goto fail;
  }
  memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
  stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';

  /* header_str points into jsonHeader, so release it only after the copy. */
  json_object_put (jsonHeader);
  jsonHeader = NULL;

  body_str = json_object_get_string (jsonBody);
  if (body_str == NULL)
  {
    goto fail;
  }

  stThreadMsg.m_iBodyLen = strlen (body_str);
  stThreadMsg.m_pBody =
    (char *) malloc (stThreadMsg.m_iBodyLen * sizeof (char) + 1);
  if (stThreadMsg.m_pBody == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "malloc for hello body failed");
    goto fail;
  }
  memcpy (stThreadMsg.m_pBody, body_str, stThreadMsg.m_iBodyLen);
  stThreadMsg.m_pBody[stThreadMsg.m_iBodyLen] = '\0';

  json_object_put (jsonBody);
  jsonBody = NULL;

  pthread_mutex_lock (&pContextProxy->regMutex);
  iPutRet = wemq_kfifo_put (&pContextProxy->subFifo, stThreadMsg);
  if (iPutRet <= 0)
  {
    /* Release the lock before bailing out, otherwise the next registration
       attempt would block forever. */
    pthread_mutex_unlock (&pContextProxy->regMutex);
    LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error,iRet=%d!\n", iPutRet);
    rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
    iRet = -3;
    /* NOTE(review): assumes a failed put stores nothing in the fifo, so the
       header/body buffers are still owned here and are freed below. */
    goto fail;
  }

  /* Absolute deadline = now + iNormalTimeout (milliseconds). */
  struct timeval nowTimeVal;
  gettimeofday (&nowTimeVal, NULL);
  struct timespec timeout;
  timeout.tv_sec =
    nowTimeVal.tv_sec + (nowTimeVal.tv_usec / 1000 +
                         pRmbStConfig->iNormalTimeout) / 1000;
  timeout.tv_nsec =
    ((nowTimeVal.tv_usec / 1000 +
      pRmbStConfig->iNormalTimeout) % 1000) * 1000 * 1000;

  pContextProxy->iFlagForReg = 0;
  pContextProxy->iResultForReg = -1;
  /* Condition variables may wake spuriously: keep waiting until the flag
     changes or the wait itself reports timeout/error.  The deadline is
     absolute, so re-waiting does not extend it. */
  while (pContextProxy->iFlagForReg == 0 && iWaitRet == 0)
  {
    iWaitRet =
      pthread_cond_timedwait (&pContextProxy->regCond,
                              &pContextProxy->regMutex, &timeout);
  }
  /* Snapshot the shared result while the mutex is still held. */
  iFlagForReg = pContextProxy->iFlagForReg;
  iResultForReg = pContextProxy->iResultForReg;
  pthread_mutex_unlock (&pContextProxy->regMutex);

  if (iFlagForReg != 1)
  {
    LOGRMB (RMB_LOG_ERROR, "add listen timeout!dcn=%s", cDcn);
    rmb_errno = RMB_ERROR_WORKER_REGISTER_ERROR;
    return rmb_errno;
  }

  if (iResultForReg != 0)
  {
    LOGRMB (RMB_LOG_ERROR, "add listen failed,iRet=%d,dcn=%s",
            iResultForReg, cDcn);
    rmb_errno = RMB_ERROR_WORKER_REGISTER_ERROR;
    return rmb_errno;
  }

  LOGRMB (RMB_LOG_DEBUG, "add listen succ!dcn=%s", cDcn);

  /* Cache the subscribed topics. */
  for (i = 0, p = pQueueInfo; i < uiQueueSize; i++, p++)
  {
    StWemqTopicProp stTopicProp;
    memset (&stTopicProp, 0, sizeof (stTopicProp));
    stTopicProp.flag = 0;
    /* Bound the copies by the destination (8 and 2 characters, matching the
       terminator positions) instead of the source length, so an over-long
       id is truncated rather than overflowing the struct. */
    strncpy (stTopicProp.cServiceId, p->cServiceId, 8);
    stTopicProp.cServiceId[8] = '\0';
    strncpy (stTopicProp.cScenario, p->cScenarioId, 2);
    stTopicProp.cScenario[2] = '\0';
    wemq_topic_list_add_node (&pContextProxy->stTopicList, &stTopicProp);
  }

  return 0;

fail:
  /* Only reached before the message has been handed to the fifo, so all
     buffers and JSON objects that still exist are owned by this function. */
  free (stThreadMsg.m_pHeader);
  free (stThreadMsg.m_pBody);
  if (jsonBody != NULL)
  {
    json_object_put (jsonBody);
  }
  if (jsonHeader != NULL)
  {
    json_object_put (jsonHeader);
  }
  if (jsonTopicList != NULL)
  {
    json_object_put (jsonTopicList);
  }
  return iRet;
}