in cppcache/src/ThinClientPoolDM.cpp [1265:1553]
/**
 * Sends `request` over a pooled connection, waits for `reply`, and retries on
 * another connection when the attempt fails and failover is allowed.
 *
 * The loop runs for `getRetryAttempts() + 1` attempts. When the configured
 * retry count is -1 it instead keeps going until an attempt adds no new
 * server to the exclusion set. A reply carrying an "authentication required"
 * exception marks the endpoint/user as unauthenticated and triggers a bounded
 * number of extra attempts.
 *
 * @param request         message to send; its retry header is updated on
 *                        attempts that follow a failed one.
 * @param reply           receives the server response.
 * @param attemptFailover when false, the result of the first attempt is
 *                        returned as is.
 * @param isBGThread      forwarded to the connection-queue, sticky-connection
 *                        and credential helpers.
 * @param serverLocation  forwarded to getConnectionFromQueueW.
 * @return GF_NOERR on success, otherwise the error of the last attempt.
 *         GF_IOERR is reported as GF_NOTCON on the normal exit paths.
 *         GF_ALL_CONNECTIONS_IN_USE_EXCEPTION is returned when no connection
 *         could be obtained without an explicit queue error.
 */
GfErrType ThinClientPoolDM::sendSyncRequest(
    TcrMessage& request, TcrMessageReply& reply, bool attemptFailover,
    bool isBGThread,
    const std::shared_ptr<BucketServerLocation>& serverLocation) {
  LOGDEBUG("ThinClientPoolDM::sendSyncRequest: ....%d %s",
           request.getMessageType(), m_poolName.c_str());
  // Increment clientOps
  getStats().setCurClientOps(++m_clientOps);

  // Balances the increment above and records the outcome. Every return in
  // this function goes through it so the current-ops gauge cannot drift when
  // the request is abandoned early.
  const auto finishOp = [this](GfErrType outcome) {
    getStats().setCurClientOps(--m_clientOps);
    if (outcome == GF_NOERR) {
      getStats().incSucceedClientOps();
    } else if (outcome == GF_TIMEOUT) {
      getStats().incTimeoutClientOps();
    } else {
      getStats().incFailedClientOps();
    }
  };

  GfErrType error = GF_NOTCON;
  std::shared_ptr<UserAttributes> userAttr = nullptr;
  reply.setDM(this);

  const int32_t type = request.getMessageType();
  // Query, putAll, function-execution and executeCQ requests keep their own
  // timeout and are not failed over after a timeout.
  const bool keepsOwnTimeout =
      type == TcrMessage::QUERY ||
      type == TcrMessage::QUERY_WITH_PARAMETERS ||
      type == TcrMessage::PUTALL ||
      type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
      type == TcrMessage::EXECUTE_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
      type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE;
  if (!keepsOwnTimeout) {
    // set only when message is not query, putall and executeCQ
    reply.setTimeout(getReadTimeout());
    request.setTimeout(getReadTimeout());
  }

  // Value passed to putInQueue / setStickyNull alongside the connection.
  // Evaluated at the point of use, exactly like the inline expression was.
  const auto isBGThreadOrGetAllOrSingleHopFn = [&request, isBGThread]() {
    return isBGThread ||
           request.getMessageType() == TcrMessage::GET_ALL_70 ||
           request.getMessageType() == TcrMessage::GET_ALL_WITH_CALLBACK ||
           request.getMessageType() ==
               TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP;
  };

  const bool retryAllEPsOnce = (m_attrs->getRetryAttempts() == -1);
  auto retry = m_attrs->getRetryAttempts() + 1;
  TcrConnection* conn = nullptr;
  std::set<ServerLocation> excludeServers;
  bool isAuthRequireExcep = false;
  int isAuthRequireExcepMaxTry = 2;
  bool firstTry = true;
  LOGFINE("sendSyncRequest:: retry = %d", retry);

  // `retry-- > 0` (rather than plain truthiness) keeps an exhausted counter
  // exhausted once it has gone negative. `retryAllEPsOnce` only admits an
  // iteration that is not an authentication retry, so those retries are always
  // limited by isAuthRequireExcepMaxTry (or by the remaining retry budget).
  while ((retryAllEPsOnce && !isAuthRequireExcep) || retry-- > 0 ||
         (isAuthRequireExcep && isAuthRequireExcepMaxTry >= 0)) {
    isAuthRequireExcep = false;
    if (!firstTry) request.updateHeaderForRetry();

    // if it's a query or putall and we had a timeout, just return with the
    // newly selected endpoint without failover-retry
    if (keepsOwnTimeout && error == GF_TIMEOUT) {
      finishOp(error);
      return error;
    }

    GfErrType queueErr = GF_NOERR;
    const auto lastExcludeSize = static_cast<uint32_t>(excludeServers.size());
    int8_t version = 0;
    bool isUserNeedToReAuthenticate = false;
    bool singleHopConnFound = false;
    bool connFound = false;

    // In multi-user mode a user-initiated operation needs the calling
    // thread's credentials before a connection is even requested.
    const bool needsUserCredentials =
        m_isMultiUserMode && TcrMessage::isUserInitiativeOps(request);
    if (needsUserCredentials) {
      userAttr = UserAttributes::threadLocalUserAttributes;
      if (userAttr == nullptr) {
        LOGWARN("Attempted operation type %d without credentials",
                request.getMessageType());
        finishOp(GF_NOT_AUTHORIZED_EXCEPTION);
        return GF_NOT_AUTHORIZED_EXCEPTION;
      }
    }

    conn = getConnectionFromQueueW(&queueErr, excludeServers, isBGThread,
                                   request, version, singleHopConnFound,
                                   connFound, serverLocation);
    if (needsUserCredentials && conn != nullptr) {
      // need to chk whether user is already authenticated to this endpoint
      // or not.
      isUserNeedToReAuthenticate =
          !(userAttr->isEndpointAuthenticated(conn->getEndpointObject()));
    }

    if (queueErr == GF_CLIENT_WAIT_TIMEOUT) {
      LOGFINE("Request timeout at client only");
      finishOp(GF_CLIENT_WAIT_TIMEOUT);
      return GF_CLIENT_WAIT_TIMEOUT;
    }
    if (queueErr == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
      // need to refresh meta data
      auto region =
          m_connManager.getCacheImpl()->getRegion(request.getRegionName());
      if (region != nullptr) {
        LOGFINE(
            "Need to refresh pr-meta-data timeout in client only with "
            "refresh "
            "metadata");
        // The region may not be a ThinClientRegion; only flag it when the
        // cast succeeds.
        auto* tcrRegion = dynamic_cast<ThinClientRegion*>(region.get());
        if (tcrRegion != nullptr) {
          tcrRegion->setMetaDataRefreshed(false);
        }
        // Same guard as the refresh after a successful reply further down.
        if (m_clientMetadataService) {
          m_clientMetadataService->enqueueForMetadataRefresh(
              region->getFullPath(), reply.getserverGroupVersion());
        }
      }
      finishOp(GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA);
      return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
    }

    LOGDEBUG(
        "ThinClientPoolDM::sendSyncRequest: isUserNeedToReAuthenticate = %d ",
        isUserNeedToReAuthenticate);
    LOGDEBUG(
        "ThinClientPoolDM::sendSyncRequest: m_isMultiUserMode = %d conn = "
        "%p "
        "type = %d",
        m_isMultiUserMode, conn, type);

    if (!conn) {
      // lets assume all connection are in use will happen
      if (queueErr == GF_NOERR) {
        finishOp(GF_ALL_CONNECTIONS_IN_USE_EXCEPTION);
        return GF_ALL_CONNECTIONS_IN_USE_EXCEPTION;
      }
      error = (queueErr == GF_IOERR) ? GF_NOTCON : queueErr;
    } else {
      TcrEndpoint* ep = conn->getEndpointObject();
      LOGDEBUG(
          "ThinClientPoolDM::sendSyncRequest: sendSyncReq "
          "ep->isAuthenticated() = %d ",
          ep->isAuthenticated());

      GfErrType userCredMsgErr = GF_NOERR;
      bool isServerException = false;
      if (TcrMessage::isUserInitiativeOps(request) &&
          (m_isSecurityOn || m_isMultiUserMode)) {
        if (!m_isMultiUserMode && !ep->isAuthenticated()) {
          // first authenticate him on this endpoint
          userCredMsgErr = sendUserCredentials(getCredentials(ep), conn,
                                               isBGThread, isServerException);
        } else if (isUserNeedToReAuthenticate) {
          userCredMsgErr = sendUserCredentials(userAttr->getCredentials(),
                                               conn, isBGThread,
                                               isServerException);
        }
      }

      if (userCredMsgErr == GF_NOERR) {
        error = ep->sendRequestConnWithRetry(request, reply, conn);
        error = handleEPError(ep, reply, error);
      } else {
        error = userCredMsgErr;
      }

      if (isServerException) {
        // server exception while sending credential message to server...
        finishOp(error);
        return error;
      }

      if (error == GF_NOERR) {
        LOGDEBUG("putting connection back in queue");
        putInQueue(conn, isBGThreadOrGetAllOrSingleHopFn(),
                   request.forTransaction());  // connFound is only relevant
                                               // for Sticky conn.
        LOGDEBUG("putting connection back in queue DONE");
      } else {
        if (error != GF_TIMEOUT) removeEPConnections(ep);
        // Update stats for the connection that failed.
        removeEPConnections(1, false);
        setStickyNull(isBGThreadOrGetAllOrSingleHopFn());
        // NOTE(review): conn is re-checked because the send helpers above
        // may take it by reference and reset it - confirm against their
        // signatures before removing this guard.
        if (conn) {
          try {
            GF_SAFE_DELETE_CON(conn);
          } catch (...) {
            // Best effort: a failure while discarding a broken connection
            // must not mask the original error.
          }
        }
        excludeServers.insert(ServerLocation(ep->name()));
        removeEPFromMetadataIfError(error, ep);
      }
    }

    if (error == GF_NOERR) {
      if ((m_isSecurityOn || m_isMultiUserMode) &&
          reply.getMessageType() == TcrMessage::EXCEPTION) {
        if (isAuthRequireException(reply.getException())) {
          TcrEndpoint* ep = conn->getEndpointObject();
          if (!m_isMultiUserMode) {
            ep->setAuthenticated(false);
          } else if (userAttr != nullptr) {
            userAttr->unAuthenticateEP(ep);
          }
          LOGFINEST(
              "After getting AuthenticationRequiredException trying "
              "again.");
          isAuthRequireExcepMaxTry--;
          isAuthRequireExcep = true;
          // Re-evaluates the loop condition, which enforces the auth-retry
          // budget.
          continue;
        } else if (isNotAuthorizedException(reply.getException())) {
          LOGDEBUG("received NotAuthorizedException");
          // TODO should we try again?
        }
      }

      LOGFINER(
          "reply Metadata version is %d & bsl version is %d "
          "reply.isFEAnotherHop()=%d",
          reply.getMetaDataVersion(), version, reply.isFEAnotherHop());
      if (m_clientMetadataService && request.forSingleHop() &&
          (reply.getMetaDataVersion() != 0 ||
           (request.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION &&
            request.getKeyRef() != nullptr && reply.isFEAnotherHop()))) {
        // Need to get direct access to Region's name to avoid referencing
        // temp data and causing crashes
        auto region =
            m_connManager.getCacheImpl()->getRegion(request.getRegionName());
        if (region != nullptr) {
          // max limit case then don't refresh otherwise always refresh
          if (!connFound) {
            LOGFINE("Need to refresh pr-meta-data");
            auto* tcrRegion = dynamic_cast<ThinClientRegion*>(region.get());
            if (tcrRegion != nullptr) {
              tcrRegion->setMetaDataRefreshed(false);
            }
          }
          m_clientMetadataService->enqueueForMetadataRefresh(
              region->getFullPath(), reply.getserverGroupVersion());
        }
      }
    }

    // No new server was excluded during this attempt: start over with a
    // clean exclusion set, and stop if every endpoint only gets one try.
    if (excludeServers.size() == lastExcludeSize) {
      excludeServers.clear();
      if (retryAllEPsOnce) {
        break;
      }
    }

    if (!attemptFailover || error == GF_NOERR) {
      finishOp(error);
      // Top-level only sees NotConnectedException
      if (error == GF_IOERR) {
        error = GF_NOTCON;
      }
      return error;
    }

    conn = nullptr;
    firstTry = false;
  }  // While

  finishOp(error);
  // Top-level only sees NotConnectedException
  if (error == GF_IOERR) {
    error = GF_NOTCON;
  }
  return error;
}