GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote()

in cppcache/src/ThinClientRegion.cpp [1174:1474]


/**
 * Executes a putAll by splitting the user map into per-server sub-maps and
 * sending each sub-map to its server in parallel (single hop), then retrying
 * the sub-maps of failed servers through the multi-hop path.
 *
 * Outline:
 *  1. Collect all keys and ask the client metadata service for a
 *     server -> keys map. If none is available, delegate the whole request
 *     to multiHopPutAllNoThrow_remote() and return its result.
 *  2. For every server in that map, build the sub-map of entries it owns and
 *     hand a PutAllWork to the cache's thread pool.
 *  3. Wait for every worker and record either its result or its error code.
 *  4. Merge all per-server results into one PutAllPartialResult.
 *  5. For each failed server (except those that reported
 *     GF_PUTALL_PARTIAL_RESULT_EXCEPTION) retry its keys via
 *     multiHopPutAllNoThrow_remote().
 *
 * @param tcrdm pool used to reach the metadata service and given to workers.
 * @param map user supplied key/value entries to put.
 * @param versionedObjPartList out parameter; on the single-hop path it is set
 *        to the succeeded keys and versions accumulated in the merged result.
 * @param timeout forwarded to every worker and to every multi-hop retry.
 * @param aCallbackArgument forwarded to every worker and multi-hop retry.
 * @return GF_NOERR when no sub-map ultimately failed; otherwise
 *         GF_PUTALL_PARTIAL_RESULT_EXCEPTION or the error code of the last
 *         failed retry. On the fallback path, whatever
 *         multiHopPutAllNoThrow_remote() returns.
 */
GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote(
    ThinClientPoolDM* tcrdm, const HashMapOfCacheable& map,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    std::chrono::milliseconds timeout,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  LOGDEBUG(" ThinClientRegion::singleHopPutAllNoThrow_remote map size = %zu",
           map.size());
  auto region = shared_from_this();

  /* Step 1:
   * Populate the keys vector from the user map and pass it to
   * getServerToFilterMap() to generate locationMap. If locationMap is null,
   * fall back to the existing putAll implementation that may take multiple
   * network hops.
   */
  auto userKeys = std::vector<std::shared_ptr<CacheableKey>>();
  userKeys.reserve(map.size());
  for (const auto& iter : map) {
    userKeys.push_back(iter.first);
  }

  // NOTE(review): an earlier comment here stated that the last argument of
  // getServerToFilterMap() is false for putAll, yet the code passes true.
  // The code is kept as is; confirm which one is intended.
  auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
      userKeys, region, true);
  if (!locationMap) {
    // putAll with multiple hop implementation
    LOGDEBUG("locationMap is Null or Empty");

    return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
                                        aCallbackArgument);
  }

  // Flag that putAll on a PR was invoked with single hop enabled.
  m_isPRSingleHopEnabled = true;

  /* Step 2:
   * For every (server location, keys) entry of locationMap build the server
   * specific sub-map from the user map, create a PutAllWork for it, enqueue
   * the worker on the thread pool and remember it for step 3.
   */
  std::vector<std::shared_ptr<PutAllWork>> putAllWorkers;
  auto& threadPool = m_cacheImpl->getThreadPool();
  for (const auto& locationIter : *locationMap) {
    const auto& serverLocation = locationIter.first;
    if (serverLocation == nullptr) {
      LOGDEBUG("serverLocation is nullptr");
    }
    const auto& keys = locationIter.second;

    // Create server specific sub-map by iterating over keys.
    auto filteredMap = std::make_shared<HashMapOfCacheable>();
    if (keys != nullptr && keys->size() > 0) {
      for (const auto& key : *keys) {
        const auto entry = map.find(key);
        if (entry != map.end()) {
          filteredMap->emplace(entry->first, entry->second);
        }
      }
    }

    auto worker = std::make_shared<PutAllWork>(
        tcrdm, serverLocation, region, true /*attemptFailover*/,
        false /*isBGThread*/, filteredMap, keys, timeout, aCallbackArgument);
    threadPool.perform(worker);
    putAllWorkers.push_back(worker);
  }

  /* Step 3:
   * Wait for every worker. On success store the worker's result list in
   * resultMap. On failure store the error code in failedServers; for a
   * partial-result failure the worker's partial-result exception is stored
   * in resultMap as well, and for GF_NOTCON a metadata refresh is requested.
   */
  auto resultMap = ResultMap();
  auto failedServers = FailedServersMap();

  for (const auto& worker : putAllWorkers) {
    auto err =
        worker->getResult();  // wait() or blocking call for worker thread.
    LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);

    if (GF_NOERR == err) {
      // No exception from server
      resultMap.emplace(worker->getServerLocation(),
                        worker->getResultCollector()->getList());
    } else {
      if (err == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
        resultMap.emplace(worker->getServerLocation(),
                          worker->getPaPResultException());
      } else if (err == GF_NOTCON) {
        // Refresh the metadata in case of GF_NOTCON.
        tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
            region->getFullPath(), 0);
      }
      failedServers.emplace(worker->getServerLocation(),
                            CacheableInt32::create(err));
    }

    LOGDEBUG("worker->getPutAllMap()->size() = %zu ",
             worker->getPutAllMap()->size());
    LOGDEBUG(
        "worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
        worker->getResultCollector()->getList()->getVersionedTagsize());
  }

  /* Step 4:
   * Create a PutAllPartialResult sized for the whole user map and merge every
   * entry of resultMap into it: partial-result exceptions are consolidated,
   * versioned part lists contribute their keys and versions.
   */
  std::recursive_mutex responseLock;
  auto result = std::make_shared<PutAllPartialResult>(
      static_cast<int>(map.size()), responseLock);
  LOGDEBUG(
      " TCRegion:: %s:%d  "
      "result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
      __FILE__, __LINE__,
      result->getSucceededKeysAndVersions()->getVersionedTagsize());
  // The size argument previously had no matching conversion specifier.
  LOGDEBUG(" TCRegion:: %s:%d resultMap->size() = %zu ", __FILE__, __LINE__,
           resultMap.size());
  for (const auto& resultMapIter : resultMap) {
    const auto& value = resultMapIter.second;

    if (const auto papException =
            std::dynamic_pointer_cast<PutAllPartialResultServerException>(
                value)) {
      // Value in resultMap is a PutAllPartialResultServerException.
      // TODO:: Add failedservers.keySet = all failed servers, i.e. the set
      //        view of the keys contained in the failedServers map.
      // TODO:: need to read papException and populate PutAllPartialResult.
      result->consolidate(papException->getResult());
    } else if (const auto list =
                   std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
                       value)) {
      // Value in resultMap is a VersionedCacheableObjectPartList.
      result->addKeysAndVersions(list);
    } else {
      // ERROR CASE
      if (value) {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value "
            "could not Cast to either VCOPL or "
            "PutAllPartialResultServerException:%s",
            value->toString().c_str());
      } else {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value is "
            "nullptr");
      }
    }
  }

  LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %zu", __FILE__,
           __LINE__, failedServers.size());

  // If the partial result set doesn't already have keys (for tracking version
  // tags) then we need to gather up the keys that we know have succeeded so
  // far and add them to the partial result set (See bug Id #955).
  if (!failedServers.empty()) {
    auto succeedKeySet =
        std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
    if (result->getSucceededKeysAndVersions()->size() == 0) {
      for (const auto& locationIter : *locationMap) {
        // Only servers that are NOT in failedServers contributed succeeded
        // keys. The previous check was inverted and collected the keys of the
        // failed servers instead.
        if (failedServers.find(locationIter.first) == failedServers.end() &&
            locationIter.second != nullptr) {
          for (const auto& key : *(locationIter.second)) {
            succeedKeySet->push_back(key);
          }
        }
      }
      result->addKeys(succeedKeySet);
    }
  }

  /* Step 5:
   * Iterate over failedServers. Servers that failed with
   * GF_PUTALL_PARTIAL_RESULT_EXCEPTION are not retried. For every other
   * failed server, build a new sub-map from its keys in locationMap and the
   * values in the user map, and retry it through the multi-hop path.
   */
  GfErrType error = GF_NOERR;
  bool oneSubMapRetryFailed = false;
  for (const auto& failedServerIter : failedServers) {
    if (failedServerIter.second->value() ==
        GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {  // serverLocation
      // will not retry for PutAllPartialResultException
      // but it means at least one sub map ever failed
      oneSubMapRetryFailed = true;
      error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
      continue;
    }

    std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
        nullptr;
    const auto failedSerInLocMapIter =
        locationMap->find(failedServerIter.first);
    if (failedSerInLocMapIter != locationMap->end()) {
      failedKeys = failedSerInLocMapIter->second;
    }

    if (failedKeys == nullptr) {
      LOGERROR(
          "TCRegion::singleHopPutAllNoThrow_remote :: failedKeys are nullptr "
          "that is not valid");
    }

    auto newSubMap = std::make_shared<HashMapOfCacheable>();
    if (failedKeys && !failedKeys->empty()) {
      for (const auto& key : *failedKeys) {
        const auto entry = map.find(key);
        if (entry != map.end()) {
          newSubMap->emplace(entry->first, entry->second);
        } else {
          LOGERROR(
              "DEBUG:: TCRegion.cpp singleHopPutAllNoThrow_remote KEY not "
              "found in user failedSubMap");
        }
      }
    }

    std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
    GfErrType errCode = multiHopPutAllNoThrow_remote(
        *newSubMap, vcopListPtr, timeout, aCallbackArgument);
    if (errCode == GF_NOERR) {
      result->addKeysAndVersions(vcopListPtr);
    } else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
      oneSubMapRetryFailed = true;
      // TODO:: Commented it as papResultServerExc is nullptr this time
      //       UnComment it once you read papResultServerExc.
      // result->consolidate(papResultServerExc->getResult());
      error = errCode;
    } else /*if(errCode != GF_NOERR)*/ {
      oneSubMapRetryFailed = true;
      // newSubMap is empty when failedKeys was null/empty or none of the keys
      // were found in the user map; dereferencing begin() would then be
      // undefined behaviour, so only record a failed key when one exists.
      if (!newSubMap->empty()) {
        const auto& firstKey = newSubMap->begin()->first;
        std::shared_ptr<Exception> excptPtr = nullptr;
        // TODO:: formulate excptPtr from the errCode
        result->saveFailedKey(firstKey, excptPtr);
      }
      error = errCode;
    }
  }

  if (!oneSubMapRetryFailed) {
    error = GF_NOERR;
  }
  versionedObjPartList = result->getSucceededKeysAndVersions();
  LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
           versionedObjPartList->size(), error);

  return error;
}