in cppcache/src/ThinClientDistributionManager.cpp [116:212]
/**
 * Sends a request synchronously and, when permitted, fails over to another
 * endpoint if the first attempt does not succeed.
 *
 * Flow:
 *   1. Attach this distribution manager to both the request and the reply.
 *   2. For GET_ALL_70 / GET_ALL_WITH_CALLBACK, initialize the getall message
 *      with the request's callback argument.
 *   3. If there is an active endpoint (index >= 0) that reports connected(),
 *      send on it.
 *   4. If failover is disabled or the send succeeded, return that result.
 *   5. Otherwise, unless the error is fatal, repeatedly select an endpoint
 *      (under m_endpointsLock) and resend until the send succeeds, the error
 *      becomes fatal, or endpoint selection fails.
 *
 * @param request         message to send.
 * @param reply           object that receives the reply.
 * @param attemptFailover when false, the result of the first attempt (or
 *                        GF_NOTCON if no attempt was made) is returned as is.
 * @param (unnamed bool)  not used by this implementation.
 * @return GF_NOERR on success; otherwise the last error seen, with GF_IOERR
 *         reported as GF_NOTCON after the failover phase.
 */
GfErrType ThinClientDistributionManager::sendSyncRequest(TcrMessage& request,
                                                         TcrMessageReply& reply,
                                                         bool attemptFailover,
                                                         bool) {
  request.setDM(this);
  reply.setDM(this);

  const auto initialType = request.getMessageType();
  if (initialType == TcrMessage::GET_ALL_70 ||
      initialType == TcrMessage::GET_ALL_WITH_CALLBACK) {
    // now initialize getall msg
    request.InitializeGetallMsg(request.getCallbackArgument());
  }

  // Stays GF_NOTCON when there is no connected active endpoint to try.
  GfErrType error = GF_NOTCON;
  bool useActiveEndpoint = true;

  const int activeIndex = m_activeEndpoint;
  if (activeIndex >= 0 && m_endpoints[activeIndex]->connected()) {
    LOGDEBUG(
        "ThinClientDistributionManager::sendSyncRequest: trying to send on "
        "endpoint: %s",
        m_endpoints[activeIndex]->name().c_str());
    error = sendRequestToEP(request, reply, m_endpoints[activeIndex]);
    // The active endpoint has been tried; do not ask for it again below.
    useActiveEndpoint = false;
    LOGDEBUG(
        "ThinClientDistributionManager::sendSyncRequest: completed send on "
        "endpoint: %s [error:%d]",
        m_endpoints[activeIndex]->name().c_str(), error);
  }

  if (error == GF_NOERR || !attemptFailover) {
    return error;
  }

  // Message types that are not retried on another endpoint after a timeout.
  const int32_t type = request.getMessageType();
  const bool noRetryOnTimeout =
      type == TcrMessage::QUERY ||
      type == TcrMessage::QUERY_WITH_PARAMETERS ||
      type == TcrMessage::PUTALL ||
      type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
      type == TcrMessage::EXECUTE_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
      type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE;

  // we need to forceSelect because endpoint connection status
  // is not set to false (in tcrendpoint::send) for a query or putall timeout
  bool forceSelect = noRetryOnTimeout && error == GF_TIMEOUT;

  bool doRand = true;
  std::vector<int> randIndex;

  if (isFatalError(error)) {
    return error;
  }

  std::lock_guard<decltype(m_endpointsLock)> guard(m_endpointsLock);
  GfErrType connErr = GF_NOERR;
  while (error != GF_NOERR && !isFatalError(error)) {
    connErr =
        selectEndpoint(randIndex, doRand, useActiveEndpoint, forceSelect);
    if (connErr != GF_NOERR) {
      break;
    }

    // if it's a query or putall and we had a timeout, just return with the
    // newly selected endpoint without failover-retry
    if (noRetryOnTimeout && error == GF_TIMEOUT) {
      return error;
    }

    const int failoverIndex = m_activeEndpoint;
    LOGFINEST(
        "ThinClientDistributionManager::sendSyncRequest: trying send on new "
        "endpoint %s",
        m_endpoints[failoverIndex]->name().c_str());
    error = sendRequestToEP(request, reply, m_endpoints[failoverIndex]);
    if (error == GF_NOERR) {
      LOGFINEST(
          "ThinClientDistributionManager::sendSyncRequest: completed send on "
          "new endpoint: %s",
          m_endpoints[failoverIndex]->name().c_str());
    } else {
      LOGFINE(
          "ThinClientDistributionManager::sendSyncRequest: failed send on "
          "new endpoint %s for message "
          "type %d [error:%d]",
          m_endpoints[failoverIndex]->name().c_str(),
          request.getMessageType(), error);
    }
    useActiveEndpoint = false;
  }

  // Top-level only sees NotConnectedException or TimeoutException
  // NOTE(review): the (GF_NOERR && connErr != GF_NOERR) arm looks unreachable,
  // since error only becomes GF_NOERR after a successful select and the loop
  // then exits without reassigning connErr -- confirm the intended condition.
  if (error == GF_IOERR || (error == GF_NOERR && connErr != GF_NOERR)) {
    error = GF_NOTCON;
  }
  return error;
}