in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [945:1090]
static int32_t _wemq_thread_send_error_log (WemqThreadCtx * pThreadCtx,
int seq, int errCode,
char *logPoint, char *errMsg,
StRmbMsg * ptSendMsg)
{
char *buf = pThreadCtx->m_pSendBuff;
int iRet = -1;
WEMQJSON *jsonHeader = json_object_new_object ();
// 组装消息
json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR,
json_object_new_string (TRACE_LOG_TO_LOGSERVER));
json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT,
json_object_new_int (seq));
json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT,
json_object_new_int (errCode));
const char *header_str = json_object_get_string (jsonHeader);
if (header_str == NULL)
{
json_object_put (jsonHeader);
return -1;
}
WEMQJSON *jsonBody = json_object_new_object ();
if (jsonBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object return null");
json_object_put (jsonHeader);
return -1;
}
WEMQJSON *jsonMessage = json_object_new_object ();
if (jsonMessage == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_object_new_object return null");
json_object_put (jsonHeader);
json_object_put (jsonBody);
return -1;
}
char cTopic[128];
char serviceOrEvent = (*(ptSendMsg->strServiceId + 3) == '0') ? 's' : 'e';
snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c",
ptSendMsg->strTargetDcn, serviceOrEvent, ptSendMsg->strServiceId,
ptSendMsg->strScenarioId, *(ptSendMsg->strServiceId + 3));
json_object_object_add (jsonMessage, MSG_BODY_TOPIC_STR,
json_object_new_string (cTopic));
WEMQJSON *jsonBodyProperty =
rmb_pub_encode_property_for_wemq (THREAD_MSG_CMD_RECV_MSG_ACK, ptSendMsg);
if (jsonBodyProperty == NULL)
{
json_object_put (jsonHeader);
json_object_put (jsonMessage);
json_object_put (jsonBody);
LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_property_for_wemq return null");
return -1;
}
json_object_object_add (jsonMessage, MSG_BODY_PROPERTY_JSON,
jsonBodyProperty);
WEMQJSON *jsonByteBody =
rmb_pub_encode_byte_body_for_wemq (THREAD_MSG_CMD_RECV_MSG_ACK,
ptSendMsg);
if (jsonByteBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_byte_body_for_wemq return null");
json_object_put (jsonHeader);
json_object_put (jsonMessage);
json_object_put (jsonBody);
return -1;
}
const char *byteBodyStr = json_object_get_string (jsonByteBody);
json_object_object_add (jsonMessage, MSG_BODY_BYTE_BODY_JSON,
json_object_new_string (byteBodyStr));
const char *message_str = json_object_get_string (jsonMessage);
if (message_str == NULL)
{
LOGRMB (RMB_LOG_ERROR, "Get thread msg body failed\n");
json_object_put (jsonHeader);
json_object_put (jsonMessage);
json_object_put (jsonByteBody);
json_object_put (jsonBody);
return -1;
}
json_object_object_add (jsonBody, "retCode", json_object_new_int (errCode));
json_object_object_add (jsonBody, "retMsg",
json_object_new_string (errMsg));
json_object_object_add (jsonBody, "level",
json_object_new_string ("error"));
json_object_object_add (jsonBody, "logPoint",
json_object_new_string (logPoint));
json_object_object_add (jsonBody, "model",
json_object_new_string ("model"));
json_object_object_add (jsonBody, "lang", json_object_new_string ("c"));
json_object_object_add (jsonBody, "message",
json_object_new_string (message_str));
const char *body_str = json_object_get_string (jsonBody);
if (body_str == NULL)
{
LOGRMB (RMB_LOG_ERROR, "Get thread msg body failed\n");
json_object_put (jsonHeader);
json_object_put (jsonMessage);
json_object_put (jsonByteBody);
json_object_put (jsonBody);
return -1;
}
int iHeaderLen = strlen (header_str);
int iBodyLen = strlen (body_str);
int iTotalLen = iHeaderLen + iBodyLen + 8;
ENCODE_INT (buf, iTotalLen);
ENCODE_INT (buf, iHeaderLen);
ENCODE_DWSTR_MEMCPY (buf, header_str, iHeaderLen);
ENCODE_DWSTR_MEMCPY (buf, body_str, iBodyLen);
// json_object_put(jsonHeader);
LOGRMB (RMB_LOG_DEBUG, "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] Send:%s\n",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort, body_str);
json_object_put (jsonHeader);
json_object_put (jsonMessage);
json_object_put (jsonByteBody);
json_object_put (jsonBody);
iRet =
_wemq_thread_do_send_sync (pThreadCtx, pThreadCtx->m_pSendBuff, iTotalLen,
iHeaderLen);
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] _wemq_thread_send log error!\n",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort);
return -2;
}
return 0;
}