void TcrEndpoint::receiveNotification()

in cppcache/src/TcrEndpoint.cpp [550:694]


/**
 * Receive loop for this endpoint's subscription (notification) channel.
 *
 * While @p isRunning reads true, each iteration waits up to five seconds for
 * a message on m_notifyConnection and then:
 *  - on CONN_IOERR, logs the failure, marks the endpoint disconnected and
 *    closes the notification channel (only if still running), and leaves the
 *    loop;
 *  - otherwise decodes the payload into a TcrMessageReply, updates the
 *    notification stats, skips ignorable messages, events for regions this
 *    endpoint is not attached to, and duplicate events, and finally hands
 *    the message to the marker handlers, the target region, or the query
 *    service (for messages carrying a CQ part).
 *
 * Exception handling inside the loop:
 *  - TimeoutException: logged at debug level, loop continues;
 *  - GeodeIOException: logged; if connected_ is set the endpoint is marked
 *    disconnected and the notification channel is closed; the loop exits;
 *  - any other exception: logged as an error, loop continues.
 *
 * @param isRunning flag polled by the loop; the loop ends once it reads
 *                  false.
 */
void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
  LOGFINE("Started subscription channel for endpoint %s", m_name.c_str());

  // Shared teardown for both IO-failure paths below: flag the endpoint as
  // disconnected, then, under m_notifyReceiverLock, reset the listener count
  // and close the notification channel if any listener was registered.
  const auto markDisconnectedAndCloseChannel = [this]() {
    setConnectionStatus(false);
    // close notification channel
    std::lock_guard<decltype(m_notifyReceiverLock)> guard(
        m_notifyReceiverLock);
    if (m_numRegionListener > 0) {
      m_numRegionListener = 0;
      closeNotification();
    }
  };

  while (isRunning) {
    try {
      size_t dataLen = 0;
      ConnErrType opErr = CONN_NOERR;
      auto data = m_notifyConnection->receive(&dataLen, &opErr,
                                              std::chrono::seconds(5));

      if (opErr == CONN_IOERR) {
        // Endpoint is disconnected, this error is expected
        LOGFINER(
            "IO exception while receiving subscription event for endpoint "
            "%s: error %d",
            m_name.c_str(), static_cast<int>(opErr));
        if (isRunning) {
          markDisconnectedAndCloseChannel();
        }
        break;
      }

      if (!data) {
        // Nothing was received in this round; poll again.
        continue;
      }

      TcrMessageReply msg(true, m_baseDM);
      msg.initCqMap();
      msg.setData(data, static_cast<int32_t>(dataLen),
                  getDistributedMemberID(),
                  *(m_cacheImpl->getSerializationRegistry()),
                  *(m_cacheImpl->getMemberListForVersionStamp()));
      handleNotificationStats(static_cast<int64_t>(dataLen));

      const auto messageType = msg.getMessageType();
      LOGDEBUG("receive notification %d", messageType);

      // The channel may have been asked to stop while the message was being
      // received and decoded; do not dispatch it in that case.
      if (!isRunning) {
        break;
      }

      if (messageType == TcrMessage::SERVER_TO_CLIENT_PING) {
        LOGFINE("Received ping from server subscription channel.");
      }

      // ignore some message types like REGISTER_INSTANTIATORS
      if (msg.shouldIgnore()) {
        continue;
      }

      const bool isMarker = (messageType == TcrMessage::CLIENT_MARKER);
      if (!msg.hasCqPart() && !isMarker) {
        const std::string& regionFullPath1 = msg.getRegionName();
        auto region1 = m_cacheImpl->getRegion(regionFullPath1);

        if (region1 != nullptr &&
            !static_cast<ThinClientRegion*>(region1.get())
                 ->getDistMgr()
                 ->isEndpointAttached(this)) {
          // drop event before even processing the eventid for duplicate
          // checking
          LOGFINER("Endpoint %s dropping event for region %s", m_name.c_str(),
                   regionFullPath1.c_str());
          continue;
        }
      }

      if (!checkDupAndAdd(msg.getEventId())) {
        m_dupCount++;
        // Only every 100th duplicate (1st, 101st, ...) is reported.
        if (m_dupCount % 100 == 1) {
          LOGFINE("Dropped %dst duplicate notification message", m_dupCount);
        }
        continue;
      }

      if (isMarker) {
        LOGFINE("Got a marker message on endpont %s", m_name.c_str());
        m_cacheImpl->processMarker();
        processMarker();
      } else if (!msg.hasCqPart()) {
        // Regular region event: deliver it to the region named in the
        // message, if that region exists in the client cache.
        const std::string& regionFullPath = msg.getRegionName();
        auto region = m_cacheImpl->getRegion(regionFullPath);

        if (region != nullptr) {
          static_cast<ThinClientRegion*>(region.get())
              ->receiveNotification(msg);
        } else {
          LOGWARN(
              "Notification for region %s that does not exist in "
              "client cacheImpl.",
              regionFullPath.c_str());
        }
      } else {
        // Message carries a CQ part: route it to the query service.
        LOGDEBUG("receive cq notification %d", messageType);
        auto queryService = getQueryService();
        if (queryService != nullptr) {
          static_cast<RemoteQueryService*>(queryService.get())
              ->receiveNotification(msg);
        }
      }
    } catch (const TimeoutException&) {
      // If there is no notification, this exception is expected
      // But this is valid only when *no* data has been received
      // otherwise if data has been read then TcrConnection will throw
      // a GeodeIOException which will cause the channel to close.
      LOGDEBUG(
          "receiveNotification timed out: no data received from "
          "endpoint %s",
          m_name.c_str());
    } catch (const GeodeIOException& e) {
      // Endpoint is disconnected, this exception is expected
      LOGFINER(
          "IO exception while receiving subscription event for endpoint %s: %s",
          m_name.c_str(), e.what());
      if (connected_) {
        markDisconnectedAndCloseChannel();
      }
      break;
    } catch (const Exception& ex) {
      LOGERROR(
          "Exception while receiving subscription event for endpoint %s:: %s: "
          "%s",
          m_name.c_str(), ex.getName().c_str(), ex.what());
    } catch (...) {
      LOGERROR(
          "Unexpected exception while "
          "receiving subscription event from endpoint %s",
          m_name.c_str());
    }
  }
  LOGFINE("Ended subscription channel for endpoint %s", m_name.c_str());
}