GfErrType LocalRegion::putAllNoThrow()

in cppcache/src/LocalRegion.cpp [1931:2104]


/**
 * Puts every entry of `map` into the region and reports the outcome as an
 * error code.
 *
 * Sequence, as implemented below:
 *  1. Inside a transaction (getTXState() != nullptr) only the remote putAll
 *     is attempted; the transaction is marked dirty on success and no local
 *     cache update is done here.
 *  2. Otherwise, for every entry an update tracker is registered (only when
 *     caching is enabled and concurrency checks are disabled) and the cache
 *     writer, if one is installed, is invoked.
 *  3. putAllNoThrow_remote() is called.
 *  4. When caching is enabled, each entry is applied to the local cache via
 *     LocalRegion::putNoThrow() with the cache writer suppressed.
 *
 * @param map               entries to put.
 * @param timeout           forwarded unchanged to putAllNoThrow_remote().
 * @param aCallbackArgument forwarded to the cache writer, the remote call and
 *                          the local puts.
 * @return GF_NOTSUP when called in a transaction while isLocalOp() is true;
 *         GF_CACHEWRITER_ERROR when the cache writer rejects an entry;
 *         the error returned by putAllNoThrow_remote() when it fails;
 *         the first local put error other than GF_CACHE_ENTRY_UPDATED and
 *         GF_CACHE_LISTENER_EXCEPTION;
 *         GF_CACHE_LISTENER_EXCEPTION when a listener failed for at least one
 *         entry (remaining entries are still processed);
 *         GF_NOERR otherwise.
 */
GfErrType LocalRegion::putAllNoThrow(
    const HashMapOfCacheable& map, std::chrono::milliseconds timeout,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  CHECK_DESTROY_PENDING_NOTHROW(shared_lock);
  GfErrType err = GF_NOERR;
  // Filled in by putAllNoThrow_remote(); may legitimately stay null.
  std::shared_ptr<VersionedCacheableObjectPartList> versionedObjPartListPtr;

  TXState* txState = getTXState();
  if (txState != nullptr) {
    if (isLocalOp()) {
      return GF_NOTSUP;
    }

    err = putAllNoThrow_remote(map, versionedObjPartListPtr, timeout,
                               aCallbackArgument);
    if (err == GF_NOERR) {
      txState->setDirty();
    }

    return err;
  }

  const bool cachingEnabled = m_regionAttributes.getCachingEnabled();
  MapOfOldValue oldValueMap;

  // Scope guard: removes the trackers registered below on every exit path of
  // this function. Declared after oldValueMap so that it is destroyed first.
  struct RemoveTracking {
   private:
    const MapOfOldValue& m_oldValueMap;
    LocalRegion& m_region;

   public:
    RemoveTracking(const MapOfOldValue& oldValueMap, LocalRegion& region)
        : m_oldValueMap(oldValueMap), m_region(region) {}
    ~RemoveTracking() {
      if (m_region.getAttributes().getConcurrencyChecksEnabled()) {
        // Trackers are only registered when concurrency checks are disabled.
        return;
      }
      for (const auto& tracked : m_oldValueMap) {
        // A negative update count is treated as "no tracker to remove".
        if (tracked.second.second >= 0) {
          m_region.m_entries->removeTrackerForEntry(tracked.first);
        }
      }
    }
  } _removeTracking(oldValueMap, *this);

  if (cachingEnabled || m_writer != nullptr) {
    const bool trackEntries =
        cachingEnabled && !m_regionAttributes.getConcurrencyChecksEnabled();
    // Deliberately declared outside the loop, exactly as before: it is only
    // written by addTrackerForEntry() and is handed to the cache writer.
    std::shared_ptr<Cacheable> oldValue;
    for (const auto& entry : map) {
      const auto& key = entry.first;
      if (trackEntries) {
        const int updateCount =
            m_entries->addTrackerForEntry(key, oldValue, true, false, true);
        oldValueMap.insert(
            std::make_pair(key, std::make_pair(oldValue, updateCount)));
      }
      if (m_writer != nullptr) {
        // invokeCacheWriterForEntryEvent method has the check that if
        // oldValue is a CacheableToken then it sets it to nullptr; also
        // determines if it should be BEFORE_UPDATE or BEFORE_CREATE depending
        // on oldValue
        if (!invokeCacheWriterForEntryEvent(
                key, oldValue, entry.second, aCallbackArgument,
                CacheEventFlags::LOCAL, BEFORE_UPDATE)) {
          PutActions::logCacheWriterFailure(key, oldValue);
          return GF_CACHEWRITER_ERROR;
        }
      }
    }
  }

  // try remote putAll, if any
  err = putAllNoThrow_remote(map, versionedObjPartListPtr, timeout,
                             aCallbackArgument);
  if (err != GF_NOERR) {
    return err;
  }

  // Applies one entry to the local cache. Returns GF_NOERR when the caller
  // should continue with the next entry, otherwise the error to return from
  // putAllNoThrow(). A listener failure is remembered in `err` but does not
  // stop the remaining puts.
  const auto putLocally =
      [&](const std::shared_ptr<CacheableKey>& key,
          const std::shared_ptr<Cacheable>& value,
          const std::shared_ptr<VersionTag>& versionTag) -> GfErrType {
    // operator[] default-constructs the (old value, update count) pair when
    // no tracker was registered for this key.
    auto& tracked = oldValueMap[key];
    const GfErrType localErr = LocalRegion::putNoThrow(
        key, value, aCallbackArgument, tracked.first, tracked.second,
        CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER, versionTag);
    if (localErr == GF_CACHE_ENTRY_UPDATED) {
      LOGFINEST(
          "Region::putAll: did not change local value for key [%s] "
          "since it has been updated by another thread while operation "
          "was "
          "in progress",
          Utils::nullSafeToString(key).c_str());
    } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
      LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]",
               localErr, Utils::nullSafeToString(key).c_str());
      err = localErr;
    } else if (localErr != GF_NOERR) {
      return localErr;
    }
    return GF_NOERR;
  };

  // next the local puts
  if (cachingEnabled) {
    if (m_isPRSingleHopEnabled) {
      // PR single-hop case: only the keys reported as succeeded are applied
      // locally. A missing part list or key list means there is nothing to
      // apply; previously this path dereferenced the pointer unchecked.
      if (!versionedObjPartListPtr) {
        // nothing reported, nothing to apply
      } else if (const auto succeededKeys =
                     versionedObjPartListPtr->getSucceededKeys()) {
        for (size_t keyIndex = 0; keyIndex < succeededKeys->size();
             ++keyIndex) {
          const auto mapIter = map.find(succeededKeys->at(keyIndex));
          if (mapIter == map.end()) {
            LOGERROR(
                "ERROR :: LocalRegion::putAllNoThrow() Key must be found in "
                "the "
                "usermap");
            // There is no key/value pair to store for this position; going
            // on would hand a null key to the old-value map and putNoThrow.
            continue;
          }

          const auto& tags = versionedObjPartListPtr->getVersionedTagptr();
          LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %zu",
                   tags.size());
          // No tag is used when the list holds none for this position.
          std::shared_ptr<VersionTag> versionTag;
          if (keyIndex < tags.size()) {
            versionTag = tags[keyIndex];
          }

          const GfErrType fatal =
              putLocally(mapIter->first, mapIter->second, versionTag);
          if (fatal != GF_NOERR) {
            return fatal;
          }
        }
      }
    } else {
      // Non single-hop case: the putAll has taken multiple hops, so every
      // entry of the caller's map is applied locally.
      LOGDEBUG(
          "NILKANTH LocalRegion::putAllNoThrow m_isPRSingleHopEnabled = %d "
          "expected false",
          m_isPRSingleHopEnabled);
      size_t index = 0;
      for (const auto& entry : map) {
        // Version tags are consumed positionally, in map iteration order.
        std::shared_ptr<VersionTag> versionTag;
        if (versionedObjPartListPtr) {
          const auto& tags = versionedObjPartListPtr->getVersionedTagptr();
          LOGDEBUG(
              "versionedObjPartListPtr->getVersionedTagptr().size() = %zu ",
              tags.size());
          if (index < tags.size()) {
            versionTag = tags[index];
          }
        }
        ++index;

        const GfErrType fatal =
            putLocally(entry.first, entry.second, versionTag);
        if (fatal != GF_NOERR) {
          return fatal;
        }
      }
    }
  }

  m_regionStats->incPutAll();
  return err;
}