in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [1148:2255]
static int32_t _wemq_thread_on_message (WemqThreadCtx * pThreadCtx,
bool isRecvNewConnect)
{
ASSERT (pThreadCtx);
stContextProxy *pContextProxy = pThreadCtx->m_ptProxyContext;
StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
WEMQJSON *jsonHeader = NULL;
WEMQJSON *jsonTmp = NULL;
const char *usCmd;
int serRet = -1;
int bodyLen = 0;
int seq = -1;
long time = 0;
int ret = 0;
char cMsg[RMB_MAX_ERR_MSG_FROM_ACCESS];
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] |access2wemq_thread|Recv header:%s body:%s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);
bodyLen = pWemqHeader->uiTotalLen - pWemqHeader->uiHeaderLen - 8;
jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
if (jsonHeader == NULL)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] json_tokener_parse header error: %s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pWemqHeader->cStrJsonHeader) return -1;
}
//get command
json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp);
if (jsonTmp != NULL)
{
usCmd = json_object_get_string (jsonTmp);
}
//get code
json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp);
if (jsonTmp != NULL)
{
serRet = json_object_get_int (jsonTmp);
}
//get seq
json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp);
if (jsonTmp != NULL)
{
seq = json_object_get_int (jsonTmp);
}
//get time
json_object_object_get_ex (jsonHeader, MSG_HEAD_TIME_LINT, &jsonTmp);
if (jsonTmp != NULL)
{
time = (long) json_object_get_int64 (jsonTmp);
}
memset (cMsg, 0x00, sizeof (cMsg));
json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp);
if (jsonTmp != NULL)
{
strncpy (cMsg, json_object_get_string (jsonTmp), sizeof (cMsg) - 1);
}
if (strcmp (cMsg, "auth exception") == 0)
{ // auth failed
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Authentication error!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
return -1;
}
if (serRet != RMB_CODE_SUSS)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Time:%ld] code from access !=0, cmd=%s, seq=%d, code=%d, msg=%s!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, time, usCmd,
seq, serRet, cMsg);
}
if (strcmp (usCmd, HEARTBEAT_RESPONSE) == 0) //心跳
{
gettimeofday (&pThreadCtx->stTimeLastRecv, NULL);
pThreadCtx->m_uiHeartBeatCurrent = 0;
}
else if (strcmp (usCmd, HELLO_RESPONSE) == 0)
{
ret = serRet;
}
else if (strcmp (usCmd, CLIENT_GOODBYE_RESPONSE) == 0)
{
ret = WEMQ_MESSAGE_RET_CLIENTGOODBYE;
}
else if (strcmp (usCmd, SERVER_GOODBYE_REQUEST) == 0)
{
ret = WEMQ_MESSAGE_RET_SERVERGOODBYE;
}
else if (strcmp (usCmd, REDIRECT_TO_CLIENT) == 0)
{
ret =
_wemq_thread_on_message_redirect (pThreadCtx,
pWemqHeader->cStrJsonBody);
}
else if (strcmp (usCmd, ASYNC_MESSAGE_TO_SERVER_ACK) == 0) //ack 单播 from access
{
if (serRet == RMB_CODE_SUSS)
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] PublishAsyncMessage request success!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
LOGRMB (RMB_LOG_DEBUG, "seq for event: %d",
pContextProxy->iSeqForEvent);
pthread_mutex_lock (&pContextProxy->eventMutex);
if (seq == pContextProxy->iSeqForEvent)
{
pContextProxy->iFlagForEvent = serRet;
pthread_cond_signal (&pContextProxy->eventCond);
}
pthread_mutex_unlock (&pContextProxy->eventMutex);
}
else if (serRet == RMB_CODE_AUT_FAIL)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] PublishAsyncMessage request Authentication fail!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
pthread_mutex_lock (&pContextProxy->eventMutex);
pContextProxy->iFlagForEvent = serRet;
pthread_cond_signal (&pContextProxy->eventCond);
pthread_mutex_unlock (&pContextProxy->eventMutex);
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] PublishAsyncMessage request fail!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
pthread_mutex_lock (&pContextProxy->eventMutex);
pContextProxy->iFlagForEvent = serRet;
pthread_cond_signal (&pContextProxy->eventCond);
pthread_mutex_unlock (&pContextProxy->eventMutex);
}
if (pContextProxy->iFlagForEvent == -1)
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] send event msg signal succ! iFlagForEvent=%d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time,
serRet);
}
}
else if (strcmp (usCmd, ASYNC_MESSAGE_TO_CLIENT) == 0) //recv event message from proxy
{
if (serRet == 0)
{
StContext *pStContext;
if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
{
pStContext = (StContext *) (pContextProxy->pubContext);
}
else
{
pStContext = (StContext *) (pContextProxy->subContext);
}
if (pStContext == NULL)
{
LOGRMB (RMB_LOG_ERROR, "pStContext is null");
}
int iRet = 0;
if ((iRet =
trans_json_2_rmb_msg (pStContext->pReceiveWemqMsg,
pWemqHeader->cStrJsonBody,
ASYNC_MESSAGE_TO_CLIENT)) != 0)
{
LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
pWemqHeader->cStrJsonBody);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"trans_json_2_rmb_msg failed",
pStContext->pReceiveWemqMsg);
return -1;
}
pStContext->pReceiveWemqMsg->iMsgMode = RMB_MSG_FROM_WEMQ;
pStContext->pReceiveWemqMsg->cPkgType = QUEUE_PKG;
pStContext->uiWemqPkgLen = MAX_LENTH_IN_A_MSG;
set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsg,
ASYNC_MESSAGE_TO_CLIENT, seq);
iRet =
shift_msg_2_buf (pStContext->pWemqPkg, &pStContext->uiWemqPkgLen,
pStContext->pReceiveWemqMsg);
//LOGRMB(RMB_LOG_DEBUG, "appHeaderLen:%d", pStContext->pReceiveWemqMsg->iAppHeaderLen);
//LOGRMB(RMB_LOG_DEBUG, "uiWemqPkgLen:%d", pStContext->uiWemqPkgLen);
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
iRet, pStContext->pReceiveWemqMsg->iEventOrService,
pStContext->pReceiveWemqMsg->strTargetDcn,
pStContext->pReceiveWemqMsg->strServiceId,
pStContext->pReceiveWemqMsg->strScenarioId,
pStContext->pReceiveWemqMsg->sysHeader.cUniqueId,
pStContext->pReceiveWemqMsg->iMsgMode,
rmb_msg_print (pStContext->pReceiveWemqMsg));
}
//染色消息直接返回
if (check_dyed_msg (pStContext->pReceiveWemqMsg) > 0)
{
_wemq_thread_dyed_msg_ack_to_access (pThreadCtx, seq, 0,
ASYNC_MESSAGE_TO_CLIENT_ACK,
pStContext->pReceiveWemqMsg);
LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s", pWemqHeader->cStrJsonBody);
return serRet;
}
int iMqIndex = req_mq_index;
LOGRMB (RMB_LOG_DEBUG, "WEMQ_CMD_ASYNCEVENT_SUB:Ready to enqueue");
if (pRmbStConfig->iFlagForReq == (int) MSG_IPC_MQ)
{
//if (bodyLen >= pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize - C_RMB_MQ_PKG_HEAD_SIZE)
if (pStContext->uiWemqPkgLen >=
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
C_RMB_MQ_PKG_HEAD_SIZE)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] receive reqSize=%u bigger than shmSize=%u,discard",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pStContext->uiWemqPkgLen,
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"receive reqSize bigger than shmSize",
pStContext->pReceiveWemqMsg);
}
else
{
while ((iRet =
rmb_context_enqueue (pStContext,
(const enum RmbMqIndex) iMqIndex,
pStContext->pWemqPkg,
pStContext->uiWemqPkgLen)) == -2)
{
// LOG
LOGRMB (RMB_LOG_DEBUG, "[Type:%d] [TID:%lu] req queue full!wait!",
pThreadCtx->m_contextType, pThreadCtx->m_threadID);
usleep (1000);
}
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] req wemq_context_enqueue error!enqueue failed=%d!receive=%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"req wemq_context_enqueue error!enqueue failed",
pStContext->pReceiveWemqMsg);
return 0;
}
else
{
LOGRMB (RMB_LOG_DEBUG, "[Type:%d] [TID:%lu] Enqueue succ, msg %s",
pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pWemqHeader->cStrJsonBody);
}
}
}
else
{
iRet =
sendto (pStContext->iSocketForReq, pStContext->pWemqPkg,
pStContext->uiWemqPkgLen, 0,
(const struct sockaddr *) &pStContext->tmpReqAddr,
sizeof (pStContext->tmpReqAddr));
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] sendto failed=%d,message is:%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
}
}
}
}
else if (strcmp (usCmd, REQUEST_TO_CLIENT) == 0) //sub端收到RR请求消息
{
if (serRet == 0)
{
StContext *pStContext;
if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
{
pStContext = (StContext *) (pContextProxy->pubContext);
}
else
{
pStContext = (StContext *) (pContextProxy->subContext);
}
if (pStContext == NULL)
{
LOGRMB (RMB_LOG_ERROR, "pStContext is null");
}
int iRet = 0;
if ((iRet =
trans_json_2_rmb_msg (pStContext->pReceiveWemqMsg,
pWemqHeader->cStrJsonBody,
REQUEST_TO_CLIENT)) != 0)
{
LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
pWemqHeader->cStrJsonBody);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"trans_json_2_rmb_msg failed",
pStContext->pReceiveWemqMsg);
return -1;
}
pStContext->pReceiveWemqMsg->iMsgMode = RMB_MSG_FROM_WEMQ;
pStContext->pReceiveWemqMsg->cPkgType = QUEUE_PKG;
pStContext->uiWemqPkgLen = MAX_LENTH_IN_A_MSG;
set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsg, REQUEST_TO_CLIENT,
seq);
iRet =
shift_msg_2_buf (pStContext->pWemqPkg, &pStContext->uiWemqPkgLen,
pStContext->pReceiveWemqMsg);
//LOGRMB(RMB_LOG_DEBUG, "uiWemqPkgLen:%d", pStContext->uiWemqPkgLen);
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
iRet, pStContext->pReceiveWemqMsg->iEventOrService,
pStContext->pReceiveWemqMsg->strTargetDcn,
pStContext->pReceiveWemqMsg->strServiceId,
pStContext->pReceiveWemqMsg->strScenarioId,
pStContext->pReceiveWemqMsg->sysHeader.cUniqueId,
pStContext->pReceiveWemqMsg->iMsgMode,
rmb_msg_print (pStContext->pReceiveWemqMsg));
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"shift_msg_2_buf failed",
pStContext->pReceiveWemqMsg);
return -1;
}
int iMqIndex = req_mq_index;
//染色消息直接返回
if (check_dyed_msg (pStContext->pReceiveWemqMsg) > 0)
{
LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s", pWemqHeader->cStrJsonBody);
_wemq_thread_dyed_msg_ack_to_access (pThreadCtx, seq, 0,
REQUEST_TO_CLIENT_ACK,
pStContext->pReceiveWemqMsg);
_wemq_thread_dyed_msg_reply_to_access (pThreadCtx, seq, 0,
RESPONSE_TO_SERVER,
pStContext->pReceiveWemqMsg);
return serRet;
}
LOGRMB (RMB_LOG_DEBUG, "WEMQ_CMD_SYNCREQ:Ready to en queue");
if (pRmbStConfig->iFlagForReq == (int) MSG_IPC_MQ)
{
//if (bodyLen >= pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize - C_RMB_MQ_PKG_HEAD_SIZE)
if (pStContext->uiWemqPkgLen >=
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
C_RMB_MQ_PKG_HEAD_SIZE)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] receive reqSize=%u bigger than shmSize=%u,discard",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pStContext->uiWemqPkgLen,
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"receive reqSize bigger than shmSize",
pStContext->pReceiveWemqMsg);
}
else
{
while ((iRet =
rmb_context_enqueue (pStContext,
(const enum RmbMqIndex) iMqIndex,
pStContext->pWemqPkg,
pStContext->uiWemqPkgLen)) == -2)
{
// LOG
LOGRMB (RMB_LOG_DEBUG, "[Type:%d] [TID:%lu] req queue full!wait!",
pThreadCtx->m_contextType, pThreadCtx->m_threadID);
usleep (1000);
}
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] req wemq_context_enqueue error!enqueue failed=%d!receive=%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"req wemq_context_enqueue error!enqueue failed",
pStContext->pReceiveWemqMsg);
}
else
{
LOGRMB (RMB_LOG_DEBUG,
"[Type:%d] [TID:%lu] Enqueue succ, msg: header %s body %s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);
}
}
}
else
{
iRet =
sendto (pStContext->iSocketForReq, pStContext->pWemqPkg,
pStContext->uiWemqPkgLen, 0,
(const struct sockaddr *) &pStContext->tmpReqAddr,
sizeof (pStContext->tmpReqAddr));
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] sendto failed=%d,message is:%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
}
}
}
}
else if (strcmp (usCmd, RESPONSE_TO_CLIENT) == 0) //rr请求端收到的回包
{
if (serRet == 0) //rsp succ
{
WEMQJSON *jsonByteBody = NULL;
WEMQJSON *systemHeader = NULL;
WEMQJSON *sysExtFields = NULL;
WEMQJSON *jsonDecoder = NULL;
int rrType = 0;
WEMQJSON *jsonBody = json_tokener_parse (pWemqHeader->cStrJsonBody);
if (jsonBody == NULL)
{
LOGRMB (RMB_LOG_ERROR, "json_tokener_parse failed!,buf is:%s",
pWemqHeader->cStrJsonBody);
return -1;
}
if (!json_object_object_get_ex
(jsonBody, MSG_BODY_BYTE_BODY_JSON, &jsonByteBody))
{
LOGRMB (RMB_LOG_ERROR, "body json no byte body!");
json_object_put (jsonBody);
return -1;
} //byte body json
jsonByteBody =
json_tokener_parse (json_object_get_string (jsonByteBody));
if (NULL == jsonByteBody
|| !json_object_object_get_ex (jsonByteBody,
MSG_BODY_BYTE_BODY_SYSTEM_HEADER_CONTENT_JSON,
&systemHeader))
{
LOGRMB (RMB_LOG_ERROR, "byte body json no system header content!");
json_object_put (jsonBody);
if (NULL != jsonByteBody)
{
json_object_put (jsonByteBody);
}
return -1;
} //system header json
systemHeader =
json_tokener_parse (json_object_get_string (systemHeader));
if (json_object_object_get_ex
(systemHeader, MSG_BODY_SYSTEM_EXTFIELDS_STR, &sysExtFields))
{
sysExtFields =
json_tokener_parse (json_object_get_string (sysExtFields));
if (json_object_object_get_ex
(sysExtFields, MSG_BODY_SYSTEM_RRTYPE_INT, &jsonDecoder))
{
rrType = json_object_get_int (jsonDecoder);
}
} //system extFields RRtype
if (0 == rrType)
{
if (bodyLen >= (TCP_BUF_SIZE * sizeof (char)))
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] get rsp too long,bodyLen=%d,buf_le=%d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq,
time, bodyLen, TCP_BUF_SIZE);
}
int i = 0;
int recvFlag = 0;
pthread_mutex_lock (&pContextProxy->rrMutex);
if (pContextProxy->iFlagForRR == -1)
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] signal succ! iFlagForRR=0",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq,
time);
memcpy (pContextProxy->mPubRRBuf, pWemqHeader->cStrJsonBody,
bodyLen);
pContextProxy->mPubRRBuf[bodyLen] = '\0';
trans_json_2_rmb_msg (pContextProxy->pReplyMsg,
pContextProxy->mPubRRBuf, RESPONSE_TO_CLIENT);
//LOGRMB(RMB_LOG_DEBUG, "destname :%s", pContextProxy->pReplyMsg->dest.cDestName);
GetRmbNowLongTime ();
pContextProxy->pReplyMsg->sysHeader.ulReplyReceiveTime =
pRmbStConfig->ulNowTtime;
set_extfields_2_rmb_msg (pContextProxy->pReplyMsg,
RESPONSE_TO_CLIENT, seq);
if (pContextProxy->stUnique.flag == 1
&& strcmp (pContextProxy->stUnique.unique_id,
pContextProxy->pReplyMsg->sysHeader.cUniqueId) == 0)
{
if (check_dyed_msg (pContextProxy->pReplyMsg) > 0)
{
LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s",
pContextProxy->mPubRRBuf);
pContextProxy->iFlagForRR = RMB_CODE_DYED_MSG;
}
else
{
pContextProxy->iFlagForRR = serRet;
}
pContextProxy->stUnique.flag = 0;
pthread_cond_signal (&pContextProxy->rrCond);
recvFlag = 1;
}
}
pthread_mutex_unlock (&pContextProxy->rrMutex);
if (recvFlag == 1)
{
_wemq_thread_resp_ack_to_access (pThreadCtx, seq, 0,
RESPONSE_TO_CLIENT_ACK,
pContextProxy->pReplyMsg);
//_wemq_thread_send_error_log(pThreadCtx, seq, 0, "ok", pContextProxy->pReplyMsg);
}
}
else //case WEMQ_CMD_ASYNCRSP RR异步消息,服务请求方收到回包
{
StContext *pStContext = (StContext *) (pContextProxy->subContext);
pContextProxy->iFlagForRRAsync = 0;
if (pStContext == NULL)
{
LOGRMB (RMB_LOG_ERROR, "[TID:%lu] pStContext is null",
pThreadCtx->m_threadID);
json_object_put (jsonBody);
json_object_put (jsonByteBody);
if (NULL != systemHeader)
{
json_object_put (systemHeader);
}
if (NULL != sysExtFields)
{
json_object_put (sysExtFields);
}
return -1;
}
int iRet = 0;
if ((iRet =
trans_json_2_rmb_msg (pStContext->pReceiveWemqMsgForRR,
pWemqHeader->cStrJsonBody,
RESPONSE_TO_CLIENT)) != 0)
{
LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
pWemqHeader->cStrJsonBody);
_wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
RESPONSE_TO_CLIENT_ACK,
pStContext->pReceiveWemqMsgForRR);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"trans_json_2_rmb_msg failed",
pStContext->pReceiveWemqMsgForRR);
json_object_put (jsonBody);
json_object_put (jsonByteBody);
if (NULL != systemHeader)
{
json_object_put (systemHeader);
}
if (NULL != sysExtFields)
{
json_object_put (sysExtFields);
}
return -1;
}
GetRmbNowLongTime ();
pStContext->pReceiveWemqMsgForRR->sysHeader.ulReplyReceiveTime =
pRmbStConfig->ulNowTtime;
int i;
set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsgForRR,
RESPONSE_TO_CLIENT, seq);
int recvFlag = 0;
pthread_mutex_lock (&pContextProxy->rrMutex);
//因为在access goodbye的时候,消息可能在旧连接上,所以必须同时扫描新旧连接,不能只扫描新连接
//if(isRecvNewConnect)
//{
for (i = 0;
i <
pContextProxy->pUniqueListForRRAsyncNew.
get_array_size (&pContextProxy->pUniqueListForRRAsyncNew); i++)
{
if (pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag == 1
&& strcmp (pContextProxy->pUniqueListForRRAsyncNew.Data[i].
unique_id,
pStContext->pReceiveWemqMsgForRR->sysHeader.
cUniqueId) == 0)
{
pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag = 0;
recvFlag = 1;
break;
}
}
// }else
// {
for (i = 0;
i <
pContextProxy->pUniqueListForRRAsyncOld.
get_array_size (&pContextProxy->pUniqueListForRRAsyncOld); i++)
{
if (pContextProxy->pUniqueListForRRAsyncOld.Data[i].flag == 1
&& strcmp (pContextProxy->pUniqueListForRRAsyncOld.Data[i].
unique_id,
pStContext->pReceiveWemqMsgForRR->sysHeader.
cUniqueId) == 0)
{
pContextProxy->pUniqueListForRRAsyncOld.Data[i].flag = 0;
recvFlag = 1;
break;
}
}
pthread_mutex_unlock (&pContextProxy->rrMutex);
//}
//已经超时,不回包给pub端
if (recvFlag == 0)
{
LOGRMB (RMB_LOG_WARN,
"rr async response bizseq=%s, uniqueId=%s comes back, but request has timeout",
pStContext->pReceiveWemqMsgForRR->sysHeader.cBizSeqNo,
pStContext->pReceiveWemqMsgForRR->sysHeader.cUniqueId);
json_object_put (jsonBody);
json_object_put (jsonByteBody);
if (NULL != systemHeader)
{
json_object_put (systemHeader);
}
if (NULL != sysExtFields)
{
json_object_put (sysExtFields);
}
return -1;
}
pStContext->pReceiveWemqMsgForRR->cPkgType = RR_TOPIC_PKG;
pStContext->pReceiveWemqMsgForRR->iMsgMode = RMB_MSG_FROM_WEMQ;
pStContext->uiWemqPkgForRRAsyncLen = MAX_LENTH_IN_A_MSG;
iRet =
shift_msg_2_buf (pStContext->pWemqPkgForRRAsync,
&pStContext->uiWemqPkgForRRAsyncLen,
pStContext->pReceiveWemqMsgForRR);
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
iRet, pStContext->pReceiveWemqMsgForRR->iEventOrService,
pStContext->pReceiveWemqMsgForRR->strTargetDcn,
pStContext->pReceiveWemqMsgForRR->strServiceId,
pStContext->pReceiveWemqMsgForRR->strScenarioId,
pStContext->pReceiveWemqMsgForRR->sysHeader.cUniqueId,
pStContext->pReceiveWemqMsgForRR->iMsgMode,
rmb_msg_print (pStContext->pReceiveWemqMsgForRR));
_wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
RESPONSE_TO_CLIENT_ACK,
pStContext->pReceiveWemqMsgForRR);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"shift_msg_2_buf error",
pStContext->pReceiveWemqMsgForRR);
json_object_put (jsonBody);
json_object_put (jsonByteBody);
if (NULL != systemHeader)
{
json_object_put (systemHeader);
}
if (NULL != sysExtFields)
{
json_object_put (sysExtFields);
}
return -1;
}
if (check_dyed_msg (pStContext->pReceiveWemqMsgForRR) > 0)
{
LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s",
pWemqHeader->cStrJsonBody);
_wemq_thread_resp_ack_to_access (pThreadCtx, seq, 0,
RESPONSE_TO_CLIENT_ACK,
pStContext->pReceiveWemqMsgForRR);
json_object_put (jsonBody);
json_object_put (jsonByteBody);
if (NULL != systemHeader)
{
json_object_put (systemHeader);
}
if (NULL != sysExtFields)
{
json_object_put (sysExtFields);
}
return serRet;
}
int iMqIndex = rr_rsp_mq_index;
LOGRMB (RMB_LOG_DEBUG,
"WEMQ_CMD_ASYNCRSP:Recv rr rsp Ready to en queue");
if (pRmbStConfig->iFlagForRRrsp == (int) MSG_IPC_MQ)
{
if (pStContext->uiWemqPkgForRRAsyncLen >=
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
C_RMB_MQ_PKG_HEAD_SIZE)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] receive reqSize=%u bigger than shmSize=%u,discard",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pStContext->uiWemqPkgForRRAsyncLen,
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
_wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
RESPONSE_TO_CLIENT_ACK,
pStContext->
pReceiveWemqMsgForRR);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"receive msg bigger than shmSize",
pStContext->pReceiveWemqMsgForRR);
}
else
{
while ((iRet =
rmb_context_enqueue (pStContext,
(const enum RmbMqIndex) iMqIndex,
pStContext->pWemqPkgForRRAsync,
pStContext->
uiWemqPkgForRRAsyncLen)) == -2)
{
// LOG
LOGRMB (RMB_LOG_DEBUG,
"[Type:%d] [TID:%lu] req queue full!wait!",
pThreadCtx->m_contextType, pThreadCtx->m_threadID);
usleep (1000);
}
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] req wemq_context_enqueue error!enqueue failed=%d!receive=%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
_wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
RESPONSE_TO_CLIENT_ACK,
pStContext->
pReceiveWemqMsgForRR);
_wemq_thread_send_error_log (pThreadCtx, seq, -1,
LOG_ERROR_POINT,
"req wemq_context_enqueue error",
pStContext->pReceiveWemqMsgForRR);
}
else
{
LOGRMB (RMB_LOG_DEBUG,
"[Type:%d] [TID:%lu] Enqueue succ, msg: header %s body %s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);
_wemq_thread_resp_ack_to_access (pThreadCtx, seq, 0,
RESPONSE_TO_CLIENT_ACK,
pStContext->
pReceiveWemqMsgForRR);
}
}
}
else
{
iRet =
sendto (pStContext->iSocketForRsp, pStContext->pWemqPkgForRRAsync,
pStContext->uiWemqPkgForRRAsyncLen, 0,
(const struct sockaddr *) &pStContext->tmpReplyAddr,
sizeof (pStContext->tmpReplyAddr));
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] sendto failed=%d,message is:%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
}
}
}
json_object_put (jsonByteBody);
json_object_put (jsonBody);
if (NULL != systemHeader)
{
json_object_put (systemHeader);
}
if (NULL != sysExtFields)
{
json_object_put (sysExtFields);
}
}
else if (serRet == RMB_CODE_AUT_FAIL)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] rr request ret Authentication fail",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
pthread_mutex_lock (&pContextProxy->rrMutex);
pContextProxy->iFlagForRR = serRet;
pContextProxy->stUnique.flag = 0;
pthread_cond_signal (&pContextProxy->rrCond);
pthread_mutex_unlock (&pContextProxy->rrMutex);
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] rr request ret fail",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
pthread_mutex_lock (&pContextProxy->rrMutex);
pContextProxy->iFlagForRR = serRet;
pContextProxy->stUnique.flag = 0;
pthread_cond_signal (&pContextProxy->rrCond);
pthread_mutex_unlock (&pContextProxy->rrMutex);
}
}
else if (strcmp (usCmd, BROADCAST_MESSAGE_TO_SERVER_ACK) == 0) //收到广播消息的ack
{
if (serRet == RMB_CODE_SUSS)
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Publish Broadcast Message request success",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
}
else if (serRet == RMB_CODE_AUT_FAIL)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Publish Broadcast Message request Authentication fail!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
}
else
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Publish Broadcast Message request failed!",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
}
pthread_mutex_lock (&pContextProxy->eventMutex);
if (pContextProxy->iFlagForEvent == -1)
{
LOGRMB (RMB_LOG_DEBUG,
"[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] send Broadcast msg signal succ! iFlagForEvent=%d",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time,
serRet);
LOGRMB (RMB_LOG_DEBUG, "seq for Broadcast: %d",
pContextProxy->iSeqForEvent);
if (seq == pContextProxy->iSeqForEvent)
{
pContextProxy->iFlagForEvent = serRet;
pthread_cond_signal (&pContextProxy->eventCond);
}
}
pthread_mutex_unlock (&pContextProxy->eventMutex);
}
else if (strcmp (usCmd, BROADCAST_MESSAGE_TO_CLIENT) == 0) //收到广播消息
{
if (serRet == 0)
{
StContext *pStContext = NULL;
if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
{
pStContext = (StContext *) (pContextProxy->pubContext);
}
else
{
pStContext = (StContext *) (pContextProxy->subContext);
}
if (pStContext == NULL)
{
LOGRMB (RMB_LOG_ERROR, "[TID:%lu] pStContext is null",
pThreadCtx->m_threadID);
return -1;
}
int iRet = 0;
if ((iRet =
trans_json_2_rmb_msg (pStContext->pReceiveWemqMsgForBroadCast,
pWemqHeader->cStrJsonBody,
BROADCAST_MESSAGE_TO_CLIENT)) != 0)
{
LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
pWemqHeader->cStrJsonBody);
return -1;
}
pStContext->pReceiveWemqMsgForBroadCast->cPkgType = BROADCAST_TOPIC_PKG;
pStContext->pReceiveWemqMsgForBroadCast->iMsgMode = RMB_MSG_FROM_WEMQ;
pStContext->uiWemqPkgLen = MAX_LENTH_IN_A_MSG;
set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsgForBroadCast,
BROADCAST_MESSAGE_TO_CLIENT, seq);
iRet =
shift_msg_2_buf (pStContext->pWemqPkg, &pStContext->uiWemqPkgLen,
pStContext->pReceiveWemqMsgForBroadCast);
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
iRet,
pStContext->pReceiveWemqMsgForBroadCast->iEventOrService,
pStContext->pReceiveWemqMsgForBroadCast->strTargetDcn,
pStContext->pReceiveWemqMsgForBroadCast->strServiceId,
pStContext->pReceiveWemqMsgForBroadCast->strScenarioId,
pStContext->pReceiveWemqMsgForBroadCast->sysHeader.cUniqueId,
pStContext->pReceiveWemqMsgForBroadCast->iMsgMode,
rmb_msg_print (pStContext->pReceiveWemqMsgForBroadCast));
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"shift_msg_2_buf error",
pStContext->pReceiveWemqMsgForBroadCast);
return -1;
}
//染色消息直接返回
if (check_dyed_msg (pStContext->pReceiveWemqMsgForBroadCast) > 0)
{
_wemq_thread_dyed_msg_ack_to_access (pThreadCtx, seq, 0,
BROADCAST_MESSAGE_TO_CLIENT_ACK,
pStContext->pReceiveWemqMsg);
LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s", pWemqHeader->cStrJsonBody);
return serRet;
}
int iMqIndex = broadcast_mq_index;
LOGRMB (RMB_LOG_DEBUG,
"WEMQ_CMD_BROADCAST_SUB:Recv broadcast message ready to enqueue");
if (pRmbStConfig->iFlagForBroadCast == (int) MSG_IPC_MQ)
{
if (pStContext->uiWemqPkgLen >=
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
C_RMB_MQ_PKG_HEAD_SIZE)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] receive broadcast message reqSize=%u bigger than shmSize=%u,discard",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pStContext->uiWemqPkgLen,
pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"receive broadcast message reqSize bigger than shmSize",
pStContext->
pReceiveWemqMsgForBroadCast);
}
else
{
while ((iRet =
rmb_context_enqueue (pStContext,
(const enum RmbMqIndex) iMqIndex,
pStContext->pWemqPkg,
pStContext->uiWemqPkgLen)) == -2)
{
//queue full
LOGRMB (RMB_LOG_ERROR, "[Type:%d] [TID:%lu] req queue full!wait!",
pThreadCtx->m_contextType, pThreadCtx->m_threadID);
usleep (1000);
}
if (iRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] req rmb_context_enqueue error!iRet=%d!receive=%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
_wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
"req rmb_context_enqueue error",
pStContext->
pReceiveWemqMsgForBroadCast);
}
else
{
LOGRMB (RMB_LOG_DEBUG,
"[Type:%d] [TID:%lu] enqueue succ, msg: header %s body %s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID,
pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);
}
}
}
else
{
iRet =
sendto (pStContext->iSocketForBroadcast, pStContext->pWemqPkg,
pStContext->uiWemqPkgLen, 0,
(const struct sockaddr *) &pStContext->tmpBroadcastAddr,
sizeof (pStContext->tmpBroadcastAddr));
if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR,
"[Type:%d] [TID:%lu] udp error!sendto failed=%d!receive message:%s",
pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
pWemqHeader->cStrJsonBody);
}
}
}
}
else if (strcmp (usCmd, SUBSCRIBE_RESPONSE) == 0) // add subscribe response
{
if (serRet != 0)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] add listen return failed, iRet=%d, errmsg=%s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, serRet, cMsg);
}
pthread_mutex_lock (&pContextProxy->regMutex);
if (pContextProxy->iFlagForReg == 0)
{
pContextProxy->iFlagForReg = 1;
pContextProxy->iResultForReg = serRet;
pthread_cond_signal (&pContextProxy->regCond);
}
pthread_mutex_unlock (&pContextProxy->regMutex);
}
else if (strcmp (usCmd, LISTEN_RESPONSE) == 0) // add listen start response
{
if (serRet == RMB_CODE_OTHER_FAIL)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] send start to access failed,iRet=%d, errmsg=%s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, serRet, cMsg);
}
if (serRet == RMB_CODE_AUT_FAIL)
{
LOGRMB (RMB_LOG_ERROR,
"[%s] [Type:%d] [TID:%lu] send start to access authentication failed,iRet=%d, errmsg=%s",
STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
pThreadCtx->m_threadID, serRet, cMsg);
}
pthread_mutex_lock (&pContextProxy->regMutex);
if (pContextProxy->iFlagForReg == 0)
{
pContextProxy->iFlagForReg = 1;
pContextProxy->iResultForReg = serRet;
pthread_cond_signal (&pContextProxy->regCond);
}
pthread_mutex_unlock (&pContextProxy->regMutex);
}
else
{
LOGRMB (RMB_LOG_ERROR, "[%s] [Type:%d] [TID:%lu] No Such Command:%s!",
STATE_MAP[pThreadCtx->m_iState],
pThreadCtx->m_contextType, pThreadCtx->m_threadID, usCmd);
ret = -1;
}
json_object_put (jsonHeader);
return ret;
}