std::shared_ptr<ResultCollector> ExecutionImpl::execute()

in cppcache/src/ExecutionImpl.cpp [97:348]


std::shared_ptr<ResultCollector> ExecutionImpl::execute(
    const std::string& func, std::chrono::milliseconds timeout) {
  LOGDEBUG("ExecutionImpl::execute: ");
  GuardUserAttributes gua;
  if (m_authenticatedView != nullptr) {
    LOGDEBUG("ExecutionImpl::execute function on authenticated cache");
    gua.setAuthenticatedView(m_authenticatedView);
  }
  bool serverHasResult = false;
  bool serverIsHA = false;
  bool serverOptimizeForWrite = false;

  auto&& attr = getFunctionAttributes(func);
  {
    if (attr == nullptr) {
      std::lock_guard<decltype(m_func_attrs_lock)> _guard(m_func_attrs_lock);
      GfErrType err = GF_NOERR;
      attr = getFunctionAttributes(func);
      if (attr == nullptr) {
        if (m_region != nullptr) {
          err = dynamic_cast<ThinClientRegion*>(m_region.get())
                    ->getFuncAttributes(func, &attr);
        } else if (m_pool != nullptr) {
          err = getFuncAttributes(func, &attr);
        }
        if (err != GF_NOERR) {
          throwExceptionIfError("Execute::GET_FUNCTION_ATTRIBUTES", err);
        }
        if (!attr->empty() && err == GF_NOERR) {
          m_func_attrs[func] = attr;
        }
      }
    }
  }
  serverHasResult = ((attr->at(0) == 1) ? true : false);
  serverIsHA = ((attr->at(1) == 1) ? true : false);
  serverOptimizeForWrite = ((attr->at(2) == 1) ? true : false);

  LOGDEBUG(
      "ExecutionImpl::execute got functionAttributes from server for function "
      "= %s serverHasResult = %d serverIsHA = %d serverOptimizeForWrite = %d ",
      func.c_str(), serverHasResult, serverIsHA, serverOptimizeForWrite);

  if (serverHasResult == false) {
    m_rc = std::make_shared<NoResult>();
  } else if (m_rc == nullptr) {
    m_rc = std::make_shared<DefaultResultCollector>();
  }

  uint8_t isHAHasResultOptimizeForWrite = 0;
  if (serverIsHA) {
    isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 1;
  }

  if (serverHasResult) {
    isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 2;
  }

  if (serverOptimizeForWrite) {
    isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 4;
  }

  LOGDEBUG("ExecutionImpl::execute: isHAHasResultOptimizeForWrite = %d",
           isHAHasResultOptimizeForWrite);
  TXState* txState = TSSTXStateWrapper::get().getTXState();

  if (txState != nullptr && m_allServer == true) {
    throw UnsupportedOperationException(
        "Execution::execute: Transaction function execution on all servers is "
        "not supported");
  }

  if (m_region != nullptr) {
    int32_t retryAttempts = 3;
    if (m_pool != nullptr) {
      retryAttempts = m_pool->getRetryAttempts();
    }

    if (m_pool != nullptr && m_pool->getPRSingleHopEnabled()) {
      auto tcrdm = std::dynamic_pointer_cast<ThinClientPoolDM>(m_pool);
      if (!tcrdm) {
        throw IllegalArgumentException(
            "Execute: pool cast to ThinClientPoolDM failed");
      }
      auto cms = tcrdm->getClientMetaDataService();
      auto failedNodes = CacheableHashSet::create();
      if ((!m_routingObj || m_routingObj->empty()) &&
          txState == nullptr) {  // For transactions we should not create
                                 // multiple threads
        LOGDEBUG("ExecutionImpl::execute: m_routingObj is empty");
        auto serverToBucketsMap = cms->groupByServerToAllBuckets(
            m_region,
            /*serverOptimizeForWrite*/ (isHAHasResultOptimizeForWrite & 4) ==
                4);
        if (!serverToBucketsMap || serverToBucketsMap->empty()) {
          LOGDEBUG(
              "ExecutionImpl::execute: m_routingObj is empty and locationMap "
              "is also empty so use old FE onRegion");
          std::dynamic_pointer_cast<ThinClientRegion>(m_region)
              ->executeFunction(
                  func, m_args, m_routingObj, isHAHasResultOptimizeForWrite,
                  m_rc, (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
                  timeout);
          dynamic_cast<ThinClientRegion*>(m_region.get())
              ->setMetaDataRefreshed(false);
          cms->enqueueForMetadataRefresh(m_region->getFullPath(), 0);
        } else {
          // convert server to bucket map to server to key map where bucket id
          // is key.
          auto serverToKeysMap =
              std::make_shared<ClientMetadataService::ServerToKeysMap>(
                  serverToBucketsMap->size());
          for (const auto& entry : *serverToBucketsMap) {
            auto keys = std::make_shared<CacheableHashSet>(
                static_cast<int32_t>(entry.second->size()));
            for (const auto& bucket : *(entry.second)) {
              keys->insert(CacheableInt32::create(bucket));
            }
            serverToKeysMap->emplace(entry.first, keys);
          }
          LOGDEBUG(
              "ExecutionImpl::execute: withoutFilter and locationMap is not "
              "empty");
          bool reExecute = std::dynamic_pointer_cast<ThinClientRegion>(m_region)
                               ->executeFunctionSH(
                                   func, m_args, isHAHasResultOptimizeForWrite,
                                   m_rc, serverToKeysMap, failedNodes, timeout,
                                   /*allBuckets*/ true);
          if (reExecute) {  // Fallback to old FE onREgion
            if (isHAHasResultOptimizeForWrite & 1) {  // isHA = true
              m_rc->clearResults();
              auto rs =
                  std::dynamic_pointer_cast<ThinClientRegion>(m_region)
                      ->reExecuteFunction(func, m_args, m_routingObj,
                                          isHAHasResultOptimizeForWrite, m_rc,
                                          (isHAHasResultOptimizeForWrite & 1)
                                              ? retryAttempts
                                              : 0,
                                          failedNodes, timeout);
            }
          }
        }
      } else if (m_routingObj != nullptr && m_routingObj->size() == 1) {
        LOGDEBUG("executeFunction onRegion WithFilter size equal to 1 ");
        dynamic_cast<ThinClientRegion*>(m_region.get())
            ->executeFunction(
                func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, m_rc,
                (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
                timeout);
      } else {
        if (txState == nullptr) {
          auto serverToKeysMap = cms->getServerToFilterMapFESHOP(
              m_routingObj, m_region, /*serverOptimizeForWrite*/
              (isHAHasResultOptimizeForWrite & 4) == 4);
          if (!serverToKeysMap || serverToKeysMap->empty()) {
            LOGDEBUG(
                "ExecutionImpl::execute: withFilter but locationMap is empty "
                "so use old FE onRegion");
            dynamic_cast<ThinClientRegion*>(m_region.get())
                ->executeFunction(
                    func, m_args, m_routingObj, isHAHasResultOptimizeForWrite,
                    m_rc,
                    (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
                    timeout);
            cms->enqueueForMetadataRefresh(m_region->getFullPath(), 0);
          } else {
            LOGDEBUG(
                "ExecutionImpl::execute: withFilter and locationMap is not "
                "empty");
            bool reExecute =
                dynamic_cast<ThinClientRegion*>(m_region.get())
                    ->executeFunctionSH(func, m_args,
                                        isHAHasResultOptimizeForWrite, m_rc,
                                        serverToKeysMap, failedNodes, timeout,
                                        /*allBuckets*/ false);
            if (reExecute) {  // Fallback to old FE onREgion
              if (isHAHasResultOptimizeForWrite & 1) {  // isHA = true
                m_rc->clearResults();
                auto rs =
                    dynamic_cast<ThinClientRegion*>(m_region.get())
                        ->reExecuteFunction(func, m_args, m_routingObj,
                                            isHAHasResultOptimizeForWrite, m_rc,
                                            (isHAHasResultOptimizeForWrite & 1)
                                                ? retryAttempts
                                                : 0,
                                            failedNodes, timeout);
              }
            }
          }
        } else {  // For transactions use old way
          dynamic_cast<ThinClientRegion*>(m_region.get())
              ->executeFunction(
                  func, m_args, m_routingObj, isHAHasResultOptimizeForWrite,
                  m_rc, (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
                  timeout);
        }
      }
    } else {  // w/o single hop, Fallback to old FE onREgion
      dynamic_cast<ThinClientRegion*>(m_region.get())
          ->executeFunction(
              func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, m_rc,
              (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, timeout);
    }
    /*    } catch (TransactionDataNodeHasDepartedException e) {
                    if(txState == nullptr)
                    {
                            GfErrTypeThrowException("Transaction is nullptr",
       GF_CACHE_ILLEGAL_STATE_EXCEPTION);
                    }

                    if(!txState->isReplay())
                            txState->replay(false);
            } catch(TransactionDataRebalancedException e) {
                    if(txState == nullptr)
                    {
                            GfErrTypeThrowException("Transaction is nullptr",
       GF_CACHE_ILLEGAL_STATE_EXCEPTION);
                    }

                    if(!txState->isReplay())
                            txState->replay(true);
            }
    */
    if (serverHasResult == true) {
      // ExecutionImpl::addResults(m_rc, rs);
      m_rc->endResults();
    }

    return m_rc;
  } else if (m_pool != nullptr) {
    if (txState != nullptr) {
      throw UnsupportedOperationException(
          "Execution::execute: Transaction function execution on pool is not "
          "supported");
    }
    if (m_allServer == false) {
      executeOnPool(
          func, isHAHasResultOptimizeForWrite,
          (isHAHasResultOptimizeForWrite & 1) ? m_pool->getRetryAttempts() : 0,
          timeout);
      if (serverHasResult == true) {
        // ExecutionImpl::addResults(m_rc, rs);
        m_rc->endResults();
      }
      return m_rc;
    }
    executeOnAllServers(func, isHAHasResultOptimizeForWrite, timeout);
  } else {
    throw IllegalStateException("Execution::execute: should not be here");
  }
  return m_rc;
}