eventmesh-sdks/eventmesh-sdk-c/include/rmb_define.h (833 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RMB_DEFINE_H_
#define RMB_DEFINE_H_

#ifdef __cplusplus
extern "C"
{
#endif

/*
 * rmb_define.h
 * RMB basic definitions: size limits, protocol key names, enums and the
 * shared structures used across the SDK.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <time.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <pthread.h>
#include <unistd.h>
//#include "rmb_mq.h"
#include "rmb_log.h"
#include "rmb_errno.h"
#include "wemq_fifo.h"
#include "wemq_topic_list.h"
#include "wemq_proto.h"

#include <openssl/crypto.h>
#include <openssl/x509.h>
#include <openssl/pem.h>
#include <openssl/ssl.h>
#include <openssl/err.h>

// Library version, as a dotted string and as an 8-digit string.
#define RMBVERSION "2.2.0"
#define RMBVERSIONFORBIZ "00002200"
// NOTE(review): a static object defined in a header gives every translation
// unit that includes this file its own private copy.
static const char rmbVersion[20] = "2.2.0";

// NOTE(review): the two live-time values look like milliseconds
// (14400000 ms = 4 h, 60000 ms = 1 min) -- confirm against the users.
#define DEFAULT_MSG_MAX_LIVE_TIME 14400000
#define RR_ASYNC_MSG_MAX_LIVE_TIME 60000
#define DEFAULT_DCN_LENGTH 3
#define DEFAULT_SERVICE_ID_LENGTH 8
#define DEFAULT_SCENE_ID_LENGTH 2

#define MAX_FLOWS_IN_A_SESSION 200      //the max flow numbers in one session
#define MAX_QUEUE_SESSIONS_IN_A_CONTEXT 1       //the max session numbers in one context
//#define MAX_LENTH_IN_A_MSG 2150000 //packages buf for send and receive
#define MAX_LENTH_IN_A_MSG 2625825      //packages buf for send and receive
#define MAX_PROPERTY_NUMS 50    //the max numbers for property
#define MAX_PROPERTY_LENTH 255  //the max length for one property
#define MAX_RMB_CONTEXT 2       //the max numbers of context -- solace threads
#define MAX_SYSHEADER_LENGTH 20
// maximum count per topic in the flow-control ratio
#define MAX_TOPIC_COUNT 1000000

//#define MAX_APPHEADER_LENGTH 50000
//0.9.12: keep consistent with java
//0.9.16: C appends '\0', so the byte count needs to be increased by 1
//#define MAX_APPHEADER_LENGTH 525000
//#define MAX_APPHEADER_LENGTH 525001
// 1 << 19 == 524288
#define MAX_APPHEADER_LENGTH (1 << 19)
#define MAX_PROPERTY_SIZE 50
#define MAX_RET_MSG_SIZE 512
//0.9.16: C appends '\0', so the byte count needs to be increased by 1
//#define MAX_MSG_CONTENT_SIZE 2100000
//#define MAX_MSG_CONTENT_SIZE 2100001
// 1 << 21 == 2097152
#define MAX_MSG_CONTENT_SIZE (1 << 21)
#define MAX_MSG_ATTACHMENT_SIZE 2048
#define MAX_COMMON_SIZE 20
#define MAX_LOG_BUF_SIZE 20000
#define MAX_SERVICE_STATUS_CACHE_NUMS 1000
#define MAX_GSL_REQ_BUF_SIZE 100
#define MAX_GSL_RSP_BUF_SIZE 100
#define MAX_GSL_RSP_TOPIC_GROUP_SIZE 256

// Default addressing values for GSL requests.
#define GSL_DEFAULT_DCN "GSL"
#define GSL_DEFAULT_SERVICE_ID "12300009"
#define GSL_DEFAULT_SCENE_ID "01"
#define GSL_DEFAULT_COMMON_ORGID "10006"

#define MAX_RMB_WORKER_NUMS 400
#define MAX_RMB_PROXY_NUMS 10

//for config center
#define WEMQ_ACCESS_SERVER "dynamicKey/v1/wemqAccessServer.json"
#define DYNAMIC_KEY_NAME "dynamicKey/v1"
#define RMB_REQ_SINGLE "querySingle"
#define RMB_WHITE_LIST_RCV "whitelist-rcv"
#define RMB_WHITE_LIST_SND "whitelist-snd"
#define WEMQ_PROXY_SERVERS "wemq.proxy.servers"
//#define RMB_WHITE_LIST_RCV_MAX 200
//#define RMB_WHITE_LIST_SND_MAX 200
#define RMB_TOPIC_PER_SYS_MAX 10000
#define MAX_LISTEN_TOPIC_NUM 1000
#define RMB_MAX_NORMAL_SESSIONS_IN_A_CONTEXT_FOR_WEMQ 100       //the max normal session in context only for wemq proxy mode
// 3 << 20 == 3145728
#define TCP_BUF_SIZE (3<<20)

// Sub-command codes for GSL requests.
enum RmbGslSubcmd
{
  GSL_SUBCMD_QUERY_SERVICE = 0x1,
  GSL_SUBCMD_NEW_QUERY_SERVICE = 0x11,
  GSL_SUBCMD_QUERY_TOPICGROUP = 0x02,
};

// Which transport a message came from.
enum RmbMsgSource
{
  RMB_MSG_FROM_SOLACE = 1,
  RMB_MSG_FROM_WEMQ = 2,
};

// NOTE(review): presumably records who supplied the target organization id
// (the library itself vs. an explicit API call) -- confirm against the users.
enum RmbTargetOrgIdCommitType
{
  RMB_COMMIT_BY_OWN = 0,
  RMB_COMMIT_BY_API = 1,
};
enum RmbFlagForMsgInit { RMBMSG_DEST_HAS_SET = 0, }; enum EVENT_OR_SERVICE_CALL { RMB_EVENT_CALL = 0, RMB_SERVICE_CALL = 1, }; enum RMB_SVR_MODE { RMB_OLD_MODE = 0, RMB_PROXY_WORKER = 1, }; enum RMB_PROXY_MQ_TYPE { RMB_PROXY_MQ_RECEIVE = 1, RMB_PROXY_MQ_SEND = 2, RMB_PROXY_MQ_RR = 3, }; enum RMB_LOG_POINT { RMB_LOG_START_CALL = 0, RMB_LOG_ENTRY = 1, RMB_LOG_EXIT = 2, RMB_LOG_END_CALL = 3, RMB_LOG_ON_ERROR = 4, RMB_LOG_OTHER = 5, }; //proxy-worker模式下使用 typedef struct StMqFifoArg { char *_fiFoPath; unsigned int _shmKey; unsigned int _shmSize; } StMqFifoArg; enum RMB_MSG_MODE { RMB_MSG_SOLACE = 0, RMB_MSG_WEMQ = 1, }; enum RMB_RSP_CODE { RMB_CODE_TIME_OUT = -1, RMB_CODE_SUSS, RMB_CODE_OTHER_FAIL, RMB_CODE_AUT_FAIL, RMB_CODE_DYED_MSG, }; //**************************for wemq define******************************** #define MAX_WEMQ_KFIFO_LENGTH (1UL << 16) #define GETMSGID(buf,msgId) ({\ char *p = (buf);\ p += 2 * sizeof(int);\ p += 3 * sizeof(char);\ p += sizeof(StSystemHeader);\ p += 2 * sizeof(StDestination);\ (msgId) = *((unsigned long*)(p));\ }) #define IS_DYED_MSG "IS_DYED_MSG" #define MSG_HEAD_COMMAND_STR "command" #define MSG_HEAD_SEQ_INT "seq" #define MSG_HEAD_CODE_INT "code" #define MSG_HEAD_MSG_STR "msg" #define MSG_HEAD_TYPE_INT "type" #define MSG_HEAD_STATUS_INT "status" #define MSG_HEAD_MSG_STR "msg" #define MSG_HEAD_TIME_LINT "time" #define MSG_HEAD_TIMESTAMP_LINT "timestamp" #define MSG_HEAD_DEST_JSON "dest" #define MSG_HEAD_DEST_SCENARIO_STR "scenario" #define MSG_HEAD_DEST_SERVICE_STR "service" #define MSG_HEAD_DEST_DCN_STR "dcn" #define MSG_HEAD_DEST_ORGANIZATION_STR "organization" #define MSG_HEAD_REDIRECT_OBJ "redirect" #define MSG_HEAD_SUB_BYPASS_TOPIC "topic" #define MSG_HEAD_IDC "idc" #define MSG_HEAD_IP "ip" #define MSG_BODY_TOPIC_LIST_JSON "topicList" #define MSG_BODY_TOPIC_STR "topic" #define MSG_BODY_PROPERTY_JSON "properties" #define MSG_BODY_PROPERTY_MSG_TYPE_STR "msgType" #define MSG_BODY_PROPERTY_TTL_INT "TTL" #define 
MSG_BODY_PROPERTY_SEQ_STR "SEQ" #define MSG_BODY_PROPERTY_RR_REQUEST_UNIQ_ID_STR "RR_REQUEST_UNIQ_ID" #define MSG_BODY_PROPERTY_KEYS_STR "keys" #define MSG_BODY_PROPERTY_REPLYTO_STR "REPLY_TO" #define MSG_BODY_PROPERTY_BORN_TIME_STR "BORN_TIME" #define MSG_BODY_PROPERTY_STORE_TIME_STR "STORE_TIME" #define MSG_BODY_PROPERTY_LEAVE_TIME_STR "LEAVE_TIME" #define MSG_BODY_PROPERTY_ARRIVE_TIME_STR "ARRIVE_TIME" #define MSG_BODY_BYTE_BODY_JSON "body" #define MSG_BODY_BYTE_BODY_APPHEADER_CONTENT_JSON "appHeaderContent" #define MSG_BODY_BYTE_BODY_APPHEADER_NAME_STR "appHeaderName" #define MSG_BODY_BYTE_BODY_CONTENT_STR "body" #define MSG_BODY_BYTE_BODY_CREATETIME_LINT "createTime" #define MSG_BODY_BYTE_BODY_SYSTEM_HEADER_CONTENT_JSON "sysHeaderContent" #define MSG_BODY_RMB_TRACE_LOG_JSON "rmbTraceLog" #define MSG_BODY_RMB_TRACE_LOG_LOG_POINT_STR "logPoint" #define MSG_BODY_RMB_TRACE_LOG_ERR_CODE_STR "errCode" #define MSG_BODY_RMB_TRACE_LOG_ERR_MSG_STR "errMsg" #define MSG_BODY_RMB_TRACE_LOG_MESSAGE_STR "message" #define MSG_BODY_RMB_TRACE_LOG_EXTFIELDS_STR "extFields" #define MSG_BODY_SYSTEM_JSON "sysHeader" #define MSG_BODY_SYSTEM_BIZ_STR "bizSeqNo" #define MSG_BODY_SYSTEM_SEQNO_STR "consumerSeqNo" #define MSG_BODY_SYSTEM_SVRID_STR "consumerSvrId" #define MSG_BODY_SYSTEM_ORGSYS_STR "orgSysId" #define MSG_BODY_SYSTEM_CSMID_STR "consumerId" #define MSG_BODY_SYSTEM_TIME_LINT "tranTimestamp" #define MSG_BODY_SYSTEM_CSMDCN_STR "consumerDCN" #define MSG_BODY_SYSTEM_ORGSVR_STR "orgSvrId" #define MSG_BODY_SYSTEM_ORGID_STR "organizationId" #define MSG_BODY_SYSTEM_VER_STR "version" #define MSG_BODY_SYSTEM_UNIID_STR "uniqueId" #define MSG_BODY_SYSTEM_CONLEN_INT "contentLength" #define MSG_BODY_SYSTEM_MSGTYPE_INT "messageType" #define MSG_BODY_SYSTEM_RRTYPE_INT "rrType" #define MSG_BODY_SYSTEM_ACK_SEQ "ack_seq" #define MSG_BODY_SYSTEM_RECVTYPE_INT "receiveMode" #define MSG_BODY_SYSTEM_SENDTIME_LINT "sendTimestamp" #define MSG_BODY_SYSTEM_RECVTIME_LINT "receiveTimestamp" #define 
MSG_BODY_SYSTEM_REPLYTIME_LINT "replyTimestamp" #define MSG_BODY_SYSTEM_REPLYRECEIVETIME_LINT "replyReceiveTimestamp" #define MSG_BODY_SYSTEM_APITYPE_INT "apiType" #define MSG_BODY_SYSTEM_LOGICTYPE_INT "logicType" #define MSG_BODY_SYSTEM_SOCOID_STR "solCorrelationId" #define MSG_BODY_SYSTEM_EXTFIELDS_STR "extFields" #define MSG_BODY_SYSTEM_API_VERSION "rmbVersion" #define MSG_BODY_SYSTEM_REQ_IP "req_ip" #define MSG_BODY_SYSTEM_REQ_SYS "req_sys" #define MSG_BODY_SYSTEM_REQ_DCN "req_dcn" #define MSG_BODY_SYSTEM_REQ_IDC "req_idc" #define MSG_BODY_SYSTEM_RSP_IP "rsp_ip" #define MSG_BODY_SYSTEM_RSP_SYS "rsp_sys" #define MSG_BODY_SYSTEM_RSP_DCN "rsp_dcn" #define MSG_BODY_SYSTEM_RSP_IDC "rsp_idc" #define MSG_BODY_APP_JSON "appHeader" #define MSG_BODY_DEST_JSON "destinationContent" #define MSG_BODY_DEST_NAME_STR "name" #define MSG_BODY_DEST_TYPE_STR "type" #define MSG_BODY_DEST_SORE_STR "serviceOrEventId" #define MSG_BODY_DEST_SCENARIO_STR "scenario" #define MSG_BODY_DEST_ANY_DCN_STR "anyDCN" #define MSG_BODY_DEST_DCN_STR "dcnNo" #define MSG_BODY_DEST_ORGID_STR "organizationId" #define MSG_BODY_DEST_ORGFLAG_INT "organizationIdInputFlag" #define MSG_BODY_TTL_LINT "timeToLive" #define MSG_BODY_CONTENT_STR "content" #define MSG_BODY_REPLYTO_STR "replyTo" #define MSG_BODY_CREATETIME_LINT "createTime" #define MSG_BODY_DELIVERYTIME_INT "deliveryTimes" #define MSG_BODY_RESENT_BOOL "resent" #define MSG_BODY_COID_STR "correlationId" #define MSG_BODY_DUP_BOOL "duplicated" #define MSG_BODY_SYN_BOOL "syn" #define LOG_ERROR_POINT "ON_ERROR" typedef struct StRmbMsg StRmbMsg; typedef struct StWemqThreadMsg { unsigned int m_iCmd; unsigned int m_iHeaderLen; unsigned int m_iBodyLen; char *m_pHeader; char *m_pBody; } StWemqThreadMsg; #define RMB_MAX_UNIQUE_NUMS 2048 typedef struct StUniqueIdList { char unique_id[50]; char biz_seq[50]; unsigned int timeout; unsigned int flag; unsigned long timeStamp; } StUniqueIdList; typedef struct StUniqueIdList DataType; typedef struct array { DataType 
*Data; int size, max_size; void (*Constructor) (struct array *); //构造函数 void (*Input) (DataType, struct array *); //输入数据 int (*get_array_size) (struct array *); //获取arr的大小 int (*return_index_value) (struct array *, int); void (*Destructor) (struct array *); //析构函数 } Array; #define RMB_MAX_ERR_MSG_FROM_ACCESS 1024 //wemq msg 最多4m #define WEMQ_MSG_MSX_LENGTH (1 << 22) typedef struct stContextProxy { pthread_t mainThreadId; pthread_t coThreadId; char *mPubRRBuf; //for rr pthread_mutex_t rrMutex; pthread_cond_t rrCond; int iFlagForRR; //for event msg, wait for ack pthread_mutex_t eventMutex; pthread_cond_t eventCond; int iFlagForEvent; long iSeqForEvent; //for add listen pthread_mutex_t regMutex; pthread_cond_t regCond; int iFlagForReg; //for add listen result check int iResultForReg; //for pub session connect pthread_mutex_t pubMutex; pthread_cond_t pubCond; int iFlagForPub; int iFlagForPublish; //for sub session connect pthread_mutex_t subMutex; pthread_cond_t subCond; int iFlagForSub; // myhash_t* rrHashTable; void *pubContext; void *subContext; StRmbMsg *pReplyMsg; //StUniqueIdList *pUniqueListForRRAsyncNew; //StUniqueIdList *pUniqueListForRRAsyncOld; Array pUniqueListForRRAsyncNew; Array pUniqueListForRRAsyncOld; StUniqueIdList stUnique; //StUniqueIdList stUniqueListForRRAsync[RMB_MAX_UNIQUE_NUMS]; //StUniqueIdList stUniqueListForRRAsyncOld[RMB_MAX_UNIQUE_NUMS]; //Array stUniqueListForRRAsync; //Array stUniqueListForRRAsyncOld; int iFlagForRun; unsigned long ulGoodByeTime; unsigned long ulLastClearRRAysncMsgTime; unsigned long ulLastPrintOldListIsEmpty; int iFlagForRRAsync; //for goodbye pthread_mutex_t goodByeMutex; pthread_cond_t goodByeCond; int iFlagForGoodBye; StWemqTopicList stTopicList; STRUCT_WEMQ_KFIFO (StWemqThreadMsg, MAX_WEMQ_KFIFO_LENGTH) pubFifo; STRUCT_WEMQ_KFIFO (StWemqThreadMsg, MAX_WEMQ_KFIFO_LENGTH) subFifo; } stContextProxy; typedef struct StThreadArgs { stContextProxy *pStContextProxy; int contextType; } StThreadArgs; #define WEMQ_FIFO_SIZE (2 
<< 20) typedef struct WemqThreadCtx { STRUCT_WEMQ_KFIFO (StWemqThreadMsg, WEMQ_FIFO_SIZE) * m_ptFifo; stContextProxy *m_ptProxyContext; //int m_iThreadId; char *m_pRecvBuff; char *m_pSendBuff; //cache msg which from user thread; StWemqThreadMsg m_stWemqThreadMsg; StWemqThreadMsg m_stHeartBeat; StWemqThreadMsg m_stHelloWord; StWemqThreadMsg m_stListen; int m_iWemqThreadMsgHandled; // StWemqHeader m_stWemqHeader; StWeMQMSG m_stWeMQMSG; StWemqTopicList *m_ptTopicList; //for epoll int m_iEpollFd; struct epoll_event m_stEv; struct epoll_event *m_ptEvents; int m_iFlagForSeverBreak; SSL_CTX *sslCtx; int m_iSockFd; SSL *ssl; int m_iSockFdNew; SSL *sslNew; int m_iSockFdOld; SSL *sslOld; int m_iLastState; int m_iState; int m_contextType; int m_iHeartBeatCount; unsigned int m_uiHeartBeatCurrent; struct timeval stTimeNow; struct timeval stTimeLast; struct timeval stTimeLastRecv; // proxy server 地址 char m_cProxyIP[100]; char m_cProxyIPOld[100]; unsigned int m_uiProxyPort; unsigned int m_uiProxyPortOld; int m_iLocalPort; // socket local port pthread_t m_threadID; // current thread's ID // bool m_lRedirect; int m_lRedirect; char m_cRedirectIP[100]; int m_iRedirectPort; } WemqThreadCtx; //************************************************************************* //**************************rmb msg define********************************* enum RmbDestinationType { RMB_DEST_TOPIC = 0, RMB_DEST_QUEUE, }; #define RMB_SYSTEMHEADER_EXTFIELDS_MAX_LEN 1024 * 2 // 2K #define RMB_SYSTEMHEADER_PROPERTY_MAX_LEN 1024 * 2 // 2K typedef struct StSystemHeader { char cBizSeqNo[50]; //全局唯一业务流水号 char cConsumerSeqNo[50]; //服务消费者系统调用流水号 char cOrgSysId[10]; //交易原始发起方系统编号 char cConsumerSysId[10]; //服务消费者系统编号 char cConsumerSysVersion[10]; //服务消费者的系统版本号 char cConsumerSvrId[50]; //服务消费者服务器标示(服务器名或IP地址) char cRmbVersion[10]; //rmb版本号 char cOrgSvrId[50]; char cUniqueId[50]; //unique id char cConsumerDcn[10]; //服务消费者所在DCN unsigned long ulTranTimeStamp; //交易发起时间戳 char cAppHeaderClass[50]; //类名 char cOrgId[10]; 
//法人号 int flag; unsigned long ulSendTime; //发送时间 unsigned long ulReceiveTime; //接收时间 unsigned long ulReplyTime; //回包时间 unsigned long ulReplyReceiveTime; //回包接收时间 unsigned long ulMessageDate; int iReceiveMode; int iContentLength; char cExtFields[RMB_SYSTEMHEADER_EXTFIELDS_MAX_LEN]; char cProperty[RMB_SYSTEMHEADER_PROPERTY_MAX_LEN]; int iSetSysVersionFlag; } StSystemHeader; #define REQ_BORN_TIMESTAMP "req_born_timestamp" #define REQ_STORE_TIMESTAMP "req_store_timestamp" #define REQ_LEAVE_TIMESTAMP "req_leave_timestamp" #define REQ_ARRIVE_TIMESTAMP "req_arrive_timestamp" #define RSP_BORN_TIMESTAMP "rsp_born_timestamp" #define RSP_STORE_TIMESTAMP "rsp_store_timestamp" #define RSP_LEAVE_TIMESTAMP "rsp_leave_timestamp" #define RSP_ARRIVE_TIMESTAMP "rsp_arrive_timestamp" typedef struct StAppHeader { char cTransCode[8]; // char cSourceChannelType[32]; // char cWordStationId[4]; // } StAppHeader; typedef struct StDestination { int iDestType; //target type: 0: topic 1: queue char cDestName[200]; } StDestination; typedef struct StFlow StFlow; //包类型分类 enum C_RMB_PKG_TYPE { ALL_TYPE_RMB = 0, //所有类型 QUEUE_PKG = 1, //queue上的消息,一般为请求 RR_TOPIC_PKG = 2, //RR的回包 BROADCAST_TOPIC_PKG = 3, //广播包 MANAGE_TOPIC_PKG = 4, //RMB内部管理topic包 NEW_LOGIC_RECEIVE = 5, NEW_LOGIC_SEND = 6, }; //包来源分类 enum C_RMB_LOGIC_TYPE { REQ_PKG_IN = 1, RSP_PKG_IN = 2, EVENT_PKG_IN = 3, REQ_PKG_OUT = 4, RSP_PKG_OUT = 5, EVENT_PKG_OUT = 6, REQ_PKG_IN_WEMQ = 7, RSP_PKG_IN_WEMQ = 8, EVENT_PKG_IN_WEMQ = 9, REQ_PKG_OUT_WEMQ = 10, RSP_PKG_OUT_WEMQ = 11, EVENT_PKG_OUT_WEMQ = 12, MANAGE_PKG_IN_WEMQ = 13, }; enum C_RMB_MESSAGE_TYPE { RMB_REQ_MSG = 1, RMB_RSP_MSG = 2, RMB_EVENT_MSG = 3, }; //rmb转发包的方式 enum UDP_OR_MQ { MSG_IPC_UDP = 0, MSG_IPC_MQ = 1, }; enum RMB_API_YPE { JAVA_TYPE = 1, C_TYPE = 2, JAVA_TYPE_WEMQ = 3, C_TYPE_WEMQ = 4, }; enum RMB_CONTEXT_TYPE { RMB_CONTEXT_TYPE_SUB = 0, RMB_CONTEXT_TYPE_PUB = 1 }; //typedef struct StRmbMsg struct StRmbMsg { //sessionIndex for wemq unsigned int uiSessIndex; unsigned int 
uiFlowIndex; //pkg type char cPkgType; //收到包的类型 char cLogicType; //消息来源 char cApiType; //apiType, enum RMB_API_YPE StSystemHeader sysHeader; StDestination dest; StDestination replyTo; unsigned long ulMsgId; unsigned long ulMsgLiveTime; char isDyedMsg[10]; //for receive msg //char cServiceId[8]; //char cScenario[2]; //char cAppHeader[MAX_APPHEADER_LENGTH]; char *cAppHeader; int iMallocAppHeaderLength; int iAppHeaderLen; //char cContent[MAX_MSG_CONTENT_SIZE]; char *cContent; int iMallocContentLength; int iContentLen; char cCorrId[MAX_COMMON_SIZE]; int iCorrLen; //msg src int iEventOrService; //0 event;1 service char strTargetDcn[10]; char strServiceId[10]; char strScenarioId[5]; //for gsl char strTargetOrgId[10]; int iFLagForOrgId; //for flag check int flag; int iMsgMode; //pub:send msg to wemq or solace sub:recv msg from wemq or solace char strLogBuf[1024]; //}StRmbMsg; }; //************for mq typedef void (*rmb_callback_func) (const char *, const int, void *); typedef int (*rmb_callback_func_v2) (const char *, const int, void *); #define MAX_MQ_NUMS 10 #define MAX_FIFO_PATH_NAME_LEN 200 #define MAX_MQ_PKG_SIZE 10000000 //static const unsigned int C_RMB_MQ_HEAD_SIZE = 2 * sizeof(unsigned int); #define C_RMB_MQ_HEAD_SIZE 2 * sizeof(unsigned int) #define C_RMB_MQ_PKG_HEAD_SIZE 2 * sizeof(unsigned int) //static const unsigned int C_RMB_MQ_PKG_HEAD_SIZE = 2 * sizeof(unsigned int); typedef struct StRmbMq { unsigned int uiShmkey; unsigned long ulShmId; unsigned int uiShmSize; //head + tail + real data char *pMqData; unsigned int *pHead; unsigned int *pTail; //real data char *pBlock; unsigned int uiBlockSize; } StRmbMq; typedef struct StRmbFifo { int iFd; char strPath[MAX_FIFO_PATH_NAME_LEN]; } StRmbFifo; typedef struct StRmbQueue { unsigned int uiSize; //head + tail + real data char *pData; unsigned int *pHead; unsigned int *pTail; //real data char *pBlock; unsigned int uiBlockSize; } StRmbQueue; typedef struct StRmbPipe { int fd[2]; int r_fd; int w_fd; } StRmbPipe; 
typedef struct StMqInfo { StRmbMq *mq; StRmbFifo *fifo; //for wemq StRmbQueue *que; StRmbPipe *pipe; int iIndex; //offset in vector int iMsgType; //iPkgType,1 queue msg;2 rr topic msg;3 broadcast msg;4 manage msg rmb_callback_func func; rmb_callback_func_v2 funcForNew; void *args; //for epoll int active; //for notify num int iCount; unsigned int uiLastCheckTime; int iMergeNotifyFLag; //0:not marge 1:marge int iNotifyFactor; //the factor of notify pthread_mutex_t queMutex; } StMqInfo; enum RmbMqIndex { req_mq_index = 1, rr_rsp_mq_index = 2, broadcast_mq_index = 3, manage_mq_index = 4, // wemq_req_mq_index = 5, // wemq_rr_rsp_mq_index = 6, // wemq_broadcast_mq_index = 7, // wemq_manage_mq_index = 8, }; //for mq notify typedef struct StRmbMqFifoNotify { StMqInfo vecMqInfo[MAX_MQ_NUMS]; int iMqNum; StMqInfo *mqIndex[MAX_MQ_NUMS]; //see define of RmbMqIndex //for select fd_set readFd; fd_set tmpReadFd; struct timeval tv; int iMaxFd; //for epoll int iEpollFd; char *pBuf; unsigned int uiBufLen; } StRmbMqFifoNotify; /////////////////////////////// typedef struct StContext StContext; //*************************context************************ //context的基本定义 struct StContext { // for solace unsigned int uiInitFlag; //for receive msgs StRmbMsg *pReceiveMsg; StRmbMsg *pReceiveMsgForRR; StRmbMsg *pReceiveMsgForBroadCast; StRmbMsg *pReceiveWemqMsg; StRmbMsg *pReceiveWemqMsgForRR; StRmbMsg *pReceiveWemqMsgForBroadCast; //for receive msgs char *pPkg; unsigned int uiPkgLen; //for wemq char *pWemqPkg; unsigned int uiWemqPkgLen; char *pWemqPkgForRRAsync; unsigned int uiWemqPkgForRRAsyncLen; //********UDP************* //for rev req or broadcast int iSocketForReq; struct sockaddr_in tmpReqAddr; //for rev reply int iSocketForRsp; struct sockaddr_in tmpReplyAddr; //for rev broadcast int iSocketForBroadcast; struct sockaddr_in tmpBroadcastAddr; //*******MQ*************** StRmbMqFifoNotify fifoMq; unsigned int uiNowTime; //sub or pub void *pFather; //****for wemq unsigned int 
uiInitWemqFlag; //for wemq proxy stContextProxy *pContextProxy; //sub or pub; int contextType; }; //********broadcast********* typedef struct StBroadcastNode { char strServiceId[10]; char strScenarioId[5]; char cConsumerSysId[10]; char cReserve[2]; } StBroadcastNode; //********queue节点*********** typedef struct StQueueNode { char cQueueName[30]; int iQueueSize; int iQueueUnackSize; } StQueueNode; typedef struct StQueueName { char cQueueName[30]; } StQueueName; typedef struct StServiceStatus { char strTargetOrgId[10]; char strServiceId[10]; char strScenarioId[5]; char cFlagForOrgId; char cResult; char cRouteFlag; char strTargetDcn[10]; unsigned long ulGetTimes; unsigned long ulInvalidTime; } StServiceStatus; typedef struct StRmbConfig { char strConfigFile[500]; char cConsumerSysId[10]; char cConsumerSysVersion[10]; char cConsumerSvrId[100]; char cOrgSvrId[50]; //char cUniqueId[100]; char cConsumerDcn[10]; //主机名 char cHostName[100]; char cHostIp[50]; unsigned int uiPid; //log int iLogLevel; //RmbLogFile stLog; char logFileName[200]; int iLogFileNums; int iLogFileSize; int iLogShiftType; StRmbLog gRmbLog; //for solace api init unsigned int uiIsInitSolaceApi; //solace int iSwitchForSolaceLog; char strSolaceLog[100]; int iSolaceLogLevel; //for connect success timeout int createConnectionTimeOut; //debug switch int iDebugSwitch; //queue config StQueueNode queueNodeList[5000]; int iQueueListSize; StBroadcastNode broadNodeList[5000]; int iBroadListSize; //for browser pthread_mutex_t configMutex; pthread_cond_t configCond; int iFlag; //for mergeq for gsl pthread_mutex_t mergeqForGslMutex; pthread_cond_t mergeqForGslCond; int iFlagForMerge; int iMergeQueue; //for merge queue pthread_mutex_t mergeQueueMutex; //for log pthread_mutex_t configLog; //for send white list // pthread_mutex_t sendWhiteListMutex; //for debug print char *pLogBuf; //for manage session int iManageFlag; //0 表示还没有, 1表示有 StQueueName fullQueueList[3000]; //queue int iFullNums; //for flag int iFlagForReq; 
//0:UDP,1:MQ int iFlagForRRrsp; //0:UDP,1:MQ int iFlagForBroadCast; //0:UDP,1:MQ int iFlagForManage; //0:UDP,1:MQ //每次处理的个数 int iEveryTimeProcessNum; //提醒的try间隔 int iNotifyCheckSpan; //for req mq char strFifoPathForReq[128]; unsigned int uiShmKeyForReq; unsigned int uiShmSizeForReq; //for rsp char strFifoPathForRRrsp[128]; unsigned int uiShmKeyForRRrsp; unsigned int uiShmSizeForRRrsp; //for broadcast char strFifoPathForBroadcast[128]; unsigned int uiShmKeyForBroadcast; unsigned int uiShmSizeForBroadcast; //flag //是否合并通知的开关 int iFLagForMergeNotify; //orgId char strOrgId[10]; //last error char _lastError[300]; //是否允许发送的开关 int iFlagForPublish; unsigned long ulLastStopTime; unsigned long ulLastAllowTime; char cLastMsg[50]; //rmb的启动时间 unsigned long ulStartTime; //rmb init time //RR info unsigned int uiSendRRMsgNums; unsigned int uiSendRRMsgError; //Event info unsigned int uiSendEventMsg; unsigned int uiSendEventMsgError; //AyncRR unsigned int uiSendAyncRRMsg; unsigned int uiSendAyncRRMsgError; //receiveMsg unsigned int uiReceiveServiceReqMsg; unsigned int uiReceiveAyncRRReply; unsigned int uiReceiveEventMsg; StServiceStatus serviceStatusList[MAX_SERVICE_STATUS_CACHE_NUMS]; int iCacheServiceNums; int iCacheTimeoutTime; int iCacheSuccTimeoutTime; //gsl query success, timeout default is 600s int iCacheFailedTimeoutTime; //gsl query failed, timeout default is 30s //timtout unsigned long ulNowTtime; //exit timtout unsigned long ulExitTimeOut; //for rmb proxy worker int iRmbMode; int iWorkerSendBufSize; //for reload cfg //signal1 int iSwitchForSignal1; //signal2 int iSwitchForSignal2; //for ack int ackTimers; int ackThresHold; //for query timeout int iQueryTimeout; //for sub broadcast LVQ unsigned int uiIsBroadcastLVQ; //for msg trans by solace timeout :daxin int iCommonTimeOut; //for rmb period stat log int iStatPeriod; //get group topic status time int iGetGroupTopicTime; //get send white list time int iGetSendWhiteListTime; //for clean merge q thread pthread_t pid_merge; 
int flag_merge; // logServer log switch for user int iLogserverSwitch; //logServer log switch for api int iApiLogserverSwitch; //rmb mode: wemq char cRmbMode[10]; int iConnWemq; //gsl control int iReqGsl; //for config center char cConfigIp[256]; int iConfigPort; int iConfigTimeout; int configIpPosInArr; char ConfigAddr[512]; // char cWemqSavePath[1024]; int tlsOnoff; //local IDC config char cRegion[10]; int iFlagForLoop; //for proxyContext int iProxyContextNums; void *pProxyContext; //for mode worker heart beat; int heartBeatPeriod; int heartBeatTimeout; //for get access ip int getAccessIpPeriod; //for access ack int accessAckTimeOut; //for rr async int rrAsyncTimeOut; //for access goodbye int goodByeTimeOut; // wemq-access 配置中心服务器 int iWemqUseHttpCfg; // 是否启用Http配置中心 //for default tcp char cWemqProxyIp[100]; unsigned int cWemqProxyPort; int iWemqTcpConnectRetryNum; // 连接wemq-access重试次数 int iWemqTcpConnectDelayTime; // 每次重连间隔时间(ms) int iWemqTcpConnectTimeout; // 每次TCP连接超时时间 int iWemqTcpSocketTimeout; // TCP socket 超时时间 //for topic int iNormalTimeout; // 监听topic超时时间(ms), default is:120000 //for wemq user/passwd char cWemqUser[100]; char cWemqPasswd[100]; int mqIsEmpty; char strDepartMent[20]; } StRmbConfig; typedef struct RmbPythonConfig { //for req mq char strFifoPathForReq[128]; unsigned int uiShmKeyForReq; unsigned int uiShmSizeForReq; //for rsp char strFifoPathForRRrsp[128]; unsigned int uiShmKeyForRRrsp; unsigned int uiShmSizeForRRrsp; //for broadcast char strFifoPathForBroadcast[128]; unsigned int uiShmKeyForBroadcast; unsigned int uiShmSizeForBroadcast; } RmbPythonConfig; enum MQ_STATUS { MQ_INIT = 0, MQ_IS_NOT_EMPTY, MQ_IS_EMPTY }; //灰度完成queue的场景ID #define GRAY_COMPLETE_SCENE "FR" enum TOPIC_STATUS { QUEUE_NOT_GRAY = 0, QUEUE_GRAY_INIT, QUEUE_GRAY_NOT_COMPLETE, QUEUE_GRAY_COMPLETE }; typedef struct StRmbListenQueueInfo { char cDcn[5]; char cServiceId[10]; char cScenarioId[5]; } st_rmb_queue_info; //rmb topic info typedef struct StRmbTopicInfo { char 
cServiceId[10]; char cScenarioId[5]; char cTopicGroup[32]; char cTopicStatus[3]; int flag; } StRmbTopicInfo; extern StRmbConfig *pRmbStConfig; extern char cManageTopic[30]; extern char cQueueFullTopic[30]; extern char cLogLevelTopic[30]; ///////////////add by wan extern char cPublishCheck[30]; ///////////////////////////// extern StContext *g_pStContextArry[MAX_RMB_CONTEXT]; #define LOGRMB(loglevel,fmt, args...) {LogRmb(loglevel, "[%s:%d(%s)]["fmt"]", __FILE__, __LINE__, __FUNCTION__, ## args);\ if(loglevel==RMB_LOG_ERROR) snprintf(pRmbStConfig->_lastError, sizeof(pRmbStConfig->_lastError)-1, fmt, ## args);} //****common tool******* #define RMB_MGS_PRINT_P(p) p->cConsumerSysId,p->cConsumerSysId,p->cConsumerSysId, #define RMB_MAX(a,b) (a>b)?a:b #define RMB_MIN(a,b) (a<b)?a:b //#define RMB_MEMCPY(a,b); memcpy(a, b, RMB_MIN( sizeof(a),strlen(b)+1 ) ); #define RMB_MEMCPY(a,b); strncpy(a, b, sizeof(a)); //#define RMB_LEN_CHECK(a,len); if(strlen(a) > len){return -1;} #define RMB_LEN_CHECK(a,len); if(strlen(a) != len){rmb_errno=RMB_ERROR_ARGV_LEN_ERROR;return rmb_errno;} //#define RMB_CHECK_POINT_NULL(a,b); if((void*)a == NULL) {LOGRMB(RMB_LOG_ERROR,"%s is NULL!", b);return -1;} #define RMB_CHECK_POINT_NULL(a,b); if((void*)a == NULL) {LOGRMB(RMB_LOG_ERROR,"%s is NULL!", b);rmb_errno=RMB_ERROR_ARGV_NULL;return RMB_ERROR_ARGV_NULL;} //end #define RMB_MEMSET(a); memset(&a, 0, sizeof(a)); #define RMB_ADD_SESSION_PROPERTY(a,b,c); sprintf(a->cSessionProps[a->uiPropIndex], "%s", b);\ a->vecPSessProps[a->uiPropIndex] = a->cSessionProps[a->uiPropIndex];\ a->uiPropIndex++;\ sprintf(a->cSessionProps[a->uiPropIndex], "%s", c);\ a->vecPSessProps[a->uiPropIndex] = a->cSessionProps[a->uiPropIndex];\ a->uiPropIndex++; #define RMB_ADD_FLOW_PROPERTY(a,b,c); sprintf(a->cFlowProps[a->uiPropIndex], "%s", b);\ a->vecPFlowProps[a->uiPropIndex] = a->cFlowProps[a->uiPropIndex];\ a->uiPropIndex++;\ sprintf(a->cFlowProps[a->uiPropIndex], "%s", c);\ a->vecPFlowProps[a->uiPropIndex] = 
a->cFlowProps[a->uiPropIndex];\ a->uiPropIndex++; #define RMB_ADD_FLOW_PROPERTY_INT(a,b,c); sprintf(a->cFlowProps[a->uiPropIndex], "%s", b);\ a->vecPFlowProps[a->uiPropIndex] = a->cFlowProps[a->uiPropIndex]; \ a->uiPropIndex++; \ sprintf(a->cFlowProps[a->uiPropIndex], "%d", c);\ a->vecPFlowProps[a->uiPropIndex] = a->cFlowProps[a->uiPropIndex];\ a->uiPropIndex++; #ifdef __cplusplus } #endif #endif