GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote()

in cppcache/src/ThinClientRegion.cpp [1564:1808]


GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote(
    ThinClientPoolDM* tcrdm,
    const std::vector<std::shared_ptr<CacheableKey>>& keys,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  LOGDEBUG(
      " ThinClientRegion::singleHopRemoveAllNoThrow_remote keys size = %zu",
      keys.size());
  auto region = shared_from_this();
  GfErrType error = GF_NOERR;

  auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
      keys, region, true);
  if (!locationMap) {
    // removeAll with multiple hop implementation
    LOGDEBUG("locationMap is Null or Empty");
    return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
                                           aCallbackArgument);
  }

  // set this flag that indicates putAll on PR is invoked with singlehop
  // enabled.
  m_isPRSingleHopEnabled = true;
  LOGDEBUG("locationMap.size() = %zu ", locationMap->size());

  /*Step-2
   *  a. create vector of RemoveAllWork
   *  b. locationMap<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>> >>. Create
   * server specific filteredMap/subMap by populating all keys
   * (locationIter.second()) and its corr. values from the user Map.
   *  c. create new instance of RemoveAllWork, i.e worker with required params.
   *     //TODO:: Add details of each parameter later
   *  d. enqueue the worker for thread from threadPool to perform/run execute
   * method.
   *  e. insert the worker into the vector.
   */
  std::vector<std::shared_ptr<RemoveAllWork>> removeAllWorkers;
  auto& threadPool = m_cacheImpl->getThreadPool();
  int locationMapIndex = 0;
  for (const auto& locationIter : *locationMap) {
    const auto& serverLocation = locationIter.first;
    if (serverLocation == nullptr) {
      LOGDEBUG("serverLocation is nullptr");
    }
    const auto& mappedkeys = locationIter.second;
    auto worker = std::make_shared<RemoveAllWork>(
        tcrdm, serverLocation, region, true /*attemptFailover*/,
        false /*isBGThread*/, mappedkeys, aCallbackArgument);
    threadPool.perform(worker);
    removeAllWorkers.push_back(worker);
    locationMapIndex++;
  }
  // TODO::CHECK, do we need to set following ..??
  // reply.setMessageType(TcrMessage::RESPONSE);

  int cnt = 1;

  /**
   * Step::3
   * a. Iterate over all vector of putAllWorkers and populate worker specific
   * information into the HashMap
   *    resultMap<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<Serializable>>, 2nd part, Value can be a
   * std::shared_ptr<VersionedCacheableObjectPartList> or
   * std::shared_ptr<PutAllPartialResultServerException>.
   *    failedServers<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<CacheableInt32>>, 2nd part, Value is a ErrorCode. b. delete
   * the worker
   */
  auto resultMap = ResultMap();
  auto failedServers = FailedServersMap();
  for (const auto& worker : removeAllWorkers) {
    auto err =
        worker->getResult();  // wait() or blocking call for worker thread.
    LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);

    if (GF_NOERR == err) {
      // No Exception from server
      resultMap.emplace(worker->getServerLocation(),
                        worker->getResultCollector()->getList());
    } else {
      error = err;

      if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
        resultMap.emplace(worker->getServerLocation(),
                          worker->getPaPResultException());
      } else if (error == GF_NOTCON) {
        // Refresh the metadata in case of GF_NOTCON.
        tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
            region->getFullPath(), 0);
      }
      failedServers.emplace(worker->getServerLocation(),
                            CacheableInt32::create(error));
    }

    LOGDEBUG(
        "worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
        worker->getResultCollector()->getList()->getVersionedTagsize());

    cnt++;
  }
  /**
   * Step:4
   * a. create instance of std::shared_ptr<PutAllPartialResult> with total size=
   * map.size() b. Iterate over the resultMap and value for the particular
   * serverlocation is of type VersionedCacheableObjectPartList add keys and
   * versions. C. ToDO:: what if the value in the resultMap is of type
   * PutAllPartialResultServerException
   */
  std::recursive_mutex responseLock;
  auto result = std::make_shared<PutAllPartialResult>(
      static_cast<int>(keys.size()), responseLock);
  LOGDEBUG(
      " TCRegion:: %s:%d  "
      "result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
      __FILE__, __LINE__,
      result->getSucceededKeysAndVersions()->getVersionedTagsize());
  LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
           resultMap.size());
  for (const auto& resultMapIter : resultMap) {
    const auto& value = resultMapIter.second;

    if (const auto papException =
            std::dynamic_pointer_cast<PutAllPartialResultServerException>(
                value)) {
      // PutAllPartialResultServerException CASE:: value in resultMap is of type
      // PutAllPartialResultServerException.
      // TODO:: Add failedservers.keySet= all fialed servers, i.e list out all
      // keys in map failedServers,
      //       that is set view of the keys contained in failedservers map.
      // TODO:: need to read  papException and populate PutAllPartialResult.
      result->consolidate(papException->getResult());
    } else if (const auto list =
                   std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
                       value)) {
      // value in resultMap is of type VCOPL.
      result->addKeysAndVersions(list);
    } else {
      // ERROR CASE
      if (value) {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
            "could not Cast to either VCOPL or "
            "PutAllPartialResultServerException:%s",
            value->toString().c_str());
      } else {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
            "is nullptr");
      }
    }
  }

  /**
   * a. if PutAllPartialResult result does not contains any entry,  Iterate over
   * locationMap.
   * b. Create std::vector<std::shared_ptr<CacheableKey>>  succeedKeySet, and
   * keep adding set of keys (locationIter.second()) in locationMap for which
   * failedServers->contains(locationIter.first()is false.
   */

  LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %zu", __FILE__,
           __LINE__, failedServers.size());

  // if the partial result set doesn't already have keys (for tracking version
  // tags)
  // then we need to gather up the keys that we know have succeeded so far and
  // add them to the partial result set (See bug Id #955)
  if (!failedServers.empty()) {
    auto succeedKeySet =
        std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
    if (result->getSucceededKeysAndVersions()->size() == 0) {
      for (const auto& locationIter : *locationMap) {
        if (failedServers.find(locationIter.first) != failedServers.end()) {
          for (const auto& i : *(locationIter.second)) {
            succeedKeySet->push_back(i);
          }
        }
      }
      result->addKeys(succeedKeySet);
    }
  }

  /**
   * a. Iterate over the failedServers map
   * c. if failedServer map contains "GF_PUTALL_PARTIAL_RESULT_EXCEPTION" then
   * continue, Do not retry putAll for corr. keys.
   * b. Retry for all the failed server.
   *    Generate a newSubMap by finding Keys specific to failedServers from
   * locationMap and finding their respective values from the usermap.
   */
  error = GF_NOERR;
  bool oneSubMapRetryFailed = false;
  for (const auto& failedServerIter : failedServers) {
    if (failedServerIter.second->value() ==
        GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {  // serverLocation
      // will not retry for PutAllPartialResultException
      // but it means at least one sub map ever failed
      oneSubMapRetryFailed = true;
      error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
      continue;
    }

    std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
        nullptr;
    const auto& failedSerInLocMapIter =
        locationMap->find(failedServerIter.first);
    if (failedSerInLocMapIter != locationMap->end()) {
      failedKeys = failedSerInLocMapIter->second;
    }

    if (failedKeys == nullptr) {
      LOGERROR(
          "TCRegion::singleHopRemoveAllNoThrow_remote :: failedKeys are "
          "nullptr "
          "that is not valid");
    }

    std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
    std::shared_ptr<PutAllPartialResultServerException> papResultServerExc =
        nullptr;
    GfErrType errCode = multiHopRemoveAllNoThrow_remote(
        *failedKeys, vcopListPtr, aCallbackArgument);
    if (errCode == GF_NOERR) {
      result->addKeysAndVersions(vcopListPtr);
    } else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
      oneSubMapRetryFailed = true;
      error = errCode;
    } else /*if(errCode != GF_NOERR)*/ {
      oneSubMapRetryFailed = true;
      std::shared_ptr<Exception> excptPtr = nullptr;
      result->saveFailedKey(failedKeys->at(0), excptPtr);
      error = errCode;
    }
  }

  if (!oneSubMapRetryFailed) {
    error = GF_NOERR;
  }
  versionedObjPartList = result->getSucceededKeysAndVersions();
  LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
           versionedObjPartList->size(), error);
  return error;
}