static int32_t _wemq_thread_do_cmd_send_msg_reg()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [3318:3460]


static int32_t _wemq_thread_do_cmd_send_msg_reg (WemqThreadCtx * pThreadCtx)
{
  StWemqThreadMsg *pStWemqThreadMsg = &pThreadCtx->m_stHelloWord;
  int iRet = -1;
  int iTotalLen =
    pStWemqThreadMsg->m_iHeaderLen + pStWemqThreadMsg->m_iBodyLen + 8;

  char *buf = pThreadCtx->m_pSendBuff;
  ENCODE_INT (buf, iTotalLen);
  ENCODE_INT (buf, pStWemqThreadMsg->m_iHeaderLen);
  ENCODE_DWSTR_MEMCPY (buf, pStWemqThreadMsg->m_pHeader,
                       pStWemqThreadMsg->m_iHeaderLen);
  ENCODE_DWSTR_MEMCPY (buf, pStWemqThreadMsg->m_pBody,
                       pStWemqThreadMsg->m_iBodyLen);

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] |wemq_thread2accesss| Send header:%s, body:%s, totalLen:%d, headerLen:%d",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pStWemqThreadMsg->m_pHeader, pStWemqThreadMsg->m_pBody, iTotalLen,
          pStWemqThreadMsg->m_iHeaderLen);

  iRet =
    _wemq_thread_do_send_sync (pThreadCtx, pThreadCtx->m_pSendBuff, iTotalLen,
                               pStWemqThreadMsg->m_iHeaderLen);

  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] _wemq_thread_do_send_sync error!",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
    //return _wemq_thread_state_trans(pThreadCtx, pThreadCtx->m_iState, THREAD_STATE_BREAK);
    return -1;
  }

  iRet = _wemq_thread_do_recv_sync (pThreadCtx);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] _wemq_thread_do_recv_sync error: %d",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, iRet);
    return -2;
  }

  StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
  WEMQJSON *jsonHeader = NULL;
  WEMQJSON *jsonTmp = NULL;
  int usCmd = -1;
  int serRet = -1;
  int seq = -1;
  long time = 0;
  char cMessage[100];

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] |access2wemq_thread|Recv: %s",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
          pWemqHeader->cStrJsonHeader);
  jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
  if (jsonHeader == NULL)
  {
    // 消息不完整, json解析失败
    LOGRMB (RMB_LOG_ERROR, "[Type:%d] [TID:%lu] json_tokener_parse error: %s",
            pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pWemqHeader->cStrJsonHeader) return 1;
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp);
  if (jsonTmp != NULL)
  {
    usCmd = json_object_get_int (jsonTmp);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    serRet = json_object_get_int (jsonTmp);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    seq = json_object_get_int (jsonTmp);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_TIME_LINT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    time = (long) json_object_get_int64 (jsonTmp);
  }
  json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp);
  if (jsonTmp != NULL)
  {
    memset (cMessage, 0x00, sizeof (cMessage));
    strncpy (cMessage, json_object_get_string (jsonTmp),
             sizeof (cMessage) - 1);
  }
  json_object_put (jsonHeader);

  if (strcmp (cMessage, "auth exception") == 0)
  {                             // wemq user/passwd error
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Authentication error!user:%s, passwd:%s",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time,
            pRmbStConfig->cWemqUser, pRmbStConfig->cWemqPasswd);
    return -1;
  }
  if (serRet == RMB_CODE_OTHER_FAIL)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] register proxy error:%d",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time,
            serRet);
    return -2;
  }

  if (serRet == RMB_CODE_AUT_FAIL)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] register proxy Authentication error:%d",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time,
            serRet);
    return -3;
  }

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] register proxy success!",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

  //hello world之后,心跳包可以延期
  gettimeofday (&pThreadCtx->stTimeLast, NULL);
  gettimeofday (&pThreadCtx->stTimeLastRecv, NULL);

  return 0;

}