int32_t wemq_thread_state_reconnect()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [4808:5197]


int32_t wemq_thread_state_reconnect (WemqThreadCtx * pThreadCtx)
{
  ASSERT (pThreadCtx);
  //TODO RECONNECT;
  //sleep(1);
  struct timeval tv;
  gettimeofday (&tv, NULL);
  long now_time = tv.tv_sec * 1000000 + tv.tv_usec;
  srand ((unsigned int) now_time);
  //随机sleep 30~50ms ,usleep 单位是纳秒
  int sleep_time = rand () % 20 + 30;
  usleep (sleep_time * 1000);
  // hello msg
  {
    int iRet = -1;

    iRet = _wemq_thread_do_cmd_send_msg_reg (pThreadCtx);
    if (iRet > 0)
    {
      return -1;
    }
    else if (iRet < 0)
    {
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }
  }

    /**
     * 程序首次起来时,如果选择的第一个ip连接失败而第二个ip连接成功且hello world指令发送成功,则应该通知前端连接成功
     */
  if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
  {
    if (pThreadCtx->m_ptProxyContext->iFlagForPublish == 0)
    {
      pThreadCtx->m_ptProxyContext->iFlagForPublish = 1;
    }
    pthread_mutex_lock (&pThreadCtx->m_ptProxyContext->pubMutex);
    if (pThreadCtx->m_ptProxyContext->iFlagForPub == 0)
    {
      pThreadCtx->m_ptProxyContext->iFlagForPub = 1;
      pthread_cond_signal (&pThreadCtx->m_ptProxyContext->pubCond);
    }
    pthread_mutex_unlock (&pThreadCtx->m_ptProxyContext->pubMutex);
  }
  else if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_SUB)
  {
    pthread_mutex_lock (&pThreadCtx->m_ptProxyContext->subMutex);
    if (pThreadCtx->m_ptProxyContext->iFlagForSub == 0)
    {
      pThreadCtx->m_ptProxyContext->iFlagForSub = 1;
      pthread_cond_signal (&pThreadCtx->m_ptProxyContext->subCond);
    }
    pthread_mutex_unlock (&pThreadCtx->m_ptProxyContext->subMutex);
  }

  int flag = 0;
  //regist topic list
  StWemqTopicProp *ptTopicProp = NULL;
  if (pThreadCtx->m_ptTopicList != NULL)
  {
    ptTopicProp = pThreadCtx->m_ptTopicList->next;
  }
  WEMQJSON *jsonTopicList = json_object_new_array ();
  char cBroadcastDcn[10] = "000";
  while (ptTopicProp != NULL)
  {
    flag = 1;
    char cTopic[200];
    //char serviceOrEvent = (*(ptTopicProp->cServiceId + 3) == '0') ? 'e' : 's';
    char serviceOrEvent = (*(ptTopicProp->cServiceId + 3) == '0') ? 's' : 'e';
    snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c",
              pRmbStConfig->cConsumerDcn, serviceOrEvent,
              ptTopicProp->cServiceId, ptTopicProp->cScenario,
              *(ptTopicProp->cServiceId + 3));
    json_object_array_add (jsonTopicList, json_object_new_string (cTopic));
    //自动监听广播topic
    if (serviceOrEvent == 'e')
    {
      memset (cTopic, 0x00, sizeof (cTopic));
      snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c", cBroadcastDcn,
                serviceOrEvent, ptTopicProp->cServiceId,
                ptTopicProp->cScenario, *(ptTopicProp->cServiceId + 3));
      json_object_array_add (jsonTopicList, json_object_new_string (cTopic));
    }
    ptTopicProp = ptTopicProp->next;
  }
  StWemqThreadMsg stThreadMsg;
  memset (&stThreadMsg, 0, sizeof (StWemqThreadMsg));
  stThreadMsg.m_iCmd = THREAD_MSG_CMD_ADD_LISTEN;

  WEMQJSON *jsonHeader = json_object_new_object ();
  if (jsonHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonHeader failed");
    return -1;
  }

  json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
                          json_object_new_string (SUBSCRIBE_REQUEST));
  json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
                          json_object_new_int (0));
  json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
                          json_object_new_int (0));

  WEMQJSON *jsonBody = json_object_new_object ();
  if (jsonBody == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_object for jsonBody failed");
    json_object_put (jsonHeader);
    json_object_put (jsonTopicList);
    return -1;
  }

  json_object_object_add (jsonBody, MSG_BODY_TOPIC_LIST_JSON, jsonTopicList);

  const char *header_str = json_object_get_string (jsonHeader);
  if (header_str == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_get_string for header is null");
    json_object_put (jsonBody);
    json_object_put (jsonHeader);
    return -2;
  }
  stThreadMsg.m_iHeaderLen = strlen (header_str);

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] Gen thread msg header succ, len %d, %s",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, stThreadMsg.m_iHeaderLen, header_str);
  stThreadMsg.m_pHeader =
    (char *) malloc (stThreadMsg.m_iHeaderLen * sizeof (char) + 1);
  if (stThreadMsg.m_pHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "malloc for header failed");
    json_object_put (jsonBody);
    json_object_put (jsonHeader);
    return -1;
  }
  memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
  stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';
  json_object_put (jsonHeader);

  const char *body_str = json_object_get_string (jsonBody);
  if (body_str == NULL)
  {
    json_object_put (jsonBody);
    return -1;
  }

  stThreadMsg.m_iBodyLen = strlen (body_str);
  stThreadMsg.m_pBody =
    (char *) malloc (stThreadMsg.m_iBodyLen * sizeof (char) + 1);
  if (stThreadMsg.m_pBody == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "malloc for hello body failed");
    json_object_put (jsonBody);
    return -1;
  }
  memcpy (stThreadMsg.m_pBody, body_str, stThreadMsg.m_iBodyLen);
  stThreadMsg.m_pBody[stThreadMsg.m_iBodyLen] = '\0';

  json_object_put (jsonBody);

  int iRet = _wemq_thread_do_cmd_add_listen_msg (pThreadCtx, &stThreadMsg);
  if (iRet == -2)
  {
    free (stThreadMsg.m_pHeader);
    free (stThreadMsg.m_pBody);
    return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                     THREAD_STATE_BREAK);
  }
  iRet = _wemq_thread_do_recv_sync (pThreadCtx);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Decode Wemq Header ERROR",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
    return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                     THREAD_STATE_BREAK);
  }

  StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
  jsonHeader = NULL;
  WEMQJSON *jsonTmp = NULL;
  char *usCmd;
  char *msg;
  int serRet = -1;
  int seq = -1;
  long time = 0;

  jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
  if (jsonHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[Type:%d] [TID:%lu] [LocalPort:%d] json_tokener_parse error: %s",
            pThreadCtx->m_contextType, pThreadCtx->m_threadID,
            pThreadCtx->m_iLocalPort,
            pWemqHeader->
            cStrJsonHeader) return _wemq_thread_state_trans (pThreadCtx,
                                                             pThreadCtx->
                                                             m_iState,
                                                             THREAD_STATE_BREAK);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp);
  if (jsonTmp != NULL)
  {
    usCmd = json_object_get_string (jsonTmp);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    serRet = json_object_get_int (jsonTmp);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    seq = json_object_get_int (jsonTmp);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp);
  if (jsonTmp != NULL)
  {
    msg = json_object_get_string (jsonTmp);
  }

  json_object_put (jsonHeader);

  if ((serRet == 0) && strcmp (usCmd, SUBSCRIBE_RESPONSE) == 0)
  {
    LOGRMB (RMB_LOG_INFO,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [msg:%s] [cmd:%s] register proxy success",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, msg,
            usCmd);

  }
  else
  {
    if (strcmp (usCmd, SUBSCRIBE_RESPONSE) == 0)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [msg:%s] [cmd:%s] [ret:%d] register proxy failed, iRet=%d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, msg,
              usCmd, serRet);
    }
    else
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [msg:%s] [cmd:%s]register proxy failed, unknown cmd",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, msg,
              usCmd);
    }
    return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                     THREAD_STATE_BREAK);
  }

  //send start command
  if (flag == 1)
  {
    StWemqThreadMsg stThreadMsg;
    memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg));
    stThreadMsg.m_iCmd = THREAD_MSG_CMD_START;

    WEMQJSON *jsonHeader = json_object_new_object ();
    if (jsonHeader == NULL)
    {
      LOGRMB (RMB_LOG_ERROR, "json_object_new_object failed");
      return -2;
    }
    //add command
    json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
                            json_object_new_string (LISTEN_REQUEST));
    //add seq
    json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
                            json_object_new_int (0));
    //add code
    json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
                            json_object_new_int (0));

    const char *header_str = json_object_get_string (jsonHeader);
    if (header_str == NULL)
    {
      LOGRMB (RMB_LOG_ERROR, "header is null");
      json_object_put (jsonHeader);
      return -2;
    }

    stThreadMsg.m_iHeaderLen = strlen (header_str);
    LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%u, %s",
            stThreadMsg.m_iHeaderLen, header_str) stThreadMsg.m_pHeader =
      (char *) malloc ((stThreadMsg.m_iHeaderLen + 1) * sizeof (char));
    if (stThreadMsg.m_pHeader == NULL)
    {
      LOGRMB (RMB_LOG_ERROR, "malloc for stThreadMsg.m_pHeader failed");
      json_object_put (jsonHeader);
      return -2;
    }
    memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen);
    stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0';

    json_object_put (jsonHeader);

    stThreadMsg.m_iBodyLen = 0;
    stThreadMsg.m_pBody = NULL;

    int iRet = _wemq_thread_do_cmd_start_msg (pThreadCtx, &stThreadMsg);
    if (iRet == -2)
    {
      _wemq_thread_clear_thread_msg (&stThreadMsg);
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }

    iRet = _wemq_thread_do_recv_sync (pThreadCtx);
    if (iRet != 0)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] Decode Wemq Header ERROR",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort);
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }

    StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
    jsonHeader = NULL;
    WEMQJSON *jsonTmp = NULL;
    char *usCmd;
    char *msg;
    int serRet = -1;
    int seq = -1;
    long time = 0;

    jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
    if (jsonHeader == NULL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[Type:%d] [TID:%lu] [LocalPort:%d] json_tokener_parse error:%s",
              pThreadCtx->m_contextType, pThreadCtx->m_threadID,
              pThreadCtx->m_iLocalPort, pWemqHeader->cStrJsonHeader);
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }

    json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp);
    if (jsonTmp != NULL)
    {
      usCmd = json_object_get_string (jsonTmp);
    }

    json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp);
    if (jsonTmp != NULL)
    {
      serRet = json_object_get_int (jsonTmp);
    }
    json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp);
    if (jsonTmp != NULL)
    {
      seq = json_object_get_int (jsonTmp);
    }
    json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp);
    if (jsonTmp != NULL)
    {
      msg = json_object_get_string (jsonTmp);
    }

    json_object_put (jsonHeader);

    if (serRet != 0 || (strcmp (usCmd, LISTEN_RESPONSE) != 0))
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [Seq:%d] [msg:%s] [CMD:%d] reconnect send start listen error:%d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, seq, msg,
              usCmd, serRet);
      return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                       THREAD_STATE_BREAK);
    }
  }

  return _wemq_thread_state_trans (pThreadCtx, pThreadCtx->m_iState,
                                   THREAD_STATE_OK);
}