in cppcache/src/TcrEndpoint.cpp [550:694]
/**
 * Receive loop for this endpoint's subscription (notification) channel.
 *
 * While @p isRunning reads true, the loop reads one message at a time from
 * m_notifyConnection (5 second timeout per read) and dispatches it:
 *  - messages for which TcrMessage::shouldIgnore() is true are skipped;
 *  - non-CQ, non-marker events for a region whose distribution manager does
 *    not have this endpoint attached are dropped before duplicate checking;
 *  - messages whose event id is rejected by checkDupAndAdd() are counted in
 *    m_dupCount and dropped;
 *  - CLIENT_MARKER messages are passed to processMarker() on both the cache
 *    and this endpoint;
 *  - remaining non-CQ events go to ThinClientRegion::receiveNotification();
 *  - CQ events go to RemoteQueryService::receiveNotification().
 *
 * The loop exits when @p isRunning reads false, when the connection reports
 * CONN_IOERR, or when a GeodeIOException is caught. TimeoutException is
 * treated as "no data" and the loop continues; any other exception is logged
 * and the loop continues.
 *
 * @param isRunning flag polled by the loop; the loop stops once it is false.
 */
void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
  LOGFINE("Started subscription channel for endpoint %s", m_name.c_str());
  while (isRunning) {
    try {
      // Initialised so the value is well-defined even if receive() returns
      // without writing to it.
      size_t dataLen = 0;
      ConnErrType opErr = CONN_NOERR;
      auto data = m_notifyConnection->receive(&dataLen, &opErr,
                                              std::chrono::seconds(5));
      if (opErr == CONN_IOERR) {
        // Endpoint is disconnected, this exception is expected
        LOGFINER(
            "IO exception while receiving subscription event for endpoint "
            "%s: error code %d",
            m_name.c_str(), static_cast<int>(opErr));
        if (isRunning) {
          setConnectionStatus(false);
          // close notification channel
          std::lock_guard<decltype(m_notifyReceiverLock)> guard(
              m_notifyReceiverLock);
          if (m_numRegionListener > 0) {
            m_numRegionListener = 0;
            closeNotification();
          }
        }
        break;
      }

      if (data) {
        TcrMessageReply msg(true, m_baseDM);
        msg.initCqMap();
        msg.setData(data, static_cast<int32_t>(dataLen),
                    getDistributedMemberID(),
                    *(m_cacheImpl->getSerializationRegistry()),
                    *(m_cacheImpl->getMemberListForVersionStamp()));
        handleNotificationStats(static_cast<int64_t>(dataLen));

        // Read once; every later check in this iteration uses this value.
        const auto messageType = msg.getMessageType();
        LOGDEBUG("receive notification %d", messageType);

        // Re-check the flag after the (possibly long) receive so a message
        // is not dispatched once the loop has been asked to stop.
        if (!isRunning) {
          break;
        }

        if (messageType == TcrMessage::SERVER_TO_CLIENT_PING) {
          LOGFINE("Received ping from server subscription channel.");
        }

        // ignore some message types like REGISTER_INSTANTIATORS
        if (msg.shouldIgnore()) {
          continue;
        }

        const bool isMarker = (messageType == TcrMessage::CLIENT_MARKER);

        if (!isMarker && !msg.hasCqPart()) {
          const std::string& regionFullPath1 = msg.getRegionName();
          auto region1 = m_cacheImpl->getRegion(regionFullPath1);
          if (region1 != nullptr &&
              !static_cast<ThinClientRegion*>(region1.get())
                   ->getDistMgr()
                   ->isEndpointAttached(this)) {
            // drop event before even processing the eventid for duplicate
            // checking
            LOGFINER("Endpoint %s dropping event for region %s",
                     m_name.c_str(), regionFullPath1.c_str());
            continue;
          }
        }

        if (!checkDupAndAdd(msg.getEventId())) {
          m_dupCount++;
          // Only log the 1st, 101st, 201st, ... duplicate to limit log noise.
          if (m_dupCount % 100 == 1) {
            LOGFINE("Dropped %dst duplicate notification message", m_dupCount);
          }
          continue;
        }

        if (isMarker) {
          LOGFINE("Got a marker message on endpoint %s", m_name.c_str());
          m_cacheImpl->processMarker();
          processMarker();
        } else if (!msg.hasCqPart()) {
          // The region is looked up again here (rather than reusing the
          // earlier lookup) so dispatch sees the cache state after the
          // duplicate check.
          const std::string& regionFullPath = msg.getRegionName();
          auto region = m_cacheImpl->getRegion(regionFullPath);
          if (region != nullptr) {
            static_cast<ThinClientRegion*>(region.get())
                ->receiveNotification(msg);
          } else {
            LOGWARN(
                "Notification for region %s that does not exist in "
                "client cacheImpl.",
                regionFullPath.c_str());
          }
        } else {
          LOGDEBUG("receive cq notification %d", messageType);
          auto queryService = getQueryService();
          if (queryService != nullptr) {
            static_cast<RemoteQueryService*>(queryService.get())
                ->receiveNotification(msg);
          }
        }
      }
    } catch (const TimeoutException&) {
      // If there is no notification, this exception is expected
      // But this is valid only when *no* data has been received
      // otherwise if data has been read then TcrConnection will throw
      // a GeodeIOException which will cause the channel to close.
      LOGDEBUG(
          "receiveNotification timed out: no data received from "
          "endpoint %s",
          m_name.c_str());
    } catch (const GeodeIOException& e) {
      // Endpoint is disconnected, this exception is expected
      LOGFINER(
          "IO exception while receiving subscription event for endpoint %s: %s",
          m_name.c_str(), e.what());
      // NOTE(review): this path gates cleanup on connected_, whereas the
      // CONN_IOERR path above gates it on isRunning - confirm the
      // difference is intentional.
      if (connected_) {
        setConnectionStatus(false);
        // close notification channel
        std::lock_guard<decltype(m_notifyReceiverLock)> guard(
            m_notifyReceiverLock);
        if (m_numRegionListener > 0) {
          m_numRegionListener = 0;
          closeNotification();
        }
      }
      break;
    } catch (const Exception& ex) {
      // Logged and swallowed: the loop keeps servicing the channel.
      LOGERROR(
          "Exception while receiving subscription event for endpoint %s:: %s: "
          "%s",
          m_name.c_str(), ex.getName().c_str(), ex.what());
    } catch (...) {
      // Logged and swallowed: the loop keeps servicing the channel.
      LOGERROR(
          "Unexpected exception while "
          "receiving subscription event from endpoint %s",
          m_name.c_str());
    }
  }
  LOGFINE("Ended subscription channel for endpoint %s", m_name.c_str());
}