static int32_t _wemq_thread_dyed_msg_ack_to_access()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [612:727]


/**
 * Build an ack frame for a received message and send it synchronously
 * through _wemq_thread_do_send_sync().
 *
 * The header JSON carries msgType, seq and status (keys
 * MSG_HEAD_COMMAND_STR, MSG_HEAD_SEQ_INT, MSG_HEAD_CODE_INT).  The body JSON
 * carries a topic string derived from ptSendMsg, the property object from
 * rmb_pub_encode_property_for_wemq() and the serialized byte body from
 * rmb_pub_encode_byte_body_for_wemq().  The frame is encoded into
 * pThreadCtx->m_pSendBuff in this order: total length, header length,
 * header JSON, body JSON.
 *
 * @param pThreadCtx  thread context providing the send buffer and log fields
 * @param seq         sequence number placed in the header
 * @param status      status code placed in the header
 * @param msgType     command string placed in the header; also selects the
 *                    internal message type handed to the encode helpers
 * @param ptSendMsg   message whose topic fields, properties and body are
 *                    encoded into the ack body
 * @return 0 on success, -1 if the frame could not be built, -2 if the
 *         synchronous send failed
 */
static int32_t _wemq_thread_dyed_msg_ack_to_access (WemqThreadCtx *
                                                    pThreadCtx, int seq,
                                                    int status, char *msgType,
                                                    StRmbMsg * ptSendMsg)
{
  WEMQJSON *jsonHeader = NULL;
  WEMQJSON *jsonBody = NULL;
  WEMQJSON *jsonBodyProperty = NULL;
  WEMQJSON *jsonByteBody = NULL;
  const char *header_str = NULL;
  const char *byteBodyStr = NULL;
  const char *body_str = NULL;
  char *buf = NULL;
  char cTopic[128];
  char serviceOrEvent = 'e';
  int wemqMsgType = 0;
  int iHeaderLen = 0;
  int iBodyLen = 0;
  int iTotalLen = 0;
  int iFrameBuilt = 0;
  int iRet = -1;

  if (pThreadCtx == NULL || msgType == NULL || ptSendMsg == NULL)
  {
    LOGRMB (RMB_LOG_ERROR,
            "_wemq_thread_dyed_msg_ack_to_access: null argument");
    return -1;
  }
  buf = pThreadCtx->m_pSendBuff;

  // Assemble the message header.
  jsonHeader = json_object_new_object ();
  if (jsonHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_object return null");
    goto release_json;
  }
  json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
                          json_object_new_string (msgType));
  json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
                          json_object_new_int (seq));
  json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
                          json_object_new_int (status));

  header_str = json_object_get_string (jsonHeader);
  if (header_str == NULL)
  {
    goto release_json;
  }

  jsonBody = json_object_new_object ();
  if (jsonBody == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "json_object_new_object return null");
    goto release_json;
  }

  // Every strcmp must be compared against 0.  Previously three of the four
  // calls were used as bare truth values (non-zero == "differs"), which made
  // the first condition always true and the RESPONSE_TO_CLIENT branch
  // unreachable.
  // NOTE(review): a msgType matching none of these now leaves wemqMsgType
  // at 0 -- confirm the encode helpers accept that.
  if (strcmp (msgType, REQUEST_TO_CLIENT) == 0
      || strcmp (msgType, ASYNC_MESSAGE_TO_CLIENT) == 0
      || strcmp (msgType, BROADCAST_MESSAGE_TO_CLIENT) == 0)
  {
    wemqMsgType = THREAD_MSG_CMD_SEND_MSG_ACK;
  }
  else if (strcmp (msgType, RESPONSE_TO_CLIENT) == 0)
  {
    wemqMsgType = THREAD_MSG_CMD_RECV_MSG_ACK;
  }

  // Topic is "<dcn>-<s|e>-<serviceId>-<scenarioId>-<serviceId[3]>"; the 4th
  // character of the service id picks 's' (when it is '0') or 'e'.
  // NOTE(review): assumes strServiceId holds at least 4 characters.
  serviceOrEvent = (*(ptSendMsg->strServiceId + 3) == '0') ? 's' : 'e';
  snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c",
            ptSendMsg->strTargetDcn, serviceOrEvent, ptSendMsg->strServiceId,
            ptSendMsg->strScenarioId, *(ptSendMsg->strServiceId + 3));
  json_object_object_add (jsonBody, MSG_BODY_TOPIC_STR,
                          json_object_new_string (cTopic));

  jsonBodyProperty =
    rmb_pub_encode_property_for_wemq (wemqMsgType, ptSendMsg);
  if (jsonBodyProperty == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_property_for_wemq return null");
    goto release_json;
  }
  // From here the property object is released together with jsonBody.
  json_object_object_add (jsonBody, MSG_BODY_PROPERTY_JSON, jsonBodyProperty);

  jsonByteBody = rmb_pub_encode_byte_body_for_wemq (wemqMsgType, ptSendMsg);
  if (jsonByteBody == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_byte_body_for_wemq return null");
    goto release_json;
  }
  byteBodyStr = json_object_get_string (jsonByteBody);
  if (byteBodyStr == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "Get thread msg byte body failed\n");
    goto release_json;
  }

  // The byte body is embedded as a string copy, so jsonByteBody itself is
  // not attached to jsonBody and must be released separately below.
  json_object_object_add (jsonBody, MSG_BODY_BYTE_BODY_JSON,
                          json_object_new_string (byteBodyStr));

  body_str = json_object_get_string (jsonBody);
  if (body_str == NULL)
  {
    LOGRMB (RMB_LOG_ERROR, "Get thread msg body failed\n");
    goto release_json;
  }

  iHeaderLen = (int) strlen (header_str);
  iBodyLen = (int) strlen (body_str);
  // NOTE(review): the extra 8 presumably covers the two length fields
  // written by ENCODE_INT below -- confirm against the macro definitions.
  iTotalLen = iHeaderLen + iBodyLen + 8;

  // NOTE(review): nothing here checks iTotalLen against the capacity of
  // m_pSendBuff; the buffer size is not visible from this function.
  ENCODE_INT (buf, iTotalLen);
  ENCODE_INT (buf, iHeaderLen);
  ENCODE_DWSTR_MEMCPY (buf, header_str, iHeaderLen);
  ENCODE_DWSTR_MEMCPY (buf, body_str, iBodyLen);

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] |wemq_thread2accesss|Send:header_str:%s   body_str:%s\n",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, header_str,
          body_str);
  iFrameBuilt = 1;

release_json:
  // Single release point for every exit of the build phase.  header_str,
  // body_str and byteBodyStr point into these objects and are invalid
  // after this.
  if (jsonByteBody != NULL)
  {
    json_object_put (jsonByteBody);
  }
  if (jsonBody != NULL)
  {
    json_object_put (jsonBody);
  }
  if (jsonHeader != NULL)
  {
    json_object_put (jsonHeader);
  }
  if (!iFrameBuilt)
  {
    return -1;
  }

  iRet =
    _wemq_thread_do_send_sync (pThreadCtx, pThreadCtx->m_pSendBuff, iTotalLen,
                               iHeaderLen);
  if (iRet != 0)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] _wemq_thread_dyed_msg_ack_to_access error!\n",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
    return -2;
  }
  return 0;
}