bool ThinClientRegion::executeFunctionSH()

in cppcache/src/ThinClientRegion.cpp [3025:3137]


/**
 * Dispatches one OnRegionFunctionExecution worker per entry of @p locationMap
 * on the cache's thread pool, waits for every worker, and folds the
 * per-worker error codes into a single outcome.
 *
 * Bit 0 of @p getResult is treated as the "HA" flag (per the flag checks in
 * this function). For the retryable errors
 * (GF_INTERNAL_FUNCTION_INVOCATION_TARGET_EXCEPTION, GF_NOTCON,
 * GF_CLIENT_WAIT_TIMEOUT, GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) a
 * metadata refresh is enqueued for this region; then
 *  - with HA set, the worker's collector is reset, @p rc is cleared and the
 *    function reports that a re-execution is needed;
 *  - with HA clear, the first such error is remembered as the abort error.
 * Any other error is logged and the first one is remembered as the abort
 * error.
 *
 * @param func         name of the function handed to each worker.
 * @param args         function arguments handed to each worker.
 * @param getResult    flag byte handed to each worker; bit 0 selects HA
 *                     handling here.
 * @param rc           result collector shared by all workers; cleared when a
 *                     re-execution is requested.
 * @param locationMap  server location -> routing object map; one worker is
 *                     created per entry.
 * @param failedNodes  receives the failed node ids reported by a reply that
 *                     ended in an invocation-target exception (HA only).
 * @param timeout      timeout handed to each worker.
 * @param allBuckets   forwarded unchanged to each worker.
 * @return true when at least one worker hit a retryable error with HA set.
 *
 * The first remembered abort error, if any, is passed to
 * throwExceptionIfError("ExecuteOnRegion:", ...) after all workers have been
 * examined.
 */
bool ThinClientRegion::executeFunctionSH(
    const std::string& func, const std::shared_ptr<Cacheable>& args,
    uint8_t getResult, std::shared_ptr<ResultCollector> rc,
    const std::shared_ptr<ClientMetadataService::ServerToKeysMap>& locationMap,
    std::shared_ptr<CacheableHashSet>& failedNodes,
    std::chrono::milliseconds timeout, bool allBuckets) {
  const auto& userAttr = UserAttributes::threadLocalUserAttributes;
  auto collectorMutex = std::make_shared<std::recursive_mutex>();
  auto& pool = CacheRegionHelper::getCacheImpl(&getCache())->getThreadPool();
  auto* const poolForWorkers = dynamic_cast<ThinClientPoolDM*>(m_tcrdm.get());

  // Fan out: one worker per server location, all sharing `rc` and its lock.
  std::vector<std::shared_ptr<OnRegionFunctionExecution>> pending;
  for (const auto& entry : *locationMap) {
    auto task = std::make_shared<OnRegionFunctionExecution>(
        func, this, args, entry.second, getResult, timeout, poolForWorkers,
        collectorMutex, rc, userAttr, false, entry.first, allBuckets);
    pool.perform(task);
    pending.push_back(task);
  }

  const bool isHA = (getResult & 1) != 0;
  bool needsReExecution = false;
  GfErrType firstAbortError = GF_NOERR;

  // Fan in: every worker is examined, even after an abort error is recorded.
  for (const auto& task : pending) {
    auto status = task->getResult();
    auto reply = task->getReply();

    // A transport-level success may still carry a server-side exception.
    if (status == GF_NOERR) {
      const auto replyType = reply->getMessageType();
      if (replyType == TcrMessage::EXCEPTION ||
          replyType == TcrMessage::EXECUTE_REGION_FUNCTION_ERROR) {
        status = ThinClientRegion::handleServerException(
            "Execute", reply->getException());
      }
    }

    if (status == GF_NOERR) {
      continue;
    }

    const bool targetException =
        status == GF_INTERNAL_FUNCTION_INVOCATION_TARGET_EXCEPTION;
    const bool connectionProblem =
        !targetException &&
        (status == GF_NOTCON || status == GF_CLIENT_WAIT_TIMEOUT ||
         status == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA);

    // Non-retryable errors: log by severity and remember the first one.
    if (!targetException && !connectionProblem) {
      if (ThinClientBaseDM::isFatalClientError(status)) {
        LOGERROR("ThinClientRegion::executeFunctionSH: Fatal Exception");
      } else {
        LOGWARN("ThinClientRegion::executeFunctionSH: Unexpected Exception");
      }
      if (firstAbortError == GF_NOERR) {
        firstAbortError = status;
      }
      continue;
    }

    if (connectionProblem) {
      LOGINFO(
          "ThinClientRegion::executeFunctionSH with GF_NOTCON or "
          "GF_CLIENT_WAIT_TIMEOUT ");
    }

    // Both retryable categories enqueue a metadata refresh for this region.
    if (auto poolDM = std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
      if (poolDM->getClientMetaDataService()) {
        poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
            this->getFullPath(), 0);
      }
    }

    // Without HA a retryable error is terminal; keep only the first one.
    if (!isHA) {
      if (firstAbortError == GF_NOERR) {
        firstAbortError = status;
      }
      continue;
    }

    // HA: discard partial results so the caller can re-execute cleanly.
    needsReExecution = true;
    task->getResultCollector()->reset();
    {
      std::lock_guard<std::recursive_mutex> guard(*collectorMutex);
      rc->clearResults();
    }

    // Only the invocation-target path reports which nodes failed.
    if (targetException) {
      std::shared_ptr<CacheableHashSet> failedNodeIds(reply->getFailedNode());
      if (failedNodeIds) {
        LOGDEBUG(
            "ThinClientRegion::executeFunctionSH with "
            "GF_INTERNAL_FUNCTION_INVOCATION_TARGET_EXCEPTION "
            "failedNodeIds size = %zu ",
            failedNodeIds->size());
        failedNodes->insert(failedNodeIds->begin(), failedNodeIds->end());
      }
    }
  }

  if (firstAbortError != GF_NOERR) {
    throwExceptionIfError("ExecuteOnRegion:", firstAbortError);
  }

  return needsReExecution;
}