eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c (1,946 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <string.h> #include <sys/types.h> #include <pthread.h> #include <unistd.h> #include "common.h" #include "rmb_pub.h" #include "rmb_context.h" #include "rmb_udp.h" #include "rmb_common.h" #include "rmb_errno.h" #include "wemq_thread.h" static int g_iSendReq = 0; static unsigned long g_iSendReqForEvent = 100000000; static StRmbPub *pRmbGlobalPub; unsigned int g_uiSendMsgSeq = 0; unsigned int g_uiRecvMsgSeq = 0; unsigned int DEFAULT_WINDOW_SIZE = 100; typedef struct gsl_err { char result; const char *err_msg; } gsl_err; gsl_err gsl_error[] = { {0x01, NULL}, {0x11, "query ckv null"}, {0x21, "chvvalue.orgid_list_size() = 0"}, {0x31, "orgid for query rules is null"}, {0x41, "no available rules for service"}, {0x51, "ckvvalue.org.dcn_list_size() = 0"}, {0x61, "target dcn is null according to query rules for event"}, {0x71, NULL}, {0x81, NULL}, {0x91, NULL}, {0xA1, NULL}, {0xB1, NULL}, {0xC1, NULL}, {0xD1, NULL}, {0xE1, NULL}, {0xF1, NULL}, {0x02, NULL}, {0x12, "CommonOrgid points to SingleOrgid, service cannot be deployed in more than one ADM or C-DCN"}, {0x22, "CommonOrgid points to SingleOrgid, service cannot be found in either ADM or C-DCN"}, {0x32, "public service id is deployed in multiple regions causing conflicts"}, {0x42, "service is poly-active in one area(R-DCN/C-DCN/ADM/Common), but dcn matching IDC of clientDcn not found"}, {0x52, "event subscriptors belong to one area(CS/DMZ/ECN), but dcn matching IDC of clientDcn not found"}, {0x62, NULL}, {0x72, NULL}, {0x82, NULL}, {0x92, NULL}, {0xA2, NULL}, {0xB2, NULL}, {0xC2, NULL}, {0xD2, NULL}, {0xE2, NULL}, {0xF2, NULL}, {0x03, NULL}, {0x13, "DecodeBuf error or ParseFromString error"}, {0x23, "query ckv error"}, {0x33, NULL}, {0x43, NULL}, {0x53, NULL}, {0x63, NULL}, {0x73, NULL}, {0x83, NULL}, {0x93, NULL}, {0xA3, NULL}, {0xB3, NULL}, {0xC3, NULL}, {0xD3, NULL}, {0xE3, NULL}, {0xF3, NULL} }; //#define TCP_BUF_SIZE 5<<01 static int rmb_pub_encode_header_for_wemq (unsigned int uiCmd, StWemqThreadMsg * ptThreadMsg, StRmbMsg * ptSendMsg) { if (ptThreadMsg == NULL || ptSendMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "ptThreadMsg is null or ptSendMsg is null"); return -1; } switch (uiCmd) { case THREAD_MSG_CMD_SEND_MSG: { ptThreadMsg->m_iCmd = THREAD_MSG_CMD_SEND_MSG; WEMQJSON *jsonHeader = json_object_new_object (); // ptSendMsg->strServiceId[3] == '3' 多播使用ASYNC_MESSAGE_TO_SERVER if (ptSendMsg->strServiceId[3] == '4') { json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR, json_object_new_string (BROADCAST_MESSAGE_TO_SERVER)); } else { json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR, json_object_new_string (ASYNC_MESSAGE_TO_SERVER)); } json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT, json_object_new_int (++g_iSendReqForEvent)); LOGRMB (RMB_LOG_DEBUG, "put seq:%ld in pkg", g_iSendReqForEvent); json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT, json_object_new_int (0)); const char *header_str = json_object_get_string (jsonHeader); if (header_str == NULL) { LOGRMB (RMB_LOG_ERROR, "get thread msg header failed"); json_object_put (jsonHeader); return -1; } ptThreadMsg->m_iHeaderLen = strlen (header_str); LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%d,%s\n", ptThreadMsg->m_iHeaderLen, header_str); ptThreadMsg->m_pHeader = (char *) malloc ((ptThreadMsg->m_iHeaderLen + 1) * sizeof (char)); if (ptThreadMsg->m_pHeader == NULL) { LOGRMB (RMB_LOG_ERROR, "malloc for ptThreadMsg->m_pHeader failed"); json_object_put (jsonHeader); return -2; } memcpy (ptThreadMsg->m_pHeader, header_str, ptThreadMsg->m_iHeaderLen); ptThreadMsg->m_pHeader[ptThreadMsg->m_iHeaderLen] = '\0'; json_object_put (jsonHeader); return 0; } case THREAD_MSG_CMD_SEND_REQUEST: { ptThreadMsg->m_iCmd = THREAD_MSG_CMD_SEND_REQUEST; WEMQJSON *jsonHeader = json_object_new_object (); json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR, json_object_new_string (REQUEST_TO_SERVER)); json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT, json_object_new_int (g_iSendReq++)); json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT, json_object_new_int (0)); const char *header_str = json_object_get_string (jsonHeader); if (header_str == NULL) { LOGRMB (RMB_LOG_ERROR, "Get thread msg header failed"); json_object_put (jsonHeader); return -1; } ptThreadMsg->m_iHeaderLen = strlen (header_str); LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%d,%s\n", ptThreadMsg->m_iHeaderLen, header_str); ptThreadMsg->m_pHeader = (char *) malloc ((ptThreadMsg->m_iHeaderLen + 1) * sizeof (char)); if (ptThreadMsg->m_pHeader == NULL) { LOGRMB (RMB_LOG_ERROR, "malloc for ptThreadMsg->m_pHeader failed\n"); json_object_put (jsonHeader); return -2; } memcpy (ptThreadMsg->m_pHeader, header_str, ptThreadMsg->m_iHeaderLen); ptThreadMsg->m_pHeader[ptThreadMsg->m_iHeaderLen] = '\0'; json_object_put (jsonHeader); return 0; } case THREAD_MSG_CMD_SEND_REQUEST_ASYNC: { ptThreadMsg->m_iCmd = THREAD_MSG_CMD_SEND_REQUEST_ASYNC; WEMQJSON *jsonHeader = json_object_new_object (); json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR, json_object_new_string (REQUEST_TO_SERVER)); json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT, json_object_new_int (g_iSendReq++)); json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT, json_object_new_int (0)); const char *header_str = json_object_get_string (jsonHeader); if (header_str == NULL) { LOGRMB (RMB_LOG_ERROR, "Get thread msg header failed"); json_object_put (jsonHeader); return -1; } ptThreadMsg->m_iHeaderLen = strlen (header_str); LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%d,%s\n", ptThreadMsg->m_iHeaderLen, header_str); ptThreadMsg->m_pHeader = (char *) malloc ((ptThreadMsg->m_iHeaderLen + 1) * sizeof (char)); if (ptThreadMsg->m_pHeader == NULL) { LOGRMB (RMB_LOG_ERROR, "malloc for ptThreadMsg->m_pHeader failed\n"); json_object_put (jsonHeader); return -2; } memcpy (ptThreadMsg->m_pHeader, header_str, ptThreadMsg->m_iHeaderLen); ptThreadMsg->m_pHeader[ptThreadMsg->m_iHeaderLen] = '\0'; json_object_put (jsonHeader); return 0; } case THREAD_MSG_CMD_SEND_REPLY: { ptThreadMsg->m_iCmd = THREAD_MSG_CMD_SEND_REPLY; WEMQJSON *jsonHeader = json_object_new_object (); json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR, json_object_new_string (RESPONSE_TO_SERVER)); json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT, json_object_new_int (0)); json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT, json_object_new_int (g_iSendReq++)); const char *header_str = json_object_get_string (jsonHeader); if (header_str == NULL) { LOGRMB (RMB_LOG_ERROR, "Get thread msg header failed"); json_object_put (jsonHeader); return -1; } ptThreadMsg->m_iHeaderLen = strlen (header_str); //LOGRMB(RMB_LOG_DEBUG, "Get thread msg header succ, len=%d,%s\n", ptThreadMsg->m_iHeaderLen, header_str); ptThreadMsg->m_pHeader = (char *) malloc ((ptThreadMsg->m_iHeaderLen + 1) * sizeof (char)); if (ptThreadMsg->m_pHeader == NULL) { LOGRMB (RMB_LOG_ERROR, "malloc for ptThreadMsg->m_pHeader failed\n"); json_object_put (jsonHeader); return -2; } memcpy (ptThreadMsg->m_pHeader, header_str, ptThreadMsg->m_iHeaderLen); ptThreadMsg->m_pHeader[ptThreadMsg->m_iHeaderLen] = '\0'; json_object_put (jsonHeader); return 0; } case THREAD_MSG_CMD_SEND_MSG_ACK: { ptThreadMsg->m_iCmd = THREAD_MSG_CMD_SEND_MSG_ACK; WEMQJSON *jsonHeader = json_object_new_object (); json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR, json_object_new_string (MSG_ACK)); json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT, json_object_new_int (0)); json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT, json_object_new_int (g_iSendReq++)); const char *header_str = json_object_get_string (jsonHeader); if (header_str == NULL) { LOGRMB (RMB_LOG_ERROR, "Get thread msg header failed"); json_object_put (jsonHeader); return -1; } ptThreadMsg->m_iHeaderLen = strlen (header_str); //LOGRMB(RMB_LOG_DEBUG, "Get thread msg header succ, len=%d,%s\n", ptThreadMsg->m_iHeaderLen, header_str); ptThreadMsg->m_pHeader = (char *) malloc ((ptThreadMsg->m_iHeaderLen + 1) * sizeof (char)); if (ptThreadMsg->m_pHeader == NULL) { LOGRMB (RMB_LOG_ERROR, "malloc for ptThreadMsg->m_pHeader failed\n"); json_object_put (jsonHeader); return -2; } memcpy (ptThreadMsg->m_pHeader, header_str, ptThreadMsg->m_iHeaderLen); ptThreadMsg->m_pHeader[ptThreadMsg->m_iHeaderLen] = '\0'; json_object_put (jsonHeader); return 0; } default: LOGRMB (RMB_LOG_ERROR, "unknown cmd:%u", uiCmd); return -1; } return 0; } static WEMQJSON *rmb_pub_encode_system_header_for_wemq (unsigned int uiCmd, StRmbMsg * ptSendMsg) { if (ptSendMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "ptSendMsg is null"); return NULL; } WEMQJSON *jsonSystem = json_object_new_object (); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_BIZ_STR, json_object_new_string (ptSendMsg->sysHeader. cBizSeqNo)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_SEQNO_STR, json_object_new_string (ptSendMsg->sysHeader. cConsumerSeqNo)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_SVRID_STR, json_object_new_string (ptSendMsg->sysHeader. cConsumerSvrId)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_ORGSYS_STR, json_object_new_string (ptSendMsg->sysHeader. cOrgSysId)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_CSMID_STR, json_object_new_string (ptSendMsg->sysHeader. cConsumerSysId)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_TIME_LINT, json_object_new_int64 (ptSendMsg->sysHeader. ulTranTimeStamp)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_CSMDCN_STR, json_object_new_string (ptSendMsg->sysHeader. cConsumerDcn)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_ORGSVR_STR, json_object_new_string (ptSendMsg->sysHeader. cOrgSvrId)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_ORGID_STR, json_object_new_string (ptSendMsg->sysHeader. cOrgId)); //发送消息时,version为consumerSysVersion,且必须为1.0.0 //json_object_object_add(jsonSystem, MSG_BODY_SYSTEM_VER_STR, json_object_new_string("weq_c_1_0_0")); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_VER_STR, json_object_new_string (ptSendMsg->sysHeader. cConsumerSysVersion)); //add rmb api version json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_API_VERSION, json_object_new_string (RMBVERSION)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_UNIID_STR, json_object_new_string (ptSendMsg->sysHeader. cUniqueId)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_CONLEN_INT, json_object_new_int (ptSendMsg->sysHeader. iContentLength)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_MSGTYPE_INT, json_object_new_int (1)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_RECVTYPE_INT, json_object_new_int (1)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_SENDTIME_LINT, json_object_new_int64 (ptSendMsg->sysHeader. ulSendTime)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_RECVTIME_LINT, json_object_new_int64 (ptSendMsg->sysHeader. ulReceiveTime)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_REPLYTIME_LINT, json_object_new_int64 (ptSendMsg->sysHeader. ulReplyTime)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_REPLYRECEIVETIME_LINT, json_object_new_int64 (ptSendMsg->sysHeader. ulReplyReceiveTime)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_APITYPE_INT, json_object_new_int (ptSendMsg->cApiType)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_LOGICTYPE_INT, json_object_new_int ((int32_t) ptSendMsg-> cLogicType)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_SOCOID_STR, json_object_new_string ("#C")); if (uiCmd == THREAD_MSG_CMD_SEND_REPLY || uiCmd == THREAD_MSG_CMD_RECV_MSG_ACK || uiCmd == THREAD_MSG_CMD_SEND_MSG_ACK) //一般发回包时候、单播回ack,需要rsp_ip { WEMQJSON *extFields = json_tokener_parse (ptSendMsg->sysHeader.cExtFields); if (extFields == NULL) { extFields = json_object_new_object (); } json_object_object_add (extFields, MSG_BODY_SYSTEM_RSP_IP, json_object_new_string (pRmbStConfig->cHostIp)); json_object_object_add (extFields, MSG_BODY_SYSTEM_RSP_SYS, json_object_new_string (pRmbStConfig-> cConsumerSysId)); json_object_object_add (extFields, MSG_BODY_SYSTEM_RSP_DCN, json_object_new_string (pRmbStConfig-> cConsumerDcn)); json_object_object_add (extFields, MSG_BODY_SYSTEM_RSP_IDC, json_object_new_string (pRmbStConfig->cRegion)); json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_EXTFIELDS_STR, extFields); const char *systemStr = json_object_get_string (jsonSystem); WEMQJSON *jsonSystemHeader = json_tokener_parse (systemStr); json_object_put (jsonSystem); return jsonSystemHeader; } else //发包时候 { WEMQJSON *extFields = json_tokener_parse (ptSendMsg->sysHeader.cExtFields); if (extFields == NULL) { extFields = json_object_new_object (); } json_object_object_add (extFields, MSG_BODY_SYSTEM_REQ_IP, json_object_new_string (pRmbStConfig->cHostIp)); json_object_object_add (extFields, MSG_BODY_SYSTEM_REQ_SYS, json_object_new_string (pRmbStConfig-> cConsumerSysId)); json_object_object_add (extFields, MSG_BODY_SYSTEM_REQ_DCN, json_object_new_string (pRmbStConfig-> cConsumerDcn)); json_object_object_add (extFields, MSG_BODY_SYSTEM_REQ_IDC, json_object_new_string (pRmbStConfig->cRegion)); if (strcmp (ptSendMsg->isDyedMsg, "true") == 0) { json_object_object_add (extFields, IS_DYED_MSG, json_object_new_string ("true")); } json_object_object_add (jsonSystem, MSG_BODY_SYSTEM_EXTFIELDS_STR, extFields); const char *systemStr = json_object_get_string (jsonSystem); WEMQJSON *jsonSystemHeader = json_tokener_parse (systemStr); json_object_put (jsonSystem); return jsonSystemHeader; } } /** * set destination proto, like: * "destination" : { * "name" : "A00/s/10000000/01/0", * "type" : "se", * "serviceOrEventId" : "10000000", * "scenario" : "01", * "dcnNo" : "A00", -- not support 000 * "organizationId" : "99996", * "organizationIdInputFlag" : 0, * } * 其中,type固定为"se",wemq java历史遗留问题 */ static WEMQJSON *rmb_pub_encode_body_dest_for_wemq (StRmbMsg * ptSendMsg) { if (ptSendMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "ptSendMsg is null"); return NULL; } WEMQJSON *jsonDest = json_object_new_object (); if (jsonDest == NULL) { LOGRMB (RMB_LOG_ERROR, "json_object_new_object failed"); return NULL; } json_object_object_add (jsonDest, MSG_BODY_DEST_NAME_STR, json_object_new_string (ptSendMsg->dest.cDestName)); json_object_object_add (jsonDest, MSG_BODY_DEST_TYPE_STR, json_object_new_string ("se")); json_object_object_add (jsonDest, MSG_BODY_DEST_SORE_STR, json_object_new_string (ptSendMsg->strServiceId)); json_object_object_add (jsonDest, MSG_BODY_DEST_SCENARIO_STR, json_object_new_string (ptSendMsg->strScenarioId)); json_object_object_add (jsonDest, MSG_BODY_DEST_DCN_STR, json_object_new_string (ptSendMsg->strTargetDcn)); json_object_object_add (jsonDest, MSG_BODY_DEST_ANY_DCN_STR, json_object_new_boolean (0)); json_object_object_add (jsonDest, MSG_BODY_DEST_ORGID_STR, json_object_new_string (ptSendMsg->strTargetOrgId)); json_object_object_add (jsonDest, MSG_BODY_DEST_ORGFLAG_INT, json_object_new_int (ptSendMsg->iFLagForOrgId)); return jsonDest; } WEMQJSON *rmb_pub_encode_property_for_wemq (unsigned int uiCmd, StRmbMsg * ptSendMsg) { if (ptSendMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "ptSendMsg is null"); return NULL; } WEMQJSON *jsonProperty = NULL; if (uiCmd == THREAD_MSG_CMD_SEND_REPLY || uiCmd == THREAD_MSG_CMD_RECV_MSG_ACK || uiCmd == THREAD_MSG_CMD_SEND_MSG_ACK) { jsonProperty = json_tokener_parse (ptSendMsg->sysHeader.cProperty); if (jsonProperty == NULL) { jsonProperty = json_object_new_object (); } } else { jsonProperty = json_object_new_object (); json_object_object_add (jsonProperty, MSG_BODY_PROPERTY_REPLYTO_STR, json_object_new_string (ptSendMsg->replyTo. cDestName)); json_object_object_add (jsonProperty, MSG_BODY_PROPERTY_RR_REQUEST_UNIQ_ID_STR, json_object_new_string (ptSendMsg->sysHeader. cUniqueId)); json_object_object_add (jsonProperty, MSG_BODY_PROPERTY_KEYS_STR, json_object_new_string (ptSendMsg->sysHeader. cConsumerSeqNo)); json_object_object_add (jsonProperty, MSG_BODY_PROPERTY_MSG_TYPE_STR, json_object_new_string ("persistent")); json_object_object_add (jsonProperty, MSG_BODY_PROPERTY_TTL_INT, json_object_new_int64 (ptSendMsg->ulMsgLiveTime)); json_object_object_add (jsonProperty, MSG_BODY_PROPERTY_SEQ_STR, json_object_new_string (ptSendMsg->sysHeader. cBizSeqNo)); } return jsonProperty; } WEMQJSON *rmb_pub_encode_byte_body_for_wemq (unsigned int uiCmd, StRmbMsg * ptSendMsg) { if (ptSendMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "ptSendMsg is null"); return NULL; } WEMQJSON *jsonByteBody = json_object_new_object (); json_object_object_add (jsonByteBody, MSG_BODY_BYTE_BODY_APPHEADER_CONTENT_JSON, json_object_new_string (ptSendMsg->cAppHeader)); json_object_object_add (jsonByteBody, MSG_BODY_BYTE_BODY_APPHEADER_NAME_STR, json_object_new_string (ptSendMsg->sysHeader. cAppHeaderClass)); json_object_object_add (jsonByteBody, MSG_BODY_BYTE_BODY_CONTENT_STR, json_object_new_string_len (ptSendMsg->cContent, ptSendMsg-> iContentLen)); json_object_object_add (jsonByteBody, MSG_BODY_BYTE_BODY_CREATETIME_LINT, json_object_new_int64 (pRmbStConfig->ulNowTtime)); json_object_object_add (jsonByteBody, MSG_BODY_COID_STR, json_object_new_string ("#c")); json_object_object_add (jsonByteBody, MSG_BODY_DELIVERYTIME_INT, json_object_new_int (1)); json_object_object_add (jsonByteBody, MSG_BODY_TTL_LINT, json_object_new_int64 (ptSendMsg->ulMsgLiveTime)); if (uiCmd == THREAD_MSG_CMD_SEND_REQUEST || uiCmd == THREAD_MSG_CMD_SEND_REQUEST_ASYNC || uiCmd == THREAD_MSG_CMD_SEND_REPLY) { json_object_object_add (jsonByteBody, MSG_BODY_SYN_BOOL, json_object_new_boolean (1)); } else { json_object_object_add (jsonByteBody, MSG_BODY_SYN_BOOL, json_object_new_boolean (0)); } WEMQJSON *jsonSystemHeaderContent = rmb_pub_encode_system_header_for_wemq (uiCmd, ptSendMsg); if (jsonSystemHeaderContent == NULL) { LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_system_header return null"); return NULL; } const char *systemHeaderContentStr = json_object_get_string (jsonSystemHeaderContent); json_object_object_add (jsonByteBody, MSG_BODY_BYTE_BODY_SYSTEM_HEADER_CONTENT_JSON, json_object_new_string (systemHeaderContentStr)); WEMQJSON *jsonBodyDest = rmb_pub_encode_body_dest_for_wemq (ptSendMsg); if (jsonBodyDest == NULL) { LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_body_dest return null"); return NULL; } const char *jsonBodyDestStr = json_object_get_string (jsonBodyDest); json_object_object_add (jsonByteBody, MSG_BODY_DEST_JSON, json_object_new_string (jsonBodyDestStr)); const char *byteBodyStr = json_object_get_string (jsonByteBody); if (byteBodyStr == NULL) { json_object_put (jsonByteBody); return NULL; } int sysLen = strlen (byteBodyStr); //LOGRMB(RMB_LOG_DEBUG, "Gen thread msg json byte body succ, len %d, %s\n", sysLen, byteBodyStr); json_object_put (jsonSystemHeaderContent); json_object_put (jsonBodyDest); return jsonByteBody; } //根据最新协议解析 int rmb_pub_encode_body_for_wemq (unsigned int uiCmd, StWemqThreadMsg * ptThreadMsg, StRmbMsg * ptSendMsg, unsigned long ulTimeToAlive) { if (ptThreadMsg == NULL || ptSendMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "ptThreadMsg or ptSendMsg is null"); return -1; } WEMQJSON *jsonBody = json_object_new_object (); if (jsonBody == NULL) { LOGRMB (RMB_LOG_ERROR, "json_object_new_object return null"); return -1; } char cTopic[128]; char serviceOrEvent = (*(ptSendMsg->strServiceId + 3) == '0') ? 's' : 'e'; snprintf (cTopic, sizeof (cTopic), "%s-%c-%s-%s-%c", ptSendMsg->strTargetDcn, serviceOrEvent, ptSendMsg->strServiceId, ptSendMsg->strScenarioId, *(ptSendMsg->strServiceId + 3)); json_object_object_add (jsonBody, MSG_BODY_TOPIC_STR, json_object_new_string (cTopic)); if (THREAD_MSG_CMD_SEND_REQUEST_ASYNC == uiCmd) { snprintf (ptSendMsg->sysHeader.cExtFields, sizeof ("{\"rrType\": 1 }"), "%s", "{\"rrType\": 1 }"); } WEMQJSON *jsonBodyProperty = rmb_pub_encode_property_for_wemq (uiCmd, ptSendMsg); if (jsonBodyProperty == NULL) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_property_for_wemq return null"); json_object_put (jsonBody); return -1; } json_object_object_add (jsonBody, MSG_BODY_PROPERTY_JSON, jsonBodyProperty); WEMQJSON *jsonByteBody = rmb_pub_encode_byte_body_for_wemq (uiCmd, ptSendMsg); if (jsonByteBody == NULL) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_byte_body_for_wemq return null"); json_object_put (jsonBody); return -1; } const char *byteBodyStr = json_object_get_string (jsonByteBody); json_object_object_add (jsonBody, MSG_BODY_BYTE_BODY_JSON, json_object_new_string (byteBodyStr)); //json_object_object_add(jsonBody, MSG_BODY_BYTE_BODY_JSON, jsonByteBody); const char *bodyStr = json_object_get_string (jsonBody); if (bodyStr == NULL) { LOGRMB (RMB_LOG_ERROR, "Get thread msg body failed\n"); json_object_put (jsonBody); json_object_put (jsonByteBody); return -1; } ptThreadMsg->m_iBodyLen = strlen (bodyStr); //LOGRMB(RMB_LOG_DEBUG, "Get thread msg body succ, len=%d,%s", ptThreadMsg->m_iBodyLen, bodyStr); ptThreadMsg->m_pBody = (char *) malloc ((ptThreadMsg->m_iBodyLen + 1) * sizeof (char)); if (ptThreadMsg->m_pBody == NULL) { LOGRMB (RMB_LOG_ERROR, "malloc for ptThreadMsg->m_pBody failed"); json_object_put (jsonBody); json_object_put (jsonByteBody); return -1; } // strncpy(ptThreadMsg->m_pBody, bodyStr, ptThreadMsg->m_iBodyLen); memcpy (ptThreadMsg->m_pBody, bodyStr, ptThreadMsg->m_iBodyLen); ptThreadMsg->m_pBody[ptThreadMsg->m_iBodyLen] = '\0'; json_object_put (jsonBody); json_object_put (jsonByteBody); return 0; } int rmb_pub_encode_thread_msg (unsigned int uiCmd, StWemqThreadMsg * ptThreadMsg, StRmbMsg * ptSendMsg, unsigned long ulTimeToAlive) { int iRet = -1; if (ptSendMsg == NULL || ptThreadMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "ptThreadMsg or ptSendMsg is null"); return -1; } iRet = rmb_pub_encode_header_for_wemq (uiCmd, ptThreadMsg, ptSendMsg); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_header_for_wemq failed"); return iRet; } iRet = rmb_pub_encode_body_for_wemq (uiCmd, ptThreadMsg, ptSendMsg, ulTimeToAlive); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_body_for_wemq failed"); return iRet; } LOGRMB (RMB_LOG_INFO, "Get thread msg header succ, headerLen=%d,%s Get thread msg body succ, bodyLen=%d,%s\n", ptThreadMsg->m_iHeaderLen, ptThreadMsg->m_pHeader, ptThreadMsg->m_iBodyLen, ptThreadMsg->m_pBody); return 0; } char *rmb_printf_service_status (StRmbPub * pStPub, StServiceStatus * pTmpService) { //if ((int)pTmpService->cResult == 1) if ((pTmpService->cResult & 0x0F) == 0x01) { snprintf (pStPub->printGslBuf, sizeof (pStPub->printGslBuf) - 1, "service[%s,%s,%s,%d] null,gettime=%lu,InvalidTime=%lu pls check!", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pTmpService->ulGetTimes, pTmpService->ulInvalidTime); } //else if ((int)pTmpService->cResult == 2) else if ((pTmpService->cResult & 0x0F) == 0x02) { snprintf (pStPub->printGslBuf, sizeof (pStPub->printGslBuf) - 1, "service[%s,%s,%s,%d] route error,gettime=%lu,InvalidTime=%lu pls check!", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pTmpService->ulGetTimes, pTmpService->ulInvalidTime); } //else if ((int)pTmpService->cResult == 3) else if ((pTmpService->cResult & 0x0F) == 0x03) { snprintf (pStPub->printGslBuf, sizeof (pStPub->printGslBuf) - 1, "service[%s,%s,%s,%d] gsl_svr error,gettime=%lu,InvalidTime=%lu pls retry!", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pTmpService->ulGetTimes, pTmpService->ulInvalidTime); } else if ((int) pTmpService->cResult == 0) { if ((int) pTmpService->cRouteFlag == 2) { snprintf (pStPub->printGslBuf, sizeof (pStPub->printGslBuf) - 1, "service[%s,%s,%s,%d] routeFlag=%d,dcn=%s,time=%lu,InvalidTime=%lu", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, (int) pTmpService->cRouteFlag, pTmpService->strTargetDcn, pTmpService->ulGetTimes, pTmpService->ulInvalidTime); } else { snprintf (pStPub->printGslBuf, sizeof (pStPub->printGslBuf) - 1, "service[%s,%s,%s,%d] routeFlag=%d,time=%lu,InvalidTime=%lu", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, (int) pTmpService->cRouteFlag, pTmpService->ulGetTimes, pTmpService->ulInvalidTime); } } pStPub->printGslBuf[sizeof (pStPub->printGslBuf) - 1] = 0; return pStPub->printGslBuf; } /** * 将字符串中的'/'替换为'-' */ static void rmb_change_slash_to_hyphen (char *str, int iLen) { int i = 0; for (i = 0; i < iLen && str[i] != '\0'; i++) { if (str[i] == '/') { str[i] = '-'; } } } /** * 用于判断消息发往wemq */ static int rmb_pub_send_mode (StRmbPub * pPub, StRmbMsg * pMsg) { if (pPub == NULL || pMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "pPub or pMsg is null"); return -1; } pMsg->iMsgMode = RMB_MSG_WEMQ; return 0; } /** * private func, request gsl for route and dcn * 参见<GSL错误码细化.doc文档> * api cache需求: * 1. api使用同步的方式更新本地gsl cache,不做全量缓存 * 2. api请求gsl服务的超时时间默认为1000ms * 3. cache数据初始的超时时间为600s * 4. cache数据更新逻辑如下: * 1) cache中没有本次查询的数据, 则: * result字段为0,则查询结果存入缓存,失效时间为600s * result字段低4位为1,则查询结果存入缓存,失效时间为600s,向调用方返错 * result字段低4位为2,则查询结果不存入缓存,向调用方返错 * result字段低4位为3或者查询超时,查询结果不存入缓存,向调用方返错 * 2) api发送消息时发现cache中数据已失效,则去gsl拉取数据: * result字段为0,更新本地cache,同时将失效时间延后600s,使用最新查询到的数据发送rmb消息 * result字段低4位为1,则查询结果存入缓存,失效时间为600s,向调用方返错 * result字段低4位为2,则删除本地缓存数据,向调用方返错 * result字段低4位为3或者查询超时,如果cache原数据的result字段为0,更新本地数据的失效时间,延后30s */ static int rmb_pub_send_gsl (StRmbPub * pStPub, StServiceStatus * pTmpService, const char *cBizSeqNo, const char *cConsumerSeqNo) { //get dcn //char pkgBuf[MAX_GSL_REQ_BUF_SIZE]; char *p = pStPub->pkgBuf; int iPkgLen = 0; //*p = GSL_SUBCMD_QUERY_SERVICE; *p = GSL_SUBCMD_NEW_QUERY_SERVICE; p += 1; iPkgLen += 1; int tmp = strlen (pTmpService->strTargetOrgId); *p = tmp; p += 1; iPkgLen += 1; memcpy (p, pTmpService->strTargetOrgId, tmp); p += tmp; iPkgLen += tmp; tmp = strlen (pRmbStConfig->cConsumerDcn); *p = tmp; p += 1; iPkgLen += 1; memcpy (p, pRmbStConfig->cConsumerDcn, tmp); p += tmp; iPkgLen += tmp; tmp = strlen (pTmpService->strServiceId); *p = tmp; p += 1; iPkgLen += 1; memcpy (p, pTmpService->strServiceId, tmp); p += tmp; iPkgLen += tmp; tmp = strlen (pTmpService->strScenarioId); *p = tmp; p += 1; iPkgLen += 1; memcpy (p, pTmpService->strScenarioId, tmp); p += tmp; iPkgLen += tmp; *p = pTmpService->cFlagForOrgId; iPkgLen += 1; rmb_msg_set_bizSeqNo (pStPub->pSendMsg, cBizSeqNo); rmb_msg_set_consumerSeqNo (pStPub->pSendMsg, cConsumerSeqNo); rmb_msg_set_orgSysId (pStPub->pSendMsg, pRmbStConfig->cConsumerSysId); rmb_msg_set_dest_v2_1 (pStPub->pSendMsg, GSL_DEFAULT_DCN, GSL_DEFAULT_SERVICE_ID, GSL_DEFAULT_SCENE_ID, GSL_DEFAULT_COMMON_ORGID); rmb_msg_set_content (pStPub->pSendMsg, pStPub->pkgBuf, iPkgLen); char appHeader[5] = "{}"; rmb_msg_set_app_header (pStPub->pSendMsg, appHeader, strlen (appHeader)); int iRet = rmb_pub_send_and_receive (pStPub, pStPub->pSendMsg, pStPub->pRcvMsg, pRmbStConfig->iQueryTimeout); if (iRet == 0) { char receiveBuf[MAX_GSL_RSP_BUF_SIZE]; unsigned int receiveLen = sizeof (receiveBuf); rmb_msg_get_content (pStPub->pRcvMsg, receiveBuf, &receiveLen); if (receiveLen == 0) { LOGRMB (RMB_LOG_ERROR, "GSL reply len=0, req=%s\n", rmb_msg_print (pStPub->pSendMsg)); rmb_errno = RMB_ERROR_GSL_SVR_ERROR; return 3; } // result(char) + routeFlag(char) + targetDcn(cStr) char result = *(receiveBuf + 1); pTmpService->cResult = result; if (result == 0) { pTmpService->cRouteFlag = *(receiveBuf + 2); if (pTmpService->cRouteFlag == 0 || pTmpService->cRouteFlag == 1) { LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d] routeFlag=%d\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, (int) pTmpService->cRouteFlag); return 0; } unsigned int uiDcnLen = *(receiveBuf + 3); memcpy (pTmpService->strTargetDcn, receiveBuf + 4, uiDcnLen); pTmpService->strTargetDcn[uiDcnLen] = 0; LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d],get succ!routeFlag=2,targetDcn=%s \n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pTmpService->strTargetDcn); return 0; } //兼容老的gsl的错误返回码 else if (result == 1) { LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d] service=NULL,result=1, uniqueID=%s\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pStPub->pRcvMsg->sysHeader.cUniqueId); //pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; rmb_errno = RMB_ERROR_GSL_SERVICE_ID_NULL; return 1; } else if (result == 2) { LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d] service error,result=2, uniqueID=%s\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pStPub->pRcvMsg->sysHeader.cUniqueId); //pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; rmb_errno = RMB_ERROR_GSL_SERVICE_ID_ERROR; return 2; } else if (result == 3) { LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d] gsl svr error,result=3, uniqueID=%s\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pStPub->pRcvMsg->sysHeader.cUniqueId); //pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; rmb_errno = RMB_ERROR_GSL_SVR_ERROR; //return 3; return 4; } ////////////////////////////// else if ((result & 0x0F) == 0x01) { int index = (result & 0xF0) >> 4; LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d] uId=%s, result=0x%02x, %s\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pStPub->pRcvMsg->sysHeader.cUniqueId, gsl_error[index].result, gsl_error[index].err_msg); rmb_errno = RMB_ERROR_GSL_SERVICE_ID_NULL; return 1; } else if ((result & 0x0F) == 0x02) { int index = ((result & 0xF0) >> 4) + 16; LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d] uId=%s, result=0x%02x, %s\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pStPub->pRcvMsg->sysHeader.cUniqueId, gsl_error[index].result, gsl_error[index].err_msg); rmb_errno = RMB_ERROR_GSL_SERVICE_ID_ERROR; return 2; } else if ((result & 0x0F) == 0x03) { int index = ((result & 0xF0) >> 4) + 32; LOGRMB (RMB_LOG_INFO, "GSL:[%s-%s-%s-%d] uId=%s, result=0x%02x, %s\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, pStPub->pRcvMsg->sysHeader.cUniqueId, gsl_error[index].result, gsl_error[index].err_msg); rmb_errno = RMB_ERROR_GSL_SVR_ERROR; return 4; } } else { if (rmb_errno == RMB_ERROR_SEND_RR_MSG_TIMEOUT) { LOGRMB (RMB_LOG_ERROR, "GSL:[%s-%s-%s-%d],send and receive from GSL timeout.\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId); pTmpService->cResult = 3; return 4; } LOGRMB (RMB_LOG_ERROR, "GSL:[%s-%s-%s-%d],send and recevie from GSL error,iRet=%d!\n", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId, iRet); //pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; pTmpService->cResult = 3; rmb_errno = RMB_ERROR_REQ_GSL_ERROR; return 3; } return 0; } /** * 给gsl发染色消息 */ int rmb_pub_send_dyed_msg_to_gsl (StRmbPub * pStPub) { char cBizSeqNo[50] = ""; char cConsumerSeqNo[50] = ""; rmb_msg_random_uuid (cBizSeqNo, 32); rmb_msg_random_uuid (cConsumerSeqNo, 32); rmb_msg_set_bizSeqNo (pStPub->pSendMsg, cBizSeqNo); rmb_msg_set_consumerSeqNo (pStPub->pSendMsg, cConsumerSeqNo); rmb_msg_set_orgSysId (pStPub->pSendMsg, pRmbStConfig->cConsumerSysId); rmb_msg_set_dest_v2_1 (pStPub->pSendMsg, GSL_DEFAULT_DCN, GSL_DEFAULT_SERVICE_ID, GSL_DEFAULT_SCENE_ID, GSL_DEFAULT_COMMON_ORGID); rmb_msg_set_content (pStPub->pSendMsg, "", 0); char appHeader[5] = "{}"; rmb_msg_set_app_header (pStPub->pSendMsg, appHeader, strlen (appHeader)); rmb_msg_set_dyedMsg (pStPub->pSendMsg, "true"); int iRet = rmb_pub_send_and_receive (pStPub, pStPub->pSendMsg, pStPub->pRcvMsg, pRmbStConfig->iQueryTimeout); if (iRet == 1) { LOGRMB (RMB_LOG_DEBUG, "send dyed msg to GSL success,ret code is %d", iRet); } rmb_msg_clear (pStPub->pSendMsg); return iRet; } //0:succ //1:GSL服务id为空 //2:GSL服务id路由错误 //3:GSL服务器错误 //-1:服务错误 int rmb_pub_send_gsl_and_insert_cache (StRmbPub * pStPub, StRmbMsg * pStMsg, StServiceStatus * pTmpService, StServiceStatus * hasCachedService) { //pthread_mutex_lock(&pStPub->pubMutex); if (hasCachedService != NULL) { //if (hasCachedService->ulGetTimes + pRmbStConfig->iCacheTimeoutTime * 1000 >= pRmbStConfig->ulNowTtime) if (hasCachedService->ulInvalidTime >= pRmbStConfig->ulNowTtime) { //if (hasCachedService->cResult == 1) if ((hasCachedService->cResult & 0x0F) == 0x01) { LOGRMB (RMB_LOG_ERROR, "Cache:[%s-%s-%s-%d] service=NULL", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId); rmb_errno = RMB_ERROR_GSL_SERVICE_ID_NULL; return 1; } // else if (hasCachedService->cResult == 2) // { // LOGRMB(RMB_LOG_ERROR, "Cache:[%s-%s-%s-%d] service error", // pTmpService->strServiceId, // pTmpService->strScenarioId, // pTmpService->strTargetOrgId, // (int)pTmpService->cFlagForOrgId // ); // rmb_errno = RMB_ERROR_GSL_SERVICE_ID_ERROR; // return 2; // } else if (hasCachedService->cResult == 0) { if (hasCachedService->cRouteFlag == 0 || hasCachedService->cRouteFlag == 1) { return 0; } else { //LOGRMB(RMB_LOG_DEBUG, "nowTime=%lu,Cache:[%s]", pRmbStConfig->ulNowTtime, rmb_printf_service_status(pStPub, tmpService)); if (strcmp (pStMsg->strTargetDcn, hasCachedService->strTargetDcn) != 0) { LOGRMB (RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", hasCachedService->strTargetDcn, pStMsg->strTargetDcn); strncpy (pStMsg->strTargetDcn, hasCachedService->strTargetDcn, sizeof (pStMsg->strTargetDcn) - 1); //LOGRMB(RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", hasCachedService->strTargetDcn, pStMsg->strTargetDcn); } return 0; } } } } else { hasCachedService = bsearch (pTmpService, pRmbStConfig->serviceStatusList, pRmbStConfig->iCacheServiceNums, sizeof (StServiceStatus), cmpServiceStatusStr); if (hasCachedService != NULL) { //if (hasCachedService->ulGetTimes + pRmbStConfig->iCacheTimeoutTime * 1000 >= pRmbStConfig->ulNowTtime) if (hasCachedService->ulInvalidTime >= pRmbStConfig->ulNowTtime) { //if (hasCachedService->cResult == 1) if ((hasCachedService->cResult & 0x0F) == 0x01) { LOGRMB (RMB_LOG_ERROR, "Cache:[%s-%s-%s-%d] service=NULL", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId); rmb_errno = RMB_ERROR_GSL_SERVICE_ID_NULL; return 1; } // else if (hasCachedService->cResult == 2) // { // LOGRMB(RMB_LOG_ERROR, "Cache:[%s-%s-%s-%d] service error", // pTmpService->strServiceId, // pTmpService->strScenarioId, // pTmpService->strTargetOrgId, // (int)pTmpService->cFlagForOrgId // ); // rmb_errno = RMB_ERROR_GSL_SERVICE_ID_ERROR; // return 2; // } else if (hasCachedService->cResult == 0) { if (hasCachedService->cRouteFlag == 0 || hasCachedService->cRouteFlag == 1) { return 0; } else { //LOGRMB(RMB_LOG_DEBUG, "nowTime=%lu,Cache:[%s]", pRmbStConfig->ulNowTtime, rmb_printf_service_status(pStPub, tmpService)); if (strcmp (pStMsg->strTargetDcn, hasCachedService->strTargetDcn) != 0) { LOGRMB (RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", hasCachedService->strTargetDcn, pStMsg->strTargetDcn); strncpy (pStMsg->strTargetDcn, hasCachedService->strTargetDcn, sizeof (pStMsg->strTargetDcn) - 1); } return 0; } } } } } int iRet = rmb_pub_send_gsl (pStPub, pTmpService, pStMsg->sysHeader.cBizSeqNo, pStMsg->sysHeader.cConsumerSeqNo); if (iRet == 2 || iRet == 3) { pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; if (hasCachedService != NULL) { hasCachedService->cResult = pTmpService->cResult; hasCachedService->cRouteFlag = pTmpService->cRouteFlag; } return iRet; } // if (iRet == 3) // { // pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; // pTmpService->ulInvalidTime = pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheFailedTimeoutTime * 1000; // //pthread_mutex_unlock(&pStPub->pubMutex); // return 3; // } if (iRet == 4) { pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; pTmpService->ulInvalidTime = pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheFailedTimeoutTime * 1000; if (hasCachedService != NULL) { if ((hasCachedService->cResult == 0) || ((hasCachedService->cResult & 0x0F) == 0x01)) { hasCachedService->ulInvalidTime = pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheFailedTimeoutTime * 1000; } } return 4; } //cache过期,且在本地存在 if (hasCachedService != NULL) { pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; hasCachedService->cResult = pTmpService->cResult; hasCachedService->cRouteFlag = pTmpService->cRouteFlag; hasCachedService->ulGetTimes = pRmbStConfig->ulNowTtime; if (hasCachedService->cResult == 0) { memcpy (hasCachedService->strTargetDcn, pTmpService->strTargetDcn, sizeof (hasCachedService->strTargetDcn)); } if (iRet == 0 || iRet == 1) { hasCachedService->ulInvalidTime = pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheSuccTimeoutTime * 1000; } else { hasCachedService->ulInvalidTime = pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheFailedTimeoutTime * 1000; } } else { //本地无cach //本地无cache,且返回结果为2或3,则不缓存 if (iRet == 2 || iRet == 3 || iRet == 4) return iRet; pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime; if (iRet == 0 || iRet == 1) { pTmpService->ulInvalidTime = pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheSuccTimeoutTime * 1000; } else { pTmpService->ulInvalidTime = pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheFailedTimeoutTime * 1000; } //select StServiceStatus *pTmp = NULL; if (pRmbStConfig->iCacheServiceNums >= MAX_SERVICE_STATUS_CACHE_NUMS) { //find the oldest cache int i = 0; //unsigned long ulMinTimestamp = 1 << (sizeof(unsigned long)-1); unsigned long ulMinTimestamp = pRmbStConfig->serviceStatusList[0].ulGetTimes; for (; i < MAX_SERVICE_STATUS_CACHE_NUMS; i++) { if (ulMinTimestamp > pRmbStConfig->serviceStatusList[i].ulGetTimes) { ulMinTimestamp = pRmbStConfig->serviceStatusList[i].ulGetTimes; pTmp = &pRmbStConfig->serviceStatusList[i]; } } pRmbStConfig->iCacheServiceNums -= 1; } else { pTmp = &pRmbStConfig->serviceStatusList[pRmbStConfig->iCacheServiceNums]; } memcpy (pTmp, pTmpService, sizeof (StServiceStatus)); pRmbStConfig->iCacheServiceNums += 1; //sort qsort (pRmbStConfig->serviceStatusList, pRmbStConfig->iCacheServiceNums, sizeof (StServiceStatus), cmpServiceStatusStr); } int i = 0; for (; i < pRmbStConfig->iCacheServiceNums; i++) { LOGRMB (RMB_LOG_INFO, "cache%dth:[%s]", i, rmb_printf_service_status (pStPub, &pRmbStConfig->serviceStatusList[i])); } //pthread_mutex_unlock(&pStPub->pubMutex); //return pTmpService->cResult; return iRet; } /** * before 0.9.15 version * req: subcmd(char) + targetOrgId(cStr) + selfDcn(cStr) + serverId(cStr) + scenseId(cStr) + flag(char)(0: user 1: api) * rsp: subcmd(char) + result(char) + routeFlag(char) + targetDcn(cStr) * * 0.9.15: * req: subcmd -- 0x11 * rsp: result -- 0x11 0x21 ... * result 0x1x -- cache 600s * result 0x2x -- clean cache * return: * 0:success * 1:null * 2:serverId error * 3:GSL server error */ int rmb_pub_get_target_dcn (StRmbPub * pStPub, StRmbMsg * pStMsg) { //search cache //int iFLagForReqGsl = 1; StServiceStatus tmpStatus; tmpStatus.ulInvalidTime = 0; StServiceStatus *pTmpService = &tmpStatus; pTmpService->cFlagForOrgId = pStMsg->iFLagForOrgId; if (pStMsg->iFLagForOrgId == RMB_COMMIT_BY_OWN) { strncpy (pTmpService->strTargetOrgId, pStMsg->strTargetOrgId, sizeof (pTmpService->strTargetOrgId) - 1); } else { strncpy (pTmpService->strTargetOrgId, pRmbStConfig->strOrgId, sizeof (pTmpService->strTargetOrgId) - 1); } strncpy (pTmpService->strServiceId, pStMsg->strServiceId, sizeof (pTmpService->strServiceId) - 1); strncpy (pTmpService->strScenarioId, pStMsg->strScenarioId, sizeof (pTmpService->strScenarioId) - 1); StServiceStatus *tmpService = bsearch (pTmpService, pRmbStConfig->serviceStatusList, pRmbStConfig->iCacheServiceNums, sizeof (StServiceStatus), cmpServiceStatusStr); if (tmpService != NULL) { //if (tmpService->ulGetTimes + pRmbStConfig->iCacheTimeoutTime * 1000 >= pRmbStConfig->ulNowTtime) if (tmpService->ulInvalidTime >= pRmbStConfig->ulNowTtime) { //if (tmpService->cResult == 1) // if (tmpService->cResult == 0x11 || tmpService->cResult == 0x21 || tmpService->cResult == 0x31 || // tmpService->cResult == 0x41 || tmpService->cResult == 0x51 || tmpService->cResult == 0x61) if ((tmpService->cResult & 0x0F) == 0x01) { LOGRMB (RMB_LOG_ERROR, "Cache:[%s-%s-%s-%d] service=NULL", pTmpService->strServiceId, pTmpService->strScenarioId, pTmpService->strTargetOrgId, (int) pTmpService->cFlagForOrgId); rmb_errno = RMB_ERROR_GSL_SERVICE_ID_NULL; return 1; } // else if (tmpService->cResult == 2) // { // LOGRMB(RMB_LOG_ERROR, "Cache:[%s-%s-%s-%d] service error", // pTmpService->strServiceId, // pTmpService->strScenarioId, // pTmpService->strTargetOrgId, // (int)pTmpService->cFlagForOrgId // ); // rmb_errno = RMB_ERROR_GSL_SERVICE_ID_ERROR; // return 2; // } else if (tmpService->cResult == 0) { if (tmpService->cRouteFlag == 0 || tmpService->cRouteFlag == 1) { return 0; } else { //LOGRMB(RMB_LOG_DEBUG, "nowTime=%lu,Cache:[%s]", pRmbStConfig->ulNowTtime, rmb_printf_service_status(pStPub, tmpService)); if (strcmp (pStMsg->strTargetDcn, tmpService->strTargetDcn) != 0) { LOGRMB (RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", tmpService->strTargetDcn, pStMsg->strTargetDcn); strncpy (pStMsg->strTargetDcn, tmpService->strTargetDcn, sizeof (pStMsg->strTargetDcn) - 1); //LOGRMB(RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", tmpService->strTargetDcn, pStMsg->strTargetDcn); } return 0; } } } else { LOGRMB (RMB_LOG_INFO, "cache time out!nowTime=%lu, Cache:[%s] ", pRmbStConfig->ulNowTtime, rmb_printf_service_status (pStPub, tmpService)); } } pthread_mutex_lock (&pStPub->pubMutex); int iRet = rmb_pub_send_gsl_and_insert_cache (pStPub, pStMsg, pTmpService, tmpService); pthread_mutex_unlock (&pStPub->pubMutex); if (iRet != 0) { // if (iRet == 3 && tmpService != NULL) if (iRet == 4 && tmpService != NULL && tmpService->cResult == 0) { //copy dcn to msg if (tmpService->cRouteFlag == 2) { if (strcmp (pStMsg->strTargetDcn, tmpService->strTargetDcn) != 0) { LOGRMB (RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", tmpService->strTargetDcn, pStMsg->strTargetDcn); strncpy (pStMsg->strTargetDcn, tmpService->strTargetDcn, sizeof (pStMsg->strTargetDcn) - 1); //LOGRMB(RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", tmpService->strTargetDcn, pStMsg->strTargetDcn); } } LOGRMB (RMB_LOG_ERROR, "ReqGsl error,but use cache! gsl=[%s],cache=%s", rmb_printf_service_status (pStPub, pTmpService), rmb_printf_service_status (pStPub, tmpService)); return 0; } LOGRMB (RMB_LOG_ERROR, "ReqGsl gsl=[%s]", rmb_printf_service_status (pStPub, pTmpService)); return iRet; } else { //copy dcn to msg // if (pTmpService->cResult == 0 && pTmpService->cRouteFlag == 2) { if (strcmp (pStMsg->strTargetDcn, pTmpService->strTargetDcn) != 0) { LOGRMB (RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", pTmpService->strTargetDcn, pStMsg->strTargetDcn); strncpy (pStMsg->strTargetDcn, pTmpService->strTargetDcn, sizeof (pStMsg->strTargetDcn) - 1); //LOGRMB(RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s", pTmpService->strTargetDcn, pStMsg->strTargetDcn); } } LOGRMB (RMB_LOG_INFO, "ReqGsl succ! gsl=[%s]", rmb_printf_service_status (pStPub, pTmpService)); } return 0; } int rmb_pub_set_destination_Interval (StRmbPub * pStPub, StRmbMsg * pStMsg) { //req gsl control if (pRmbStConfig->iReqGsl == 1) { if (!strcmp (pStMsg->strServiceId, GSL_DEFAULT_SERVICE_ID)) { strncpy (pStMsg->strTargetDcn, GSL_DEFAULT_DCN, sizeof (pStMsg->strTargetDcn) - 1); } else { int iRet = rmb_pub_get_target_dcn (pStPub, pStMsg); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_get_target_dcn error!iRet=%d", iRet); return iRet; } } } if (pStMsg->iEventOrService == RMB_EVENT_CALL) { snprintf (pStMsg->dest.cDestName, sizeof (pStMsg->dest.cDestName), "%s/e/%s/%s/%c", pStMsg->strTargetDcn, pStMsg->strServiceId, pStMsg->strScenarioId, *(pStMsg->strServiceId + 3)); } else { snprintf (pStMsg->dest.cDestName, sizeof (pStMsg->dest.cDestName), "%s/s/%s/%s/%c", pStMsg->strTargetDcn, pStMsg->strServiceId, pStMsg->strScenarioId, *(pStMsg->strServiceId + 3)); } pStMsg->dest.iDestType = RMB_DEST_TOPIC; //set pub message to wemq rmb_pub_send_mode (pStPub, pStMsg); return 0; } /** * Function: rmb_pub_init * Description: rmb pub initialize * Return: * 0: success * -1: failed */ int rmb_pub_init (StRmbPub * pRmbPub) { if (pRmbPub == NULL) { LOGRMB (RMB_LOG_ERROR, "pRmbPub arg is null\n"); rmb_errno = RMB_ERROR_ARGV_NULL; return -1; } if (pRmbPub->uiContextNum == 1) { LOGRMB (RMB_LOG_ERROR, "pRmbPub has already init!"); return 0; } pRmbPub->pContext = (StContext *) malloc (sizeof (StContext)); if (pRmbPub->pContext == NULL) { LOGRMB (RMB_LOG_ERROR, "pRmbPub->pContext malloc error!"); rmb_errno = RMB_ERROR_MALLOC_FAIL; return -2; } pRmbStConfig->uiPid = (unsigned int) getpid (); memset (pRmbPub->pContext, 0, sizeof (StContext)); pRmbPub->pContext->contextType = RMB_CONTEXT_TYPE_PUB; int iRet = rmb_context_init (pRmbPub->pContext); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_init failed!iRet=%d,error=%s", iRet, get_rmb_last_error ()); rmb_errno = RMB_ERROR_INIT_CONTEXT_FAIL; return -3; } pRmbPub->uiContextNum = 1; pRmbPub->pContext->pFather = (void *) pRmbPub; pRmbPub->ulLastTime = 0; //rmb_pub_rand(pRmbPub); pRmbPub->pSendMsg = rmb_msg_malloc (); if (pRmbPub->pSendMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_init malloc failed!"); rmb_errno = RMB_ERROR_MALLOC_FAIL; return -2; } //pRmbPub->pRcvMsg = (StRmbMsg*)malloc(sizeof(StRmbMsg)); pRmbPub->pRcvMsg = rmb_msg_malloc (); if (pRmbPub->pRcvMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_init malloc failed!"); rmb_errno = RMB_ERROR_MALLOC_FAIL; return -2; } pthread_mutex_init (&pRmbPub->pubMutex, NULL); rmb_pub_send_dyed_msg_to_gsl (pRmbPub); return 0; } /** * Function: rmb_pub_init_python * Description: rmb pub initialize * Return: * 0: success * -1: failed */ int rmb_pub_init_python () { pRmbGlobalPub = (StRmbPub *) calloc (1, sizeof (StRmbPub)); if (pRmbGlobalPub == NULL) { LOGRMB (RMB_LOG_ERROR, "pRmbGlobalPub arg is null\n"); rmb_errno = RMB_ERROR_ARGV_NULL; return -1; } rmb_pub_init (pRmbGlobalPub); return 0; } int rmb_pub_send_msg_to_wemq (StRmbPub * pRmbPub, StRmbMsg * pMsg) { if (pRmbPub == NULL || pMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "pRmbPub or pMsg is null"); return -1; } //check dest is set if (rmb_check_msg_valid (pMsg) != 0) { rmb_errno = RMB_ERROR_MSG_MISSING_PART; return -2; } if (pMsg->iEventOrService == (int) RMB_SERVICE_CALL) { LOGRMB (RMB_LOG_ERROR, "rmb pub event interface can't send rr msg,serviceId=%s!\n", pMsg->strServiceId); rmb_errno = RMB_ERROR_EVENT_INTERFACE_CAN_NOT_SEND_RR_MSG; return -3; } int iRet = 0; iRet = rmb_msg_init (pMsg, pRmbStConfig, C_TYPE_WEMQ); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d\n", iRet); return -5; } if (pMsg->ulMsgLiveTime == 0 || pMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME) { pMsg->ulMsgLiveTime = DEFAULT_MSG_MAX_LIVE_TIME; } pMsg->cLogicType = EVENT_PKG_IN_WEMQ; GetRmbNowLongTime (); pMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime; StContext *pStContext = pRmbPub->pContext; pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG; if (pMsg->ulMsgLiveTime == 0 || pMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME) { pMsg->ulMsgLiveTime = DEFAULT_MSG_MAX_LIVE_TIME; } stContextProxy *pContextProxy = pStContext->pContextProxy; StWemqThreadMsg stThreadMsg; memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg)); stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_MSG; iRet = rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pMsg, pMsg->ulMsgLiveTime); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!"); rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_ENCODE_FAIL, "wemq_pub_encode_thread_msg error", pMsg); return iRet; } pthread_mutex_lock (&pContextProxy->eventMutex); iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg); if (iRet <= 0) { LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!,iRet=%d", iRet); rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR; rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_WORKER_PUT_FIFO_ERROR, "wemq_kfifo_put error", pMsg); return -5; } struct timeval tv; gettimeofday (&tv, NULL); struct timespec ts_timeout; ts_timeout.tv_sec = tv.tv_sec + (tv.tv_usec / 1000 + pRmbStConfig->accessAckTimeOut) / 1000; ts_timeout.tv_nsec = ((tv.tv_usec / 1000 + pRmbStConfig->accessAckTimeOut) % 1000) * 1000 * 1000; pContextProxy->iFlagForEvent = -1; if (pContextProxy->iFlagForEvent == -1) { //reset seq pContextProxy->iSeqForEvent = g_iSendReqForEvent; LOGRMB (RMB_LOG_DEBUG, "reset seq:%ld, pContextProxy->iSeqForEvent:%ld", g_iSendReqForEvent, pContextProxy->iSeqForEvent); pthread_cond_timedwait (&pContextProxy->eventCond, &pContextProxy->eventMutex, &ts_timeout); } pthread_mutex_unlock (&pContextProxy->eventMutex); switch (pContextProxy->iFlagForEvent) { case RMB_CODE_TIME_OUT: LOGRMB (RMB_LOG_ERROR, "time out!req=%s", rmb_msg_print (pMsg)); rmb_errno = RMB_ERROR_SEND_EVENT_MSG_FAIL; rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_SEND_EVENT_MSG_FAIL, "wemq send event msg ack timeout", pMsg); return -6; case RMB_CODE_SUSS: LOGRMB (RMB_LOG_DEBUG, "send msg succ!req=%s\n", rmb_msg_print (pMsg)); return 0; case RMB_CODE_OTHER_FAIL: LOGRMB (RMB_LOG_ERROR, "send msg failed!req=%s", rmb_msg_print (pMsg)); return -6; case RMB_CODE_AUT_FAIL: LOGRMB (RMB_LOG_ERROR, "Authentication failed!req=%s", rmb_msg_print (pMsg)); return -5; } } static int rmb_pub_send_and_receive_to_wemq (StRmbPub * pRmbPub, StRmbMsg * pSendMsg, StRmbMsg * pRevMsg, unsigned int uiTimeOut) { RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub"); RMB_CHECK_POINT_NULL (pSendMsg, "pSendMsg"); RMB_CHECK_POINT_NULL (pRevMsg, "pRevMsg"); if (pSendMsg->iEventOrService == (int) RMB_EVENT_CALL) { LOGRMB (RMB_LOG_ERROR, "rr interface can't send event msg!"); rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG; return -1; } int iRet = 0; iRet = rmb_msg_init (pSendMsg, pRmbStConfig, C_TYPE_WEMQ); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d", iRet); return -3; } if (pSendMsg->ulMsgLiveTime == 0 || pSendMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME) { pSendMsg->ulMsgLiveTime = uiTimeOut; } pSendMsg->cLogicType = REQ_PKG_IN_WEMQ; GetRmbNowLongTime (); pSendMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime; StContext *pStContext = pRmbPub->pContext; pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG; stContextProxy *pContextProxy = pStContext->pContextProxy; StWemqThreadMsg stThreadMsg; memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg)); stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_REQUEST; iRet = rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pSendMsg, uiTimeOut); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!"); rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_ENCODE_FAIL, "wemq_pub_encode_thread_msg error", pSendMsg); return -4; } pthread_mutex_lock (&pContextProxy->rrMutex); iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg); if (iRet <= 0) { LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!iRet=%d", iRet); rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR; rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_WORKER_PUT_FIFO_ERROR, "wemq_kfifo_put error", pSendMsg); return -5; } struct timeval tv; gettimeofday (&tv, NULL); struct timespec ts_timeout; ts_timeout.tv_sec = tv.tv_sec + (tv.tv_usec / 1000 + uiTimeOut) / 1000; ts_timeout.tv_nsec = ((tv.tv_usec / 1000 + uiTimeOut) % 1000) * 1000 * 1000; int i = 0; unsigned int uiUniqueLen = strlen (pSendMsg->sysHeader.cUniqueId); pContextProxy->iFlagForRR = -1; if (pContextProxy->iFlagForRR == -1) { //add uniqueId strncpy (pContextProxy->stUnique.unique_id, pSendMsg->sysHeader.cUniqueId, uiUniqueLen); pContextProxy->stUnique.unique_id[uiUniqueLen] = '\0'; pContextProxy->stUnique.flag = 1; } pthread_cond_timedwait (&pContextProxy->rrCond, &pContextProxy->rrMutex, &ts_timeout); if (pContextProxy->iFlagForRR == RMB_CODE_TIME_OUT) { pContextProxy->stUnique.flag = 0; } pthread_mutex_unlock (&pContextProxy->rrMutex); switch (pContextProxy->iFlagForRR) { case RMB_CODE_TIME_OUT: LOGRMB (RMB_LOG_ERROR, "time out!req=%s", rmb_msg_print (pSendMsg)); rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT; rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_SEND_RR_MSG_TIMEOUT, "wemq send rr msg timeout", pSendMsg); return -6; case RMB_CODE_SUSS: trans_json_2_rmb_msg (pRevMsg, pContextProxy->mPubRRBuf, RESPONSE_TO_CLIENT); LOGRMB (RMB_LOG_DEBUG, "receive reply succ,buf:%s", pContextProxy->mPubRRBuf); pRevMsg->cPkgType = RR_TOPIC_PKG; return 0; case RMB_CODE_OTHER_FAIL: LOGRMB (RMB_LOG_ERROR, "receive reply failed!req=%s", rmb_msg_print (pSendMsg)); rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT; return -4; case RMB_CODE_AUT_FAIL: LOGRMB (RMB_LOG_ERROR, "receive reply Authentication failed!req=%s", rmb_msg_print (pSendMsg)); rmb_errno = RMB_ERROR_SEND_RR_MSG_TIMEOUT; return -5; case RMB_CODE_DYED_MSG: LOGRMB (RMB_LOG_INFO, "receive dyed msg:%s", pContextProxy->mPubRRBuf); return 1; } } int rmb_pub_send_rr_msg_async_to_wemq (StRmbPub * pRmbPub, StRmbMsg * pSendMsg, unsigned int uiTimeOut) { RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub"); RMB_CHECK_POINT_NULL (pSendMsg, "pSendMsg"); if (pSendMsg->iEventOrService == (int) RMB_EVENT_CALL) { LOGRMB (RMB_LOG_ERROR, "async rr interface can't send event msg!"); rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG; return -1; } //pub connect status error //if (pRmbPub->pContext->pContextProxy->iFlagForPublish == 0) { // LOGRMB(RMB_LOG_ERROR, "rmb pub not connect to access!!!"); // return -4; //} int iRet = 0; // iRet = rmb_pub_set_destination_Interval(pRmbPub, pSendMsg); // if (iRet != 0) { // LOGRMB(RMB_LOG_ERROR, "rmb set destination error!serviceId=%s,sceneId=%s,iRet=%d", pSendMsg->strServiceId, pSendMsg->strScenarioId, iRet); // return -2; // } LOGRMB (RMB_LOG_DEBUG, "pubMsg dest=%d,%s,replyTo=%s", pSendMsg->dest.iDestType, pSendMsg->dest.cDestName, pSendMsg->replyTo.cDestName); iRet = rmb_msg_init (pSendMsg, pRmbStConfig, C_TYPE_WEMQ); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_msg_init failed!iRet=%d", iRet); return -3; } if (uiTimeOut > RR_ASYNC_MSG_MAX_LIVE_TIME) { LOGRMB (RMB_LOG_ERROR, "RR sync ttl too large, max value is:%ld", RR_ASYNC_MSG_MAX_LIVE_TIME); return -4; } if (pSendMsg->ulMsgLiveTime == 0 || pSendMsg->ulMsgLiveTime > DEFAULT_MSG_MAX_LIVE_TIME) { pSendMsg->ulMsgLiveTime = uiTimeOut; } GetRmbNowLongTime (); pSendMsg->sysHeader.ulSendTime = pRmbStConfig->ulNowTtime; pSendMsg->replyTo.iDestType = RMB_DEST_TOPIC; StContext *pStContext = pRmbPub->pContext; pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG; stContextProxy *pContextProxy = pStContext->pContextProxy; StWemqThreadMsg stThreadMsg; memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg)); stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_REQUEST_ASYNC; iRet = rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pSendMsg, 0); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "wemq_pub_encode_thread_msg error!"); rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_ENCODE_FAIL, "wemq_pub_encode_thread_msg error", pSendMsg); return iRet; } unsigned int uiUniqueLen = strlen (pSendMsg->sysHeader.cUniqueId); int i = 0; struct timeval tv_now; gettimeofday (&tv_now, NULL); unsigned long ulNowTime = tv_now.tv_sec * 1000 + tv_now.tv_usec / 1000; int iFlagForList = 0; for (i = 0; i < pContextProxy->pUniqueListForRRAsyncNew. get_array_size (&pContextProxy->pUniqueListForRRAsyncNew); i++) { if (pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag == 0) { pthread_mutex_lock (&pContextProxy->rrMutex); snprintf (pContextProxy->pUniqueListForRRAsyncNew.Data[i].unique_id, sizeof (pContextProxy->pUniqueListForRRAsyncNew.Data[i]. unique_id), "%s", pSendMsg->sysHeader.cUniqueId); snprintf (pContextProxy->pUniqueListForRRAsyncNew.Data[i].biz_seq, sizeof (pContextProxy->pUniqueListForRRAsyncNew.Data[i]. biz_seq), "%s", pSendMsg->sysHeader.cBizSeqNo); pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag = 1; pContextProxy->pUniqueListForRRAsyncNew.Data[i].timeStamp = ulNowTime; pContextProxy->pUniqueListForRRAsyncNew.Data[i].timeout = uiTimeOut; iFlagForList = 1; pthread_mutex_unlock (&pContextProxy->rrMutex); break; } } //已有空间已装满 if (iFlagForList == 0) { LOGRMB (RMB_LOG_INFO, "local list for rr async push back"); StUniqueIdList uniqueIdList; strncpy (uniqueIdList.unique_id, pSendMsg->sysHeader.cUniqueId, uiUniqueLen); uniqueIdList.unique_id[uiUniqueLen] = '\0'; uniqueIdList.flag = 1; uniqueIdList.timeStamp = ulNowTime; uniqueIdList.timeout = uiTimeOut; pthread_mutex_lock (&pContextProxy->rrMutex); pContextProxy->pUniqueListForRRAsyncNew.Input (uniqueIdList, &pContextProxy-> pUniqueListForRRAsyncNew); pthread_mutex_unlock (&pContextProxy->rrMutex); } iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg); if (iRet <= 0) { LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!iRet=%d", iRet); rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR; rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_WORKER_PUT_FIFO_ERROR, "wemq_kfifo_put error", pSendMsg); return -4; } pContextProxy->iFlagForRRAsync = 1; return 0; } /** Function: wemq_pub_reply_msg Description:send report packet Retrun: 0 --success -1 --failed */ int rmb_pub_reply_msg_for_wemq (StRmbPub * pRmbPub, StRmbMsg * pStReceiveMsg, StRmbMsg * pStReplyMsg) { if (pRmbPub == NULL || pStReceiveMsg == NULL || pStReplyMsg == NULL) { LOGRMB (RMB_LOG_ERROR, "pRmbPub or pStReceiveMsg or pStReplyMsg is null"); return -1; } if (strlen (pStReceiveMsg->replyTo.cDestName) == 0) { LOGRMB (RMB_LOG_ERROR, "receiveMsg has no replyTo,can't reply!\n"); return -2; } //pub connect status error //if (pRmbPub->pContext->pContextProxy->iFlagForPublish == 0) { // LOGRMB(RMB_LOG_ERROR, "rmb pub not connect to access!!!"); // return -4; //} memcpy (&pStReplyMsg->sysHeader, &pStReceiveMsg->sysHeader, sizeof (pStReceiveMsg->sysHeader)); memcpy (&pStReplyMsg->dest, &pStReceiveMsg->replyTo, sizeof (pStReceiveMsg->replyTo)); //serviceId memcpy (pStReplyMsg->strServiceId, pStReceiveMsg->strServiceId, sizeof (pStReceiveMsg->strServiceId)); //scenarioId memcpy (pStReplyMsg->strScenarioId, pStReceiveMsg->strScenarioId, sizeof (pStReceiveMsg->strScenarioId)); //dcn memcpy (pStReplyMsg->strTargetDcn, pStReceiveMsg->strTargetDcn, sizeof (pStReceiveMsg->strTargetDcn)); //organization memcpy (pStReplyMsg->strTargetOrgId, pStReceiveMsg->strTargetOrgId, sizeof (pStReceiveMsg->strTargetOrgId)); //ttl pStReplyMsg->ulMsgLiveTime = pStReceiveMsg->ulMsgLiveTime; strncpy (pStReplyMsg->cCorrId, pStReceiveMsg->cCorrId, sizeof (pStReceiveMsg->cCorrId)); pStReplyMsg->iCorrLen = pStReceiveMsg->iCorrLen; pStReplyMsg->cApiType = C_TYPE_WEMQ; pStReplyMsg->cLogicType = RSP_PKG_OUT_WEMQ; GetRmbNowLongTime (); pStReplyMsg->sysHeader.ulReplyTime = pRmbStConfig->ulNowTtime; /* while (CURRENT_WINDOW_SIZE >= DEFAULT_WINDOW_SIZE) { LOGRMB(RMB_LOG_ERROR, "Send Window Full, recvSeq=%u,sendSeq=%u", g_uiRecvMsgSeq, g_uiSendMsgSeq); usleep(1000); } */ StContext *pStContext = pRmbPub->pContext; pStContext->uiPkgLen = MAX_LENTH_IN_A_MSG; stContextProxy *pContextProxy = pStContext->pContextProxy; StWemqThreadMsg stThreadMsg; memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg)); stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_REPLY; int iRet = 0; iRet = rmb_pub_encode_thread_msg (stThreadMsg.m_iCmd, &stThreadMsg, pStReplyMsg, 0); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb_pub_encode_thread_msg error!"); rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_ENCODE_FAIL, "wemq_reply_encode_thread_msg error", pStReplyMsg); return iRet; } iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg); if (iRet <= 0) { LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error!iRet=%d", iRet); rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR; rmb_send_log_for_error (pStContext->pContextProxy, RMB_ERROR_WORKER_PUT_FIFO_ERROR, "reply message wemq_kfifo_put error", pStReplyMsg); return rmb_errno; } return 0; } /** * Function: rmb_pub_send_msg * Description: send event msg * Return: * 0: success * -1: failed * -2: queue full */ int rmb_pub_send_msg (StRmbPub * pRmbPub, StRmbMsg * pStMsg) { RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub"); RMB_CHECK_POINT_NULL (pStMsg, "pStMsg"); if (rmb_check_msg_valid (pStMsg) != 0) { rmb_errno = RMB_ERROR_MSG_MISSING_PART; return rmb_errno; } if (pStMsg->iEventOrService == (int) RMB_SERVICE_CALL) { LOGRMB (RMB_LOG_ERROR, "event interface can't send rr msg,serviceId=%s!", pStMsg->strServiceId); rmb_errno = RMB_ERROR_EVENT_INTERFACE_CAN_NOT_SEND_RR_MSG; return rmb_errno; } int iRet = rmb_pub_set_destination_Interval (pRmbPub, pStMsg); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb set destination error!serviceId=%s,sceneId=%s,iRet=%d,error=%s", pStMsg->strServiceId, pStMsg->strScenarioId, iRet, get_rmb_last_error ()); return rmb_errno; } return rmb_pub_send_msg_to_wemq (pRmbPub, pStMsg); } /** * Function: rmb_pub_send_msg_python * Description: send event msg_python * Return: * 0: success * -1: failed * -2: queue full */ int rmb_pub_send_msg_python (StRmbMsg * pStMsg) { StRmbPub *pRmbPub = pRmbGlobalPub; return rmb_pub_send_msg (pRmbPub, pStMsg); } /** Function: rmb_pub_send_rr_msg Description:send RR asynchronous message Retrun: 0 --success -1 --failed -2 --queue full */ int rmb_pub_send_rr_msg_async (StRmbPub * pRmbPub, StRmbMsg * pStMsg, unsigned int uiTimeOut) { RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub"); RMB_CHECK_POINT_NULL (pStMsg, "pStMsg"); if (pStMsg->iEventOrService == (int) RMB_EVENT_CALL) { LOGRMB (RMB_LOG_ERROR, "aync RR interface can't send event msg!"); rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG; return rmb_errno; } //set destination int iRet = rmb_pub_set_destination_Interval (pRmbPub, pStMsg); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb set destination error!serviceId=%s,sceneId=%s,iRet=%d", pStMsg->strServiceId, pStMsg->strScenarioId, iRet); return rmb_errno; } return rmb_pub_send_rr_msg_async_to_wemq (pRmbPub, pStMsg, uiTimeOut); } /** Function: rmb_pub_send_rr_msg_async_python Description:send RR asynchronous message Retrun: 0 --success -1 --failed -2 --queue full */ int rmb_pub_send_rr_msg_async_python (StRmbMsg * pStMsg, unsigned int uiTimeOut) { StRmbPub *pRmbPub = pRmbGlobalPub; return rmb_pub_send_rr_msg_async (pRmbPub, pStMsg, uiTimeOut); } /** Function: rmb_pub_send_and_receive Description:send message and wait for report Retrun: 0 --success -1 --timeout -2 --error */ int rmb_pub_send_and_receive (StRmbPub * pRmbPub, StRmbMsg * pSendMsg, StRmbMsg * pRevMsg, unsigned int uiTimeOut) { RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub"); RMB_CHECK_POINT_NULL (pSendMsg, "pSendMsg"); RMB_CHECK_POINT_NULL (pRevMsg, "pRevMsg"); if (pSendMsg->iEventOrService == (int) RMB_EVENT_CALL) { LOGRMB (RMB_LOG_ERROR, "RR interface can't send event msg!"); rmb_errno = RMB_ERROR_RR_INTERFACE_CAN_NOT_SEND_EVENT_MSG; return rmb_errno; } //set destination int iRet = rmb_pub_set_destination_Interval (pRmbPub, pSendMsg); if (iRet != 0) { LOGRMB (RMB_LOG_ERROR, "rmb set destination error!serviceId=%s,sceneId=%s,iRet=%d", pSendMsg->strServiceId, pSendMsg->strScenarioId, iRet); return rmb_errno; } return rmb_pub_send_and_receive_to_wemq (pRmbPub, pSendMsg, pRevMsg, uiTimeOut); } int rmb_pub_send_and_receive_python (StRmbMsg * pSendMsg, StRmbMsg * pRevMsg, unsigned int uiTimeOut) { StRmbPub *pRmbPub = pRmbGlobalPub; return rmb_pub_send_and_receive (pRmbPub, pSendMsg, pRevMsg, uiTimeOut); } /** Function: rmb_pub_reply_msg Description:send report packet Retrun: 0 --success -1 --failed */ int rmb_pub_reply_msg (StRmbPub * pRmbPub, StRmbMsg * pStReceiveMsg, StRmbMsg * pStReplyMsg) { RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub"); RMB_CHECK_POINT_NULL (pStReceiveMsg, "pStReceiveMsg"); RMB_CHECK_POINT_NULL (pStReplyMsg, "pStReplyMsg"); if (strlen (pStReceiveMsg->replyTo.cDestName) == 0) { //LOGRMB(RMB_LOG_INFO, "rmb receivemsg reply destname empty"); LOGRMB (RMB_LOG_WARN, "pStReceiveMsg->replyTo.cDestName=%s", pStReceiveMsg->replyTo.cDestName); return 0; } return rmb_pub_reply_msg_for_wemq (pRmbPub, pStReceiveMsg, pStReplyMsg); } /** Function: rmb_pub_close Description:close pub Retrun: 0 --success -1 --failed */ int rmb_pub_close (StRmbPub * pRmbPub) { if (pRmbPub == NULL || pRmbPub->pContext == NULL) { LOGRMB (RMB_LOG_ERROR, "pRmbPub or pRmbPub->pContext is null"); return 0; } //wemq if (pRmbStConfig->iConnWemq == 1 || pRmbStConfig->iApiLogserverSwitch == 1) { if (pRmbPub->pContext->pContextProxy != NULL) { stContextProxy *pContextProxy = pRmbPub->pContext->pContextProxy; //pContextProxy->iFlagForRun = 0; if (rmb_pub_send_client_goodbye_to_wemq (pRmbPub) != 0) { LOGRMB (RMB_LOG_DEBUG, "rmb_pub_send_client_goodbye_to_wemq failed"); } pContextProxy->iFlagForRun = 0; GetRmbNowLongTime (); pContextProxy->ulGoodByeTime = pRmbStConfig->ulNowTtime; LOGRMB (RMB_LOG_DEBUG, "pthread_join mainThreadId"); if (pContextProxy->mainThreadId != 0) { pthread_join (pContextProxy->mainThreadId, NULL); } LOGRMB (RMB_LOG_DEBUG, "pthread_join coThreadId"); if (pContextProxy->coThreadId != 0) { pthread_join (pContextProxy->coThreadId, NULL); } } } return 0; } int rmb_pub_close_python () { StRmbPub *pRmbPub = pRmbGlobalPub; int ret = rmb_pub_close_v2 (pRmbPub); free (pRmbPub); return ret; } /** Function: rmb_pub_close_v2 Description:close pub Retrun: 0 --success -1 --failed */ int rmb_pub_close_v2 (StRmbPub * pRmbPub) { if (pRmbPub == NULL || pRmbPub->pContext == NULL) { LOGRMB (RMB_LOG_ERROR, "pRmbPub or pRmbPub->pContext is null"); return 0; } //wemq if (pRmbStConfig->iConnWemq == 1 || pRmbStConfig->iApiLogserverSwitch == 1) { if (pRmbPub->pContext->pContextProxy != NULL) { stContextProxy *pContextProxy = pRmbPub->pContext->pContextProxy; pContextProxy->iFlagForRun = 0; GetRmbNowLongTime (); pContextProxy->ulGoodByeTime = pRmbStConfig->ulNowTtime; LOGRMB (RMB_LOG_DEBUG, "pthread_join mainThreadId"); if (pContextProxy->mainThreadId != 0) { pthread_join (pContextProxy->mainThreadId, NULL); } LOGRMB (RMB_LOG_DEBUG, "pthread_join coThreadId"); if (pContextProxy->coThreadId != 0) { pthread_join (pContextProxy->coThreadId, NULL); } } } return 0; } int rmb_pub_send_client_goodbye_to_wemq (StRmbPub * pRmbPub) { RMB_CHECK_POINT_NULL (pRmbPub, "pRmbPub"); stContextProxy *pContextProxy = pRmbPub->pContext->pContextProxy; StWemqThreadMsg stThreadMsg; memset (&stThreadMsg, 0x00, sizeof (StWemqThreadMsg)); stThreadMsg.m_iCmd = THREAD_MSG_CMD_SEND_CLIENT_GOODBYE; WEMQJSON *jsonHeader = json_object_new_object (); if (jsonHeader == NULL) { LOGRMB (RMB_LOG_ERROR, "json_object_new_object failed"); return -2; } //add type json_object_object_add (jsonHeader, MSG_HEAD_COMMAND_STR, json_object_new_string (CLIENT_GOODBYE_REQUEST)); //add seq json_object_object_add (jsonHeader, MSG_HEAD_SEQ_INT, json_object_new_int (0)); //add status json_object_object_add (jsonHeader, MSG_HEAD_CODE_INT, json_object_new_int (0)); const char *header_str = json_object_get_string (jsonHeader); if (header_str == NULL) { LOGRMB (RMB_LOG_ERROR, "header is null"); json_object_put (jsonHeader); return -2; } stThreadMsg.m_iHeaderLen = strlen (header_str); LOGRMB (RMB_LOG_DEBUG, "Get thread msg header succ, len=%u, %s", stThreadMsg.m_iHeaderLen, header_str) stThreadMsg.m_pHeader = (char *) malloc ((stThreadMsg.m_iHeaderLen + 1) * sizeof (char)); if (stThreadMsg.m_pHeader == NULL) { LOGRMB (RMB_LOG_ERROR, "malloc for stThreadMsg.m_pHeader failed"); json_object_put (jsonHeader); return -2; } memcpy (stThreadMsg.m_pHeader, header_str, stThreadMsg.m_iHeaderLen); stThreadMsg.m_pHeader[stThreadMsg.m_iHeaderLen] = '\0'; json_object_put (jsonHeader); stThreadMsg.m_iBodyLen = 0; stThreadMsg.m_pBody = NULL; int iRet = wemq_kfifo_put (&pContextProxy->pubFifo, stThreadMsg); if (iRet <= 0) { LOGRMB (RMB_LOG_ERROR, "wemq_kfifo_put error,iRet=%d!\n", iRet); rmb_errno = RMB_ERROR_WORKER_PUT_FIFO_ERROR; return rmb_errno; } struct timeval nowTimeVal; gettimeofday (&nowTimeVal, NULL); struct timespec timeout; timeout.tv_sec = nowTimeVal.tv_sec + (nowTimeVal.tv_usec / 1000 + pRmbStConfig->goodByeTimeOut) / 1000; timeout.tv_nsec = ((nowTimeVal.tv_usec / 1000 + pRmbStConfig->goodByeTimeOut) % 1000) * 1000 * 1000; pContextProxy->iFlagForGoodBye = 0; pthread_mutex_lock (&pContextProxy->goodByeMutex); if (pContextProxy->iFlagForGoodBye == 0) { pthread_cond_timedwait (&pContextProxy->goodByeCond, &pContextProxy->goodByeMutex, &timeout); } pthread_mutex_unlock (&pContextProxy->goodByeMutex); if (pContextProxy->iFlagForGoodBye != 1) { LOGRMB (RMB_LOG_ERROR, "send pub client goodBye timeout!"); rmb_errno = RMB_ERROR_CLIENT_GOODBYE_TIMEOUT; return rmb_errno; } LOGRMB (RMB_LOG_DEBUG, "send and recv sub client goodbye succ"); return 0; }