static int32_t _wemq_thread_on_message()

in eventmesh-sdks/eventmesh-sdk-c/src/wemq_thread.c [1148:2255]


static int32_t _wemq_thread_on_message (WemqThreadCtx * pThreadCtx,
                                        bool isRecvNewConnect)
{
  ASSERT (pThreadCtx);

  stContextProxy *pContextProxy = pThreadCtx->m_ptProxyContext;
  StWeMQMSG *pWemqHeader = &pThreadCtx->m_stWeMQMSG;
  WEMQJSON *jsonHeader = NULL;
  WEMQJSON *jsonTmp = NULL;
  const char *usCmd;
  int serRet = -1;
  int bodyLen = 0;
  int seq = -1;
  long time = 0;
  int ret = 0;
  char cMsg[RMB_MAX_ERR_MSG_FROM_ACCESS];

  LOGRMB (RMB_LOG_DEBUG,
          "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] |access2wemq_thread|Recv  header:%s  body:%s",
          STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
          pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
          pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort,
          pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);

  bodyLen = pWemqHeader->uiTotalLen - pWemqHeader->uiHeaderLen - 8;
  jsonHeader = json_tokener_parse (pWemqHeader->cStrJsonHeader);
  if (jsonHeader == NULL)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[Type:%d] [TID:%lu] json_tokener_parse header error: %s",
            pThreadCtx->m_contextType, pThreadCtx->m_threadID,
            pWemqHeader->cStrJsonHeader) return -1;
  }
  //get command
  json_object_object_get_ex (jsonHeader, MSG_HEAD_COMMAND_STR, &jsonTmp);
  if (jsonTmp != NULL)
  {
    usCmd = json_object_get_string (jsonTmp);
  }
  //get code
  json_object_object_get_ex (jsonHeader, MSG_HEAD_CODE_INT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    serRet = json_object_get_int (jsonTmp);
  }
  //get seq
  json_object_object_get_ex (jsonHeader, MSG_HEAD_SEQ_INT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    seq = json_object_get_int (jsonTmp);
  }
  //get time
  json_object_object_get_ex (jsonHeader, MSG_HEAD_TIME_LINT, &jsonTmp);
  if (jsonTmp != NULL)
  {
    time = (long) json_object_get_int64 (jsonTmp);
  }
  memset (cMsg, 0x00, sizeof (cMsg));
  json_object_object_get_ex (jsonHeader, MSG_HEAD_MSG_STR, &jsonTmp);
  if (jsonTmp != NULL)
  {
    strncpy (cMsg, json_object_get_string (jsonTmp), sizeof (cMsg) - 1);
  }
  if (strcmp (cMsg, "auth exception") == 0)
  {                             // auth failed
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Authentication error!",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
    return -1;
  }
  if (serRet != RMB_CODE_SUSS)
  {
    LOGRMB (RMB_LOG_ERROR,
            "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Time:%ld] code from access !=0, cmd=%s, seq=%d, code=%d, msg=%s!",
            STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
            pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
            pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, time, usCmd,
            seq, serRet, cMsg);
  }
  if (strcmp (usCmd, HEARTBEAT_RESPONSE) == 0)  //心跳
  {
    gettimeofday (&pThreadCtx->stTimeLastRecv, NULL);
    pThreadCtx->m_uiHeartBeatCurrent = 0;

  }
  else if (strcmp (usCmd, HELLO_RESPONSE) == 0)
  {
    ret = serRet;

  }
  else if (strcmp (usCmd, CLIENT_GOODBYE_RESPONSE) == 0)
  {
    ret = WEMQ_MESSAGE_RET_CLIENTGOODBYE;

  }
  else if (strcmp (usCmd, SERVER_GOODBYE_REQUEST) == 0)
  {
    ret = WEMQ_MESSAGE_RET_SERVERGOODBYE;

  }
  else if (strcmp (usCmd, REDIRECT_TO_CLIENT) == 0)
  {
    ret =
      _wemq_thread_on_message_redirect (pThreadCtx,
                                        pWemqHeader->cStrJsonBody);

  }
  else if (strcmp (usCmd, ASYNC_MESSAGE_TO_SERVER_ACK) == 0)    //ack 单播 from access
  {
    if (serRet == RMB_CODE_SUSS)
    {
      LOGRMB (RMB_LOG_DEBUG,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] PublishAsyncMessage request success!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

      LOGRMB (RMB_LOG_DEBUG, "seq for event: %d",
              pContextProxy->iSeqForEvent);
      pthread_mutex_lock (&pContextProxy->eventMutex);
      if (seq == pContextProxy->iSeqForEvent)
      {
        pContextProxy->iFlagForEvent = serRet;
        pthread_cond_signal (&pContextProxy->eventCond);
      }
      pthread_mutex_unlock (&pContextProxy->eventMutex);
    }
    else if (serRet == RMB_CODE_AUT_FAIL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] PublishAsyncMessage request Authentication fail!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

      pthread_mutex_lock (&pContextProxy->eventMutex);
      pContextProxy->iFlagForEvent = serRet;
      pthread_cond_signal (&pContextProxy->eventCond);
      pthread_mutex_unlock (&pContextProxy->eventMutex);

    }
    else
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] PublishAsyncMessage request fail!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

      pthread_mutex_lock (&pContextProxy->eventMutex);
      pContextProxy->iFlagForEvent = serRet;
      pthread_cond_signal (&pContextProxy->eventCond);
      pthread_mutex_unlock (&pContextProxy->eventMutex);

    }
    if (pContextProxy->iFlagForEvent == -1)
    {
      LOGRMB (RMB_LOG_DEBUG,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld]  send event msg signal succ! iFlagForEvent=%d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time,
              serRet);

    }

  }
  else if (strcmp (usCmd, ASYNC_MESSAGE_TO_CLIENT) == 0)        //recv event message from proxy
  {
    if (serRet == 0)
    {
      StContext *pStContext;
      if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
      {
        pStContext = (StContext *) (pContextProxy->pubContext);
      }
      else
      {
        pStContext = (StContext *) (pContextProxy->subContext);
      }

      if (pStContext == NULL)
      {
        LOGRMB (RMB_LOG_ERROR, "pStContext is null");

      }

      int iRet = 0;
      if ((iRet =
           trans_json_2_rmb_msg (pStContext->pReceiveWemqMsg,
                                 pWemqHeader->cStrJsonBody,
                                 ASYNC_MESSAGE_TO_CLIENT)) != 0)
      {
        LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
                pWemqHeader->cStrJsonBody);
        _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                     "trans_json_2_rmb_msg failed",
                                     pStContext->pReceiveWemqMsg);
        return -1;
      }
      pStContext->pReceiveWemqMsg->iMsgMode = RMB_MSG_FROM_WEMQ;
      pStContext->pReceiveWemqMsg->cPkgType = QUEUE_PKG;

      pStContext->uiWemqPkgLen = MAX_LENTH_IN_A_MSG;

      set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsg,
                               ASYNC_MESSAGE_TO_CLIENT, seq);

      iRet =
        shift_msg_2_buf (pStContext->pWemqPkg, &pStContext->uiWemqPkgLen,
                         pStContext->pReceiveWemqMsg);
      //LOGRMB(RMB_LOG_DEBUG, "appHeaderLen:%d", pStContext->pReceiveWemqMsg->iAppHeaderLen);

      //LOGRMB(RMB_LOG_DEBUG, "uiWemqPkgLen:%d", pStContext->uiWemqPkgLen);
      if (iRet < 0)
      {
        LOGRMB (RMB_LOG_ERROR,
                "shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
                iRet, pStContext->pReceiveWemqMsg->iEventOrService,
                pStContext->pReceiveWemqMsg->strTargetDcn,
                pStContext->pReceiveWemqMsg->strServiceId,
                pStContext->pReceiveWemqMsg->strScenarioId,
                pStContext->pReceiveWemqMsg->sysHeader.cUniqueId,
                pStContext->pReceiveWemqMsg->iMsgMode,
                rmb_msg_print (pStContext->pReceiveWemqMsg));

      }
      //染色消息直接返回
      if (check_dyed_msg (pStContext->pReceiveWemqMsg) > 0)
      {
        _wemq_thread_dyed_msg_ack_to_access (pThreadCtx, seq, 0,
                                             ASYNC_MESSAGE_TO_CLIENT_ACK,
                                             pStContext->pReceiveWemqMsg);
        LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s", pWemqHeader->cStrJsonBody);
        return serRet;
      }
      int iMqIndex = req_mq_index;
      LOGRMB (RMB_LOG_DEBUG, "WEMQ_CMD_ASYNCEVENT_SUB:Ready to enqueue");

      if (pRmbStConfig->iFlagForReq == (int) MSG_IPC_MQ)
      {
        //if (bodyLen >= pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize - C_RMB_MQ_PKG_HEAD_SIZE)
        if (pStContext->uiWemqPkgLen >=
            pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
            C_RMB_MQ_PKG_HEAD_SIZE)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[Type:%d] [TID:%lu] receive reqSize=%u bigger than shmSize=%u,discard",
                  pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                  pStContext->uiWemqPkgLen,
                  pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
          _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                       "receive reqSize bigger than shmSize",
                                       pStContext->pReceiveWemqMsg);
        }
        else
        {
          while ((iRet =
                  rmb_context_enqueue (pStContext,
                                       (const enum RmbMqIndex) iMqIndex,
                                       pStContext->pWemqPkg,
                                       pStContext->uiWemqPkgLen)) == -2)
          {
            // LOG
            LOGRMB (RMB_LOG_DEBUG, "[Type:%d] [TID:%lu] req queue full!wait!",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID);
            usleep (1000);
          }
          if (iRet != 0)
          {
            LOGRMB (RMB_LOG_ERROR,
                    "[Type:%d] [TID:%lu] req wemq_context_enqueue error!enqueue failed=%d!receive=%s",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                    pWemqHeader->cStrJsonBody);
            _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                         "req wemq_context_enqueue error!enqueue failed",
                                         pStContext->pReceiveWemqMsg);
            return 0;
          }
          else
          {
            LOGRMB (RMB_LOG_DEBUG, "[Type:%d] [TID:%lu] Enqueue succ, msg %s",
                    pThreadCtx->m_contextType,
                    pThreadCtx->m_threadID, pWemqHeader->cStrJsonBody);
          }
        }
      }
      else
      {
        iRet =
          sendto (pStContext->iSocketForReq, pStContext->pWemqPkg,
                  pStContext->uiWemqPkgLen, 0,
                  (const struct sockaddr *) &pStContext->tmpReqAddr,
                  sizeof (pStContext->tmpReqAddr));
        if (iRet < 0)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[Type:%d] [TID:%lu] sendto failed=%d,message is:%s",
                  pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                  pWemqHeader->cStrJsonBody);
        }
      }
    }
  }

  else if (strcmp (usCmd, REQUEST_TO_CLIENT) == 0)      //sub端收到RR请求消息
  {
    if (serRet == 0)
    {
      StContext *pStContext;
      if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
      {
        pStContext = (StContext *) (pContextProxy->pubContext);
      }
      else
      {
        pStContext = (StContext *) (pContextProxy->subContext);
      }

      if (pStContext == NULL)
      {
        LOGRMB (RMB_LOG_ERROR, "pStContext is null");
      }

      int iRet = 0;
      if ((iRet =
           trans_json_2_rmb_msg (pStContext->pReceiveWemqMsg,
                                 pWemqHeader->cStrJsonBody,
                                 REQUEST_TO_CLIENT)) != 0)
      {
        LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
                pWemqHeader->cStrJsonBody);
        _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                     "trans_json_2_rmb_msg failed",
                                     pStContext->pReceiveWemqMsg);
        return -1;
      }

      pStContext->pReceiveWemqMsg->iMsgMode = RMB_MSG_FROM_WEMQ;
      pStContext->pReceiveWemqMsg->cPkgType = QUEUE_PKG;

      pStContext->uiWemqPkgLen = MAX_LENTH_IN_A_MSG;

      set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsg, REQUEST_TO_CLIENT,
                               seq);

      iRet =
        shift_msg_2_buf (pStContext->pWemqPkg, &pStContext->uiWemqPkgLen,
                         pStContext->pReceiveWemqMsg);
      //LOGRMB(RMB_LOG_DEBUG, "uiWemqPkgLen:%d", pStContext->uiWemqPkgLen);

      if (iRet < 0)
      {
        LOGRMB (RMB_LOG_ERROR,
                "shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
                iRet, pStContext->pReceiveWemqMsg->iEventOrService,
                pStContext->pReceiveWemqMsg->strTargetDcn,
                pStContext->pReceiveWemqMsg->strServiceId,
                pStContext->pReceiveWemqMsg->strScenarioId,
                pStContext->pReceiveWemqMsg->sysHeader.cUniqueId,
                pStContext->pReceiveWemqMsg->iMsgMode,
                rmb_msg_print (pStContext->pReceiveWemqMsg));
        _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                     "shift_msg_2_buf failed",
                                     pStContext->pReceiveWemqMsg);
        return -1;
      }

      int iMqIndex = req_mq_index;
      //染色消息直接返回
      if (check_dyed_msg (pStContext->pReceiveWemqMsg) > 0)
      {
        LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s", pWemqHeader->cStrJsonBody);
        _wemq_thread_dyed_msg_ack_to_access (pThreadCtx, seq, 0,
                                             REQUEST_TO_CLIENT_ACK,
                                             pStContext->pReceiveWemqMsg);
        _wemq_thread_dyed_msg_reply_to_access (pThreadCtx, seq, 0,
                                               RESPONSE_TO_SERVER,
                                               pStContext->pReceiveWemqMsg);
        return serRet;
      }
      LOGRMB (RMB_LOG_DEBUG, "WEMQ_CMD_SYNCREQ:Ready to en queue");

      if (pRmbStConfig->iFlagForReq == (int) MSG_IPC_MQ)
      {
        //if (bodyLen >= pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize - C_RMB_MQ_PKG_HEAD_SIZE)
        if (pStContext->uiWemqPkgLen >=
            pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
            C_RMB_MQ_PKG_HEAD_SIZE)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[Type:%d] [TID:%lu] receive reqSize=%u bigger than shmSize=%u,discard",
                  pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                  pStContext->uiWemqPkgLen,
                  pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
          _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                       "receive reqSize bigger than shmSize",
                                       pStContext->pReceiveWemqMsg);
        }
        else
        {
          while ((iRet =
                  rmb_context_enqueue (pStContext,
                                       (const enum RmbMqIndex) iMqIndex,
                                       pStContext->pWemqPkg,
                                       pStContext->uiWemqPkgLen)) == -2)
          {
            // LOG
            LOGRMB (RMB_LOG_DEBUG, "[Type:%d] [TID:%lu] req queue full!wait!",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID);
            usleep (1000);
          }
          if (iRet != 0)
          {
            LOGRMB (RMB_LOG_ERROR,
                    "[Type:%d] [TID:%lu] req wemq_context_enqueue error!enqueue failed=%d!receive=%s",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                    pWemqHeader->cStrJsonBody);
            _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                         "req wemq_context_enqueue error!enqueue failed",
                                         pStContext->pReceiveWemqMsg);
          }
          else
          {
            LOGRMB (RMB_LOG_DEBUG,
                    "[Type:%d] [TID:%lu] Enqueue succ, msg: header  %s  body %s",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                    pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);
          }
        }
      }
      else
      {
        iRet =
          sendto (pStContext->iSocketForReq, pStContext->pWemqPkg,
                  pStContext->uiWemqPkgLen, 0,
                  (const struct sockaddr *) &pStContext->tmpReqAddr,
                  sizeof (pStContext->tmpReqAddr));
        if (iRet < 0)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[Type:%d] [TID:%lu] sendto failed=%d,message is:%s",
                  pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                  pWemqHeader->cStrJsonBody);
        }
      }
    }
  }
  else if (strcmp (usCmd, RESPONSE_TO_CLIENT) == 0)     //rr请求端收到的回包
  {
    if (serRet == 0)            //rsp succ
    {
      WEMQJSON *jsonByteBody = NULL;
      WEMQJSON *systemHeader = NULL;
      WEMQJSON *sysExtFields = NULL;
      WEMQJSON *jsonDecoder = NULL;
      int rrType = 0;
      WEMQJSON *jsonBody = json_tokener_parse (pWemqHeader->cStrJsonBody);
      if (jsonBody == NULL)
      {
        LOGRMB (RMB_LOG_ERROR, "json_tokener_parse failed!,buf is:%s",
                pWemqHeader->cStrJsonBody);
        return -1;
      }
      if (!json_object_object_get_ex
          (jsonBody, MSG_BODY_BYTE_BODY_JSON, &jsonByteBody))
      {
        LOGRMB (RMB_LOG_ERROR, "body json no byte body!");
        json_object_put (jsonBody);
        return -1;
      }                         //byte body json

      jsonByteBody =
        json_tokener_parse (json_object_get_string (jsonByteBody));
      if (NULL == jsonByteBody
          || !json_object_object_get_ex (jsonByteBody,
                                         MSG_BODY_BYTE_BODY_SYSTEM_HEADER_CONTENT_JSON,
                                         &systemHeader))
      {
        LOGRMB (RMB_LOG_ERROR, "byte body json no system header content!");
        json_object_put (jsonBody);
        if (NULL != jsonByteBody)
        {
          json_object_put (jsonByteBody);
        }
        return -1;
      }                         //system header json

      systemHeader =
        json_tokener_parse (json_object_get_string (systemHeader));
      if (json_object_object_get_ex
          (systemHeader, MSG_BODY_SYSTEM_EXTFIELDS_STR, &sysExtFields))
      {
        sysExtFields =
          json_tokener_parse (json_object_get_string (sysExtFields));
        if (json_object_object_get_ex
            (sysExtFields, MSG_BODY_SYSTEM_RRTYPE_INT, &jsonDecoder))
        {
          rrType = json_object_get_int (jsonDecoder);
        }
      }                         //system extFields RRtype
      if (0 == rrType)
      {
        if (bodyLen >= (TCP_BUF_SIZE * sizeof (char)))
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] get rsp too long,bodyLen=%d,buf_le=%d",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq,
                  time, bodyLen, TCP_BUF_SIZE);
        }
        int i = 0;
        int recvFlag = 0;
        pthread_mutex_lock (&pContextProxy->rrMutex);
        if (pContextProxy->iFlagForRR == -1)
        {
          LOGRMB (RMB_LOG_DEBUG,
                  "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] signal succ! iFlagForRR=0",
                  STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
                  pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
                  pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq,
                  time);
          memcpy (pContextProxy->mPubRRBuf, pWemqHeader->cStrJsonBody,
                  bodyLen);
          pContextProxy->mPubRRBuf[bodyLen] = '\0';
          trans_json_2_rmb_msg (pContextProxy->pReplyMsg,
                                pContextProxy->mPubRRBuf, RESPONSE_TO_CLIENT);
          //LOGRMB(RMB_LOG_DEBUG, "destname :%s", pContextProxy->pReplyMsg->dest.cDestName);
          GetRmbNowLongTime ();
          pContextProxy->pReplyMsg->sysHeader.ulReplyReceiveTime =
            pRmbStConfig->ulNowTtime;
          set_extfields_2_rmb_msg (pContextProxy->pReplyMsg,
                                   RESPONSE_TO_CLIENT, seq);
          if (pContextProxy->stUnique.flag == 1
              && strcmp (pContextProxy->stUnique.unique_id,
                         pContextProxy->pReplyMsg->sysHeader.cUniqueId) == 0)
          {
            if (check_dyed_msg (pContextProxy->pReplyMsg) > 0)
            {
              LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s",
                      pContextProxy->mPubRRBuf);
              pContextProxy->iFlagForRR = RMB_CODE_DYED_MSG;
            }
            else
            {
              pContextProxy->iFlagForRR = serRet;
            }
            pContextProxy->stUnique.flag = 0;
            pthread_cond_signal (&pContextProxy->rrCond);
            recvFlag = 1;

          }
        }
        pthread_mutex_unlock (&pContextProxy->rrMutex);
        if (recvFlag == 1)
        {
          _wemq_thread_resp_ack_to_access (pThreadCtx, seq, 0,
                                           RESPONSE_TO_CLIENT_ACK,
                                           pContextProxy->pReplyMsg);
          //_wemq_thread_send_error_log(pThreadCtx, seq, 0, "ok",  pContextProxy->pReplyMsg);
        }
      }
      else                      //case WEMQ_CMD_ASYNCRSP RR异步消息,服务请求方收到回包
      {
        StContext *pStContext = (StContext *) (pContextProxy->subContext);
        pContextProxy->iFlagForRRAsync = 0;
        if (pStContext == NULL)
        {
          LOGRMB (RMB_LOG_ERROR, "[TID:%lu] pStContext is null",
                  pThreadCtx->m_threadID);
          json_object_put (jsonBody);
          json_object_put (jsonByteBody);
          if (NULL != systemHeader)
          {
            json_object_put (systemHeader);
          }
          if (NULL != sysExtFields)
          {
            json_object_put (sysExtFields);
          }
          return -1;
        }

        int iRet = 0;

        if ((iRet =
             trans_json_2_rmb_msg (pStContext->pReceiveWemqMsgForRR,
                                   pWemqHeader->cStrJsonBody,
                                   RESPONSE_TO_CLIENT)) != 0)
        {
          LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
                  pWemqHeader->cStrJsonBody);
          _wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
                                           RESPONSE_TO_CLIENT_ACK,
                                           pStContext->pReceiveWemqMsgForRR);
          _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                       "trans_json_2_rmb_msg failed",
                                       pStContext->pReceiveWemqMsgForRR);
          json_object_put (jsonBody);
          json_object_put (jsonByteBody);
          if (NULL != systemHeader)
          {
            json_object_put (systemHeader);
          }
          if (NULL != sysExtFields)
          {
            json_object_put (sysExtFields);
          }
          return -1;
        }
        GetRmbNowLongTime ();
        pStContext->pReceiveWemqMsgForRR->sysHeader.ulReplyReceiveTime =
          pRmbStConfig->ulNowTtime;
        int i;
        set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsgForRR,
                                 RESPONSE_TO_CLIENT, seq);
        int recvFlag = 0;
        pthread_mutex_lock (&pContextProxy->rrMutex);
        //因为在access goodbye的时候,消息可能在旧连接上,所以必须同时扫描新旧连接,不能只扫描新连接

        //if(isRecvNewConnect)
        //{
        for (i = 0;
             i <
             pContextProxy->pUniqueListForRRAsyncNew.
             get_array_size (&pContextProxy->pUniqueListForRRAsyncNew); i++)
        {
          if (pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag == 1
              && strcmp (pContextProxy->pUniqueListForRRAsyncNew.Data[i].
                         unique_id,
                         pStContext->pReceiveWemqMsgForRR->sysHeader.
                         cUniqueId) == 0)
          {
            pContextProxy->pUniqueListForRRAsyncNew.Data[i].flag = 0;
            recvFlag = 1;
            break;
          }
        }
        //              }else
        //      {
        for (i = 0;
             i <
             pContextProxy->pUniqueListForRRAsyncOld.
             get_array_size (&pContextProxy->pUniqueListForRRAsyncOld); i++)
        {
          if (pContextProxy->pUniqueListForRRAsyncOld.Data[i].flag == 1
              && strcmp (pContextProxy->pUniqueListForRRAsyncOld.Data[i].
                         unique_id,
                         pStContext->pReceiveWemqMsgForRR->sysHeader.
                         cUniqueId) == 0)
          {
            pContextProxy->pUniqueListForRRAsyncOld.Data[i].flag = 0;
            recvFlag = 1;
            break;
          }
        }
        pthread_mutex_unlock (&pContextProxy->rrMutex);
        //}
        //已经超时,不回包给pub端
        if (recvFlag == 0)
        {
          LOGRMB (RMB_LOG_WARN,
                  "rr async response bizseq=%s, uniqueId=%s comes back, but request has timeout",
                  pStContext->pReceiveWemqMsgForRR->sysHeader.cBizSeqNo,
                  pStContext->pReceiveWemqMsgForRR->sysHeader.cUniqueId);
          json_object_put (jsonBody);
          json_object_put (jsonByteBody);
          if (NULL != systemHeader)
          {
            json_object_put (systemHeader);
          }
          if (NULL != sysExtFields)
          {
            json_object_put (sysExtFields);
          }
          return -1;
        }

        pStContext->pReceiveWemqMsgForRR->cPkgType = RR_TOPIC_PKG;
        pStContext->pReceiveWemqMsgForRR->iMsgMode = RMB_MSG_FROM_WEMQ;
        pStContext->uiWemqPkgForRRAsyncLen = MAX_LENTH_IN_A_MSG;
        iRet =
          shift_msg_2_buf (pStContext->pWemqPkgForRRAsync,
                           &pStContext->uiWemqPkgForRRAsyncLen,
                           pStContext->pReceiveWemqMsgForRR);
        if (iRet < 0)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
                  iRet, pStContext->pReceiveWemqMsgForRR->iEventOrService,
                  pStContext->pReceiveWemqMsgForRR->strTargetDcn,
                  pStContext->pReceiveWemqMsgForRR->strServiceId,
                  pStContext->pReceiveWemqMsgForRR->strScenarioId,
                  pStContext->pReceiveWemqMsgForRR->sysHeader.cUniqueId,
                  pStContext->pReceiveWemqMsgForRR->iMsgMode,
                  rmb_msg_print (pStContext->pReceiveWemqMsgForRR));
          _wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
                                           RESPONSE_TO_CLIENT_ACK,
                                           pStContext->pReceiveWemqMsgForRR);
          _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                       "shift_msg_2_buf error",
                                       pStContext->pReceiveWemqMsgForRR);
          json_object_put (jsonBody);
          json_object_put (jsonByteBody);
          if (NULL != systemHeader)
          {
            json_object_put (systemHeader);
          }
          if (NULL != sysExtFields)
          {
            json_object_put (sysExtFields);
          }
          return -1;
        }
        if (check_dyed_msg (pStContext->pReceiveWemqMsgForRR) > 0)
        {
          LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s",
                  pWemqHeader->cStrJsonBody);
          _wemq_thread_resp_ack_to_access (pThreadCtx, seq, 0,
                                           RESPONSE_TO_CLIENT_ACK,
                                           pStContext->pReceiveWemqMsgForRR);
          json_object_put (jsonBody);
          json_object_put (jsonByteBody);
          if (NULL != systemHeader)
          {
            json_object_put (systemHeader);
          }
          if (NULL != sysExtFields)
          {
            json_object_put (sysExtFields);
          }
          return serRet;
        }

        int iMqIndex = rr_rsp_mq_index;
        LOGRMB (RMB_LOG_DEBUG,
                "WEMQ_CMD_ASYNCRSP:Recv rr rsp Ready to en queue");
        if (pRmbStConfig->iFlagForRRrsp == (int) MSG_IPC_MQ)
        {
          if (pStContext->uiWemqPkgForRRAsyncLen >=
              pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
              C_RMB_MQ_PKG_HEAD_SIZE)
          {
            LOGRMB (RMB_LOG_ERROR,
                    "[Type:%d] [TID:%lu] receive reqSize=%u bigger than shmSize=%u,discard",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                    pStContext->uiWemqPkgForRRAsyncLen,
                    pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
            _wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
                                             RESPONSE_TO_CLIENT_ACK,
                                             pStContext->
                                             pReceiveWemqMsgForRR);
            _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                         "receive msg bigger than shmSize",
                                         pStContext->pReceiveWemqMsgForRR);
          }
          else
          {
            while ((iRet =
                    rmb_context_enqueue (pStContext,
                                         (const enum RmbMqIndex) iMqIndex,
                                         pStContext->pWemqPkgForRRAsync,
                                         pStContext->
                                         uiWemqPkgForRRAsyncLen)) == -2)
            {
              // LOG
              LOGRMB (RMB_LOG_DEBUG,
                      "[Type:%d] [TID:%lu] req queue full!wait!",
                      pThreadCtx->m_contextType, pThreadCtx->m_threadID);
              usleep (1000);
            }
            if (iRet != 0)
            {
              LOGRMB (RMB_LOG_ERROR,
                      "[Type:%d] [TID:%lu] req wemq_context_enqueue error!enqueue failed=%d!receive=%s",
                      pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                      pWemqHeader->cStrJsonBody);
              _wemq_thread_resp_ack_to_access (pThreadCtx, seq, -1,
                                               RESPONSE_TO_CLIENT_ACK,
                                               pStContext->
                                               pReceiveWemqMsgForRR);
              _wemq_thread_send_error_log (pThreadCtx, seq, -1,
                                           LOG_ERROR_POINT,
                                           "req wemq_context_enqueue error",
                                           pStContext->pReceiveWemqMsgForRR);
            }
            else
            {
              LOGRMB (RMB_LOG_DEBUG,
                      "[Type:%d] [TID:%lu] Enqueue succ, msg: header  %s  body %s",
                      pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                      pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);
              _wemq_thread_resp_ack_to_access (pThreadCtx, seq, 0,
                                               RESPONSE_TO_CLIENT_ACK,
                                               pStContext->
                                               pReceiveWemqMsgForRR);
            }
          }
        }
        else
        {
          iRet =
            sendto (pStContext->iSocketForRsp, pStContext->pWemqPkgForRRAsync,
                    pStContext->uiWemqPkgForRRAsyncLen, 0,
                    (const struct sockaddr *) &pStContext->tmpReplyAddr,
                    sizeof (pStContext->tmpReplyAddr));
          if (iRet < 0)
          {
            LOGRMB (RMB_LOG_ERROR,
                    "[Type:%d] [TID:%lu] sendto failed=%d,message is:%s",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                    pWemqHeader->cStrJsonBody);
          }
        }
      }
      json_object_put (jsonByteBody);
      json_object_put (jsonBody);
      if (NULL != systemHeader)
      {
        json_object_put (systemHeader);
      }
      if (NULL != sysExtFields)
      {
        json_object_put (sysExtFields);
      }
    }
    else if (serRet == RMB_CODE_AUT_FAIL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] rr request ret Authentication fail",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

      pthread_mutex_lock (&pContextProxy->rrMutex);
      pContextProxy->iFlagForRR = serRet;
      pContextProxy->stUnique.flag = 0;
      pthread_cond_signal (&pContextProxy->rrCond);
      pthread_mutex_unlock (&pContextProxy->rrMutex);
    }
    else
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] rr request ret fail",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

      pthread_mutex_lock (&pContextProxy->rrMutex);
      pContextProxy->iFlagForRR = serRet;
      pContextProxy->stUnique.flag = 0;
      pthread_cond_signal (&pContextProxy->rrCond);
      pthread_mutex_unlock (&pContextProxy->rrMutex);
    }
  }
  else if (strcmp (usCmd, BROADCAST_MESSAGE_TO_SERVER_ACK) == 0)        //收到广播消息的ack
  {
    if (serRet == RMB_CODE_SUSS)
    {
      LOGRMB (RMB_LOG_DEBUG,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Publish Broadcast Message request success",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

    }
    else if (serRet == RMB_CODE_AUT_FAIL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Publish Broadcast Message request Authentication fail!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);
    }
    else
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld] Publish Broadcast Message request failed!",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time);

    }

    pthread_mutex_lock (&pContextProxy->eventMutex);
    if (pContextProxy->iFlagForEvent == -1)
    {
      LOGRMB (RMB_LOG_DEBUG,
              "[%s] [Type:%d] [TID:%lu] [LocalPort:%d] [proxy ip:%s|port:%u] [Seq:%d] [Time:%ld]  send Broadcast msg signal succ! iFlagForEvent=%d",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, pThreadCtx->m_iLocalPort,
              pThreadCtx->m_cProxyIP, pThreadCtx->m_uiProxyPort, seq, time,
              serRet);
      LOGRMB (RMB_LOG_DEBUG, "seq for Broadcast: %d",
              pContextProxy->iSeqForEvent);
      if (seq == pContextProxy->iSeqForEvent)
      {
        pContextProxy->iFlagForEvent = serRet;
        pthread_cond_signal (&pContextProxy->eventCond);
      }
    }
    pthread_mutex_unlock (&pContextProxy->eventMutex);
  }

  else if (strcmp (usCmd, BROADCAST_MESSAGE_TO_CLIENT) == 0)    //收到广播消息
  {
    if (serRet == 0)
    {
      StContext *pStContext = NULL;
      if (pThreadCtx->m_contextType == RMB_CONTEXT_TYPE_PUB)
      {
        pStContext = (StContext *) (pContextProxy->pubContext);
      }
      else
      {
        pStContext = (StContext *) (pContextProxy->subContext);
      }

      if (pStContext == NULL)
      {
        LOGRMB (RMB_LOG_ERROR, "[TID:%lu] pStContext is null",
                pThreadCtx->m_threadID);
        return -1;
      }

      int iRet = 0;

      if ((iRet =
           trans_json_2_rmb_msg (pStContext->pReceiveWemqMsgForBroadCast,
                                 pWemqHeader->cStrJsonBody,
                                 BROADCAST_MESSAGE_TO_CLIENT)) != 0)
      {
        LOGRMB (RMB_LOG_ERROR, "trans_json_2_rmb_msg failed,buf is:%s",
                pWemqHeader->cStrJsonBody);
        return -1;
      }

      pStContext->pReceiveWemqMsgForBroadCast->cPkgType = BROADCAST_TOPIC_PKG;
      pStContext->pReceiveWemqMsgForBroadCast->iMsgMode = RMB_MSG_FROM_WEMQ;
      pStContext->uiWemqPkgLen = MAX_LENTH_IN_A_MSG;

      set_extfields_2_rmb_msg (pStContext->pReceiveWemqMsgForBroadCast,
                               BROADCAST_MESSAGE_TO_CLIENT, seq);

      iRet =
        shift_msg_2_buf (pStContext->pWemqPkg, &pStContext->uiWemqPkgLen,
                         pStContext->pReceiveWemqMsgForBroadCast);
      if (iRet < 0)
      {
        LOGRMB (RMB_LOG_ERROR,
                "shift_msg_2_buf error!iRet=%d, %d, %s/%s/%s,unique_id=%s,mode=%d,receive=%s",
                iRet,
                pStContext->pReceiveWemqMsgForBroadCast->iEventOrService,
                pStContext->pReceiveWemqMsgForBroadCast->strTargetDcn,
                pStContext->pReceiveWemqMsgForBroadCast->strServiceId,
                pStContext->pReceiveWemqMsgForBroadCast->strScenarioId,
                pStContext->pReceiveWemqMsgForBroadCast->sysHeader.cUniqueId,
                pStContext->pReceiveWemqMsgForBroadCast->iMsgMode,
                rmb_msg_print (pStContext->pReceiveWemqMsgForBroadCast));
        _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                     "shift_msg_2_buf error",
                                     pStContext->pReceiveWemqMsgForBroadCast);

        return -1;
      }
      //染色消息直接返回
      if (check_dyed_msg (pStContext->pReceiveWemqMsgForBroadCast) > 0)
      {
        _wemq_thread_dyed_msg_ack_to_access (pThreadCtx, seq, 0,
                                             BROADCAST_MESSAGE_TO_CLIENT_ACK,
                                             pStContext->pReceiveWemqMsg);
        LOGRMB (RMB_LOG_DEBUG, "get dyed msg:%s", pWemqHeader->cStrJsonBody);
        return serRet;
      }

      int iMqIndex = broadcast_mq_index;
      LOGRMB (RMB_LOG_DEBUG,
              "WEMQ_CMD_BROADCAST_SUB:Recv broadcast message ready to enqueue");
      if (pRmbStConfig->iFlagForBroadCast == (int) MSG_IPC_MQ)
      {
        if (pStContext->uiWemqPkgLen >=
            pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize -
            C_RMB_MQ_PKG_HEAD_SIZE)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[Type:%d] [TID:%lu] receive broadcast message reqSize=%u bigger than shmSize=%u,discard",
                  pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                  pStContext->uiWemqPkgLen,
                  pStContext->fifoMq.mqIndex[iMqIndex]->mq->uiBlockSize);
          _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                       "receive broadcast message reqSize bigger than shmSize",
                                       pStContext->
                                       pReceiveWemqMsgForBroadCast);
        }
        else
        {
          while ((iRet =
                  rmb_context_enqueue (pStContext,
                                       (const enum RmbMqIndex) iMqIndex,
                                       pStContext->pWemqPkg,
                                       pStContext->uiWemqPkgLen)) == -2)
          {
            //queue full
            LOGRMB (RMB_LOG_ERROR, "[Type:%d] [TID:%lu] req queue full!wait!",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID);
            usleep (1000);
          }
          if (iRet != 0)
          {
            LOGRMB (RMB_LOG_ERROR,
                    "[Type:%d] [TID:%lu] req rmb_context_enqueue error!iRet=%d!receive=%s",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                    pWemqHeader->cStrJsonBody);
            _wemq_thread_send_error_log (pThreadCtx, seq, -1, LOG_ERROR_POINT,
                                         "req rmb_context_enqueue error",
                                         pStContext->
                                         pReceiveWemqMsgForBroadCast);
          }
          else
          {
            LOGRMB (RMB_LOG_DEBUG,
                    "[Type:%d] [TID:%lu] enqueue succ, msg: header  %s  body %s",
                    pThreadCtx->m_contextType, pThreadCtx->m_threadID,
                    pWemqHeader->cStrJsonHeader, pWemqHeader->cStrJsonBody);
          }
        }
      }
      else
      {
        iRet =
          sendto (pStContext->iSocketForBroadcast, pStContext->pWemqPkg,
                  pStContext->uiWemqPkgLen, 0,
                  (const struct sockaddr *) &pStContext->tmpBroadcastAddr,
                  sizeof (pStContext->tmpBroadcastAddr));
        if (iRet < 0)
        {
          LOGRMB (RMB_LOG_ERROR,
                  "[Type:%d] [TID:%lu] udp error!sendto failed=%d!receive message:%s",
                  pThreadCtx->m_contextType, pThreadCtx->m_threadID, iRet,
                  pWemqHeader->cStrJsonBody);
        }
      }
    }
  }
  else if (strcmp (usCmd, SUBSCRIBE_RESPONSE) == 0)     // add subscribe response
  {
    if (serRet != 0)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] add listen return failed, iRet=%d, errmsg=%s",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, serRet, cMsg);
    }

    pthread_mutex_lock (&pContextProxy->regMutex);
    if (pContextProxy->iFlagForReg == 0)
    {
      pContextProxy->iFlagForReg = 1;
      pContextProxy->iResultForReg = serRet;
      pthread_cond_signal (&pContextProxy->regCond);
    }
    pthread_mutex_unlock (&pContextProxy->regMutex);
  }
  else if (strcmp (usCmd, LISTEN_RESPONSE) == 0)        // add listen start response
  {

    if (serRet == RMB_CODE_OTHER_FAIL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] send start to access failed,iRet=%d, errmsg=%s",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, serRet, cMsg);
    }

    if (serRet == RMB_CODE_AUT_FAIL)
    {
      LOGRMB (RMB_LOG_ERROR,
              "[%s] [Type:%d] [TID:%lu] send start to access authentication failed,iRet=%d, errmsg=%s",
              STATE_MAP[pThreadCtx->m_iState], pThreadCtx->m_contextType,
              pThreadCtx->m_threadID, serRet, cMsg);
    }

    pthread_mutex_lock (&pContextProxy->regMutex);
    if (pContextProxy->iFlagForReg == 0)
    {
      pContextProxy->iFlagForReg = 1;
      pContextProxy->iResultForReg = serRet;
      pthread_cond_signal (&pContextProxy->regCond);
    }
    pthread_mutex_unlock (&pContextProxy->regMutex);

  }

  else
  {
    LOGRMB (RMB_LOG_ERROR, "[%s] [Type:%d] [TID:%lu] No Such Command:%s!",
            STATE_MAP[pThreadCtx->m_iState],
            pThreadCtx->m_contextType, pThreadCtx->m_threadID, usCmd);
    ret = -1;
  }

  json_object_put (jsonHeader);

  return ret;
}