int wemq_sub_add_start_to_access()

in eventmesh-sdks/eventmesh-sdk-c/src/rmb_sub.c [1143:1254]


int wemq_sub_add_start_to_access (StRmbSub * pStRmbSub)
{
  if (pStRmbSub == NULL || pStRmbSub->pStContext == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "pStRmbSub or pStRmbSub->pStContext is null");
    return -1;
  }

  stContextProxy *pContextProxy = pStRmbSub->pStContext->pContextProxy;
  if (pContextProxy == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "pStRmbSub->pStContext->pContextProxy is null");
    return -1;
  }

  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_START;

  WEMQJSON *jsonHeader = json_object_new_object ();
  if (jsonHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_object failed");
    return -2;
  }
  //add command
  json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
                          json_object_new_string (LISTEN_REQUEST));
  //add seq
  json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
                          json_object_new_int (0));
  //add code
  json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
                          json_object_new_int (0));

  const char *header_str = json_object_get_string (jsonHeader);
  if (header_str == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "header is null");
    json_object_put (jsonHeader);
    return -2;
  }

  stThreadMsg.m_iHeaderLen = strlen (header_str);
  LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%u, %s",
          stThreadMsg.m_iHeaderLen, header_str);
  stThreadMsg.m_pHeader =
    (char *) malloc ((stThreadMsg.m_iHeaderLen + 1) * sizeof (char));
  if (stThreadMsg.m_pHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "malloc for stThreadMsg.m_pHeader failed");
    json_object_put (jsonHeader);
    return -2;
  }
  memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
  stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';

  json_object_put (jsonHeader);

  stThreadMsg.m_iBodyLen = 0;
  stThreadMsg.m_pBody = NULL;

  int iRet = 0;
  struct timeval nowTimeVal;
  struct timespec timeout;
  pthread_mutex_lock (&pContextProxy->regMutex);
  iRet = wemq_kfifo_put (&pContextProxy->subFifo, stThreadMsg);
  if (iRet <= 0)
  {
    LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put for listen comman error");
    rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR;
    return -3;
  }

  gettimeofday (&nowTimeVal, NULL);
  timeout.tv_sec =
    nowTimeVal.tv_sec + (nowTimeVal.tv_usec / 1000 +
                         pRmbStConfig->iNormalTimeout) / 1000;
  timeout.tv_nsec =
    ((nowTimeVal.tv_usec / 1000 +
      pRmbStConfig->iNormalTimeout) % 1000) * 1000 * 1000;

  pContextProxy->iResultForReg = -1;
  pContextProxy->iFlagForReg = 0;
  if (pContextProxy->iFlagForReg == 0)
  {
    pthread_cond_timedwait (&pContextProxy->regCond, &pContextProxy->regMutex,
                            &timeout);
  }
  pthread_mutex_unlock (&pContextProxy->regMutex);

  switch (pContextProxy->iResultForReg)
  {
  case RMB_CODE_TIME_OUT:
    LOGRMB (RMB_LOG_ERROR, "send start command timeout");
    return -6;
  case RMB_CODE_SUSS:
    LOGRMB (RMB_LOG_INFO, "send start command to access succ");
    return 0;
  case RMB_CODE_OTHER_FAIL:
    LOGRMB (RMB_LOG_ERROR, "send start command failed,iRet=%d",
            pContextProxy->iResultForReg);
    return -4;
  case RMB_CODE_AUT_FAIL:
    LOGRMB (RMB_LOG_ERROR, "send start command authentication failed,iRet=%d",
            pContextProxy->iResultForReg);
    return -5;
  default:
    return 0;
  }

}