in cppcache/src/ThinClientRegion.cpp [3025:3137]
bool ThinClientRegion::executeFunctionSH(
const std::string& func, const std::shared_ptr<Cacheable>& args,
uint8_t getResult, std::shared_ptr<ResultCollector> rc,
const std::shared_ptr<ClientMetadataService::ServerToKeysMap>& locationMap,
std::shared_ptr<CacheableHashSet>& failedNodes,
std::chrono::milliseconds timeout, bool allBuckets) {
bool reExecute = false;
auto resultCollectorLock = std::make_shared<std::recursive_mutex>();
const auto& userAttr = UserAttributes::threadLocalUserAttributes;
std::vector<std::shared_ptr<OnRegionFunctionExecution>> feWorkers;
auto& threadPool =
CacheRegionHelper::getCacheImpl(&getCache())->getThreadPool();
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
const auto& routingObj = locationIter.second;
auto worker = std::make_shared<OnRegionFunctionExecution>(
func, this, args, routingObj, getResult, timeout,
dynamic_cast<ThinClientPoolDM*>(m_tcrdm.get()), resultCollectorLock, rc,
userAttr, false, serverLocation, allBuckets);
threadPool.perform(worker);
feWorkers.push_back(worker);
}
GfErrType abortError = GF_NOERR;
for (auto worker : feWorkers) {
auto err = worker->getResult();
auto currentReply = worker->getReply();
if (err == GF_NOERR &&
(currentReply->getMessageType() == TcrMessage::EXCEPTION ||
currentReply->getMessageType() ==
TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) {
err = ThinClientRegion::handleServerException(
"Execute", currentReply->getException());
}
if (err != GF_NOERR) {
if (err == GF_INTERNAL_FUNCTION_INVOCATION_TARGET_EXCEPTION) {
if (auto poolDM =
std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
if (poolDM->getClientMetaDataService()) {
poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
this->getFullPath(), 0);
}
}
if (!(getResult & 1) && abortError == GF_NOERR) { // isHA = false
abortError = err;
} else if (getResult & 1) { // isHA = true
reExecute = true;
worker->getResultCollector()->reset();
{
std::lock_guard<decltype(*resultCollectorLock)> guard(
*resultCollectorLock);
rc->clearResults();
}
std::shared_ptr<CacheableHashSet> failedNodeIds(
currentReply->getFailedNode());
if (failedNodeIds) {
LOGDEBUG(
"ThinClientRegion::executeFunctionSH with "
"GF_INTERNAL_FUNCTION_INVOCATION_TARGET_EXCEPTION "
"failedNodeIds size = %zu ",
failedNodeIds->size());
failedNodes->insert(failedNodeIds->begin(), failedNodeIds->end());
}
}
} else if ((err == GF_NOTCON) || (err == GF_CLIENT_WAIT_TIMEOUT) ||
(err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA)) {
LOGINFO(
"ThinClientRegion::executeFunctionSH with GF_NOTCON or "
"GF_CLIENT_WAIT_TIMEOUT ");
if (auto poolDM =
std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
if (poolDM->getClientMetaDataService()) {
poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
this->getFullPath(), 0);
}
}
if (!(getResult & 1) && abortError == GF_NOERR) { // isHA = false
abortError = err;
} else if (getResult & 1) { // isHA = true
reExecute = true;
worker->getResultCollector()->reset();
{
std::lock_guard<decltype(*resultCollectorLock)> guard(
*resultCollectorLock);
rc->clearResults();
}
}
} else {
if (ThinClientBaseDM::isFatalClientError(err)) {
LOGERROR("ThinClientRegion::executeFunctionSH: Fatal Exception");
} else {
LOGWARN("ThinClientRegion::executeFunctionSH: Unexpected Exception");
}
if (abortError == GF_NOERR) {
abortError = err;
}
}
}
}
if (abortError != GF_NOERR) {
throwExceptionIfError("ExecuteOnRegion:", abortError);
}
return reExecute;
}