in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [612:727]
static int32_t _wemq_thread_dyed_msg_ack_to_access (WemqThreadCtx *
pThreadCtx, int seq,
int status, char *msgType,
StRmbMsg * ptSendMsg)
{
char *buf = pThreadCtx->m_pSendBuff;
int iRet = -1;
WEMQJSON *jsonHeader = json_object_new_object ();
// 组装消息
json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
json_object_new_string (msgType));
json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
json_object_new_int (seq));
json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
json_object_new_int (status));
const char *header_str = json_object_get_string (jsonHeader);
if (header_str == NULL)
{
json_object_put (jsonHeader);
return -1;
}
WEMQJSON *jsonBody = json_object_new_object ();
if (jsonBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object return null");
return -1;
}
int wemqMsgType = 0;
if (strcmp (msgType, REQUEST_TO_CLIENT) == 0
|| strcmp (msgType, ASYNC_MESSAGE_TO_CLIENT)
|| strcmp (msgType, BROADCAST_MESSAGE_TO_CLIENT))
{
wemqMsgType = THREAD_MSG_CMD_SEND_MSG_ACK;
}
else if (strcmp (msgType, RESPONSE_TO_CLIENT))
{
wemqMsgType = THREAD_MSG_CMD_RECV_MSG_ACK;
}
char cTopic[128];
char serviceOrEvent = (*(ptSendMsg->strServiceId + 3) == '0') ? 's' : 'e';
snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c",
ptSendMsg->strTargetDcn, serviceOrEvent, ptSendMsg->strServiceId,
ptSendMsg->strScenarioId, *(ptSendMsg->strServiceId + 3));
json_object_object_add (jsonBody, MSG_BODY_TOPIC_STR,
json_object_new_string (cTopic));
WEMQJSON *jsonBodyProperty =
rmb_pub_encode_property_for_wemq (wemqMsgType, ptSendMsg);
if (jsonBodyProperty == NULL)
{
json_object_put (jsonHeader);
json_object_put (jsonBody);
LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_property_for_wemq return null");
return -1;
}
json_object_object_add (jsonBody, MSG_BODY_PROPERTY_JSON, jsonBodyProperty);
WEMQJSON *jsonByteBody =
rmb_pub_encode_byte_body_for_wemq (wemqMsgType, ptSendMsg);
if (jsonByteBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_byte_body_for_wemq return null");
return -1;
}
const char *byteBodyStr = json_object_get_string (jsonByteBody);
json_object_object_add (jsonBody, MSG_BODY_BYTE_BODY_JSON,
json_object_new_string (byteBodyStr));
const char *body_str = json_object_get_string (jsonBody);
if (body_str == NULL)
{
LOGRMB (RMB_LOG_ERROR, "Get thread msg body failed\n");
json_object_put (jsonHeader);
json_object_put (jsonBody);
json_object_put (jsonByteBody);
return -1;
}
int iHeaderLen = strlen (header_str);
int iBodyLen = strlen (body_str);
int iTotalLen = iHeaderLen + iBodyLen + 8;
ENCODE_INT (buf, iTotalLen);
ENCODE_INT (buf, iHeaderLen);
ENCODE_DWSTR_MEMCPY (buf, header_str, iHeaderLen);
ENCODE_DWSTR_MEMCPY (buf, body_str, iBodyLen);
// json_object_put(jsonHeader);
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] |wemq_thread2accesss|Send:header_str:%s body_str:%s\n",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, header_str,
body_str);
json_object_put (jsonHeader);
json_object_put (jsonBody);
json_object_put (jsonByteBody);
iRet =
_wemq_thread_do_send_sync (pThreadCtx, pThreadCtx->m_pSendBuff, iTotalLen,
iHeaderLen);
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] _wemq_thread_resp_ack_to_access error!\n",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
return -2;
}
return 0;
}