in cppcache/src/LocalRegion.cpp [1931:2104]
/**
 * Puts every entry of @p map into this region and reports the outcome as an
 * error code instead of throwing.
 *
 * Flow, as implemented below:
 *  1. Inside a transaction the call is forwarded to putAllNoThrow_remote()
 *     only (local operations are rejected with GF_NOTSUP) and the
 *     transaction is marked dirty on success.
 *  2. Otherwise, when caching is enabled without concurrency checks, a
 *     tracker is added for every key and the old value / update count are
 *     remembered; the trackers are removed again on every exit path.
 *  3. The cache writer, if any, is invoked for each entry and can veto the
 *     whole operation.
 *  4. putAllNoThrow_remote() is called.
 *  5. When caching is enabled the entries are applied locally through
 *     putNoThrow(), using the version tags collected in step 4 if present.
 *
 * @param map               key/value pairs to put.
 * @param timeout           passed through to putAllNoThrow_remote().
 * @param aCallbackArgument passed to the cache writer, the remote putAll and
 *                          each local put.
 * @return GF_NOTSUP for a local operation inside a transaction;
 *         GF_CACHEWRITER_ERROR if the cache writer rejects an entry; the
 *         error returned by putAllNoThrow_remote() if it fails; the first
 *         local put error other than GF_CACHE_ENTRY_UPDATED and
 *         GF_CACHE_LISTENER_EXCEPTION; GF_CACHE_LISTENER_EXCEPTION if a
 *         local put reported it (the remaining entries are still applied);
 *         GF_NOERR otherwise.
 */
GfErrType LocalRegion::putAllNoThrow(
    const HashMapOfCacheable& map, std::chrono::milliseconds timeout,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  CHECK_DESTROY_PENDING_NOTHROW(shared_lock);
  GfErrType err = GF_NOERR;
  // Handed to putAllNoThrow_remote(); it may still be null afterwards, so
  // every use below is null-checked.
  std::shared_ptr<VersionedCacheableObjectPartList> versionedObjPartListPtr;

  TXState* txState = getTXState();
  if (txState != nullptr) {
    if (isLocalOp()) {
      return GF_NOTSUP;
    }
    err = putAllNoThrow_remote(map, versionedObjPartListPtr, timeout,
                               aCallbackArgument);
    if (err == GF_NOERR) {
      txState->setDirty();
    }
    return err;
  }

  bool cachingEnabled = m_regionAttributes.getCachingEnabled();
  MapOfOldValue oldValueMap;

  // Remove the tracking for the entries before exiting the function. The
  // guard is declared after oldValueMap so that it is destroyed first and
  // can still read the map on every return path.
  struct RemoveTracking {
   private:
    const MapOfOldValue& m_oldValueMap;
    LocalRegion& m_region;

   public:
    RemoveTracking(const MapOfOldValue& oldValueMap, LocalRegion& region)
        : m_oldValueMap(oldValueMap), m_region(region) {}
    ~RemoveTracking() {
      if (!m_region.getAttributes().getConcurrencyChecksEnabled()) {
        // need to remove the tracking added to the entries at the end
        for (MapOfOldValue::const_iterator iter = m_oldValueMap.begin();
             iter != m_oldValueMap.end(); ++iter) {
          if (iter->second.second >= 0) {
            m_region.m_entries->removeTrackerForEntry(iter->first);
          }
        }
      }
    }
  } _removeTracking(oldValueMap, *this);

  if (cachingEnabled || m_writer != nullptr) {
    for (const auto& iter : map) {
      const auto& key = iter.first;
      // Scoped per key so one key's old value can never be carried over to
      // the cache writer call of the next key.
      std::shared_ptr<Cacheable> oldValue;
      if (cachingEnabled && !m_regionAttributes.getConcurrencyChecksEnabled()) {
        int updateCount =
            m_entries->addTrackerForEntry(key, oldValue, true, false, true);
        oldValueMap.insert(
            std::make_pair(key, std::make_pair(oldValue, updateCount)));
      }
      if (m_writer != nullptr) {
        // invokeCacheWriterForEntryEvent method has the check that if
        // oldValue is a CacheableToken then it sets it to nullptr; also
        // determines if it should be BEFORE_UPDATE or BEFORE_CREATE depending
        // on oldValue
        if (!invokeCacheWriterForEntryEvent(
                key, oldValue, iter.second, aCallbackArgument,
                CacheEventFlags::LOCAL, BEFORE_UPDATE)) {
          PutActions::logCacheWriterFailure(key, oldValue);
          return GF_CACHEWRITER_ERROR;
        }
      }
    }
  }

  // try remote putAll, if any
  if ((err = putAllNoThrow_remote(map, versionedObjPartListPtr, timeout,
                                  aCallbackArgument)) != GF_NOERR) {
    return err;
  }

  // next the local puts
  GfErrType localErr;
  if (cachingEnabled) {
    if (m_isPRSingleHopEnabled) { /*New PRSingleHop Case:: PR Singlehop
                                     condition*/
      // Only the keys reported as succeeded are applied locally. Without a
      // part list (or without a succeeded-keys list) there is nothing to
      // apply, and dereferencing either would be a null dereference.
      if (versionedObjPartListPtr &&
          versionedObjPartListPtr->getSucceededKeys()) {
        const auto succeededKeys = versionedObjPartListPtr->getSucceededKeys();
        for (size_t keyIndex = 0; keyIndex < succeededKeys->size();
             ++keyIndex) {
          const auto valPtr = succeededKeys->at(keyIndex);
          const auto mapIter = map.find(valPtr);
          if (mapIter == map.end()) {
            // ThrowERROR
            LOGERROR(
                "ERROR :: LocalRegion::putAllNoThrow() Key must be found in "
                "the "
                "usermap");
            // Skip this key: there is no key/value pair to put, and a null
            // key must not reach the old-value lookup or putNoThrow().
            continue;
          }
          const std::shared_ptr<CacheableKey> key = mapIter->first;
          const std::shared_ptr<Cacheable> value = mapIter->second;

          const auto& versionTags =
              versionedObjPartListPtr->getVersionedTagptr();
          LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %zu",
                   versionTags.size());
          // Stays null when the tag list is empty or shorter than the
          // succeeded-keys list, instead of indexing out of bounds.
          std::shared_ptr<VersionTag> versionTag;
          if (keyIndex < versionTags.size()) {
            versionTag = versionTags[keyIndex];
          }

          auto& p = oldValueMap[key];
          if ((localErr = LocalRegion::putNoThrow(
                   key, value, aCallbackArgument, p.first, p.second,
                   CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
                   versionTag)) == GF_CACHE_ENTRY_UPDATED) {
            LOGFINEST(
                "Region::putAll: did not change local value for key [%s] "
                "since it has been updated by another thread while operation "
                "was "
                "in progress",
                Utils::nullSafeToString(key).c_str());
          } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
            // Remember the listener failure but keep applying the rest.
            LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]",
                     localErr, Utils::nullSafeToString(key).c_str());
            err = localErr;
          } else if (localErr != GF_NOERR) {
            return localErr;
          }
        }  // End of for loop
      }
    } else { /*Non SingleHop case :: PUTALL has taken multiple hops*/
      LOGDEBUG(
          "NILKANTH LocalRegion::putAllNoThrow m_isPRSingleHopEnabled = %d "
          "expected false",
          m_isPRSingleHopEnabled);
      // Position of the current entry in the version tag list; it only
      // advances while a non-empty tag list is available.
      size_t index = 0;
      for (const auto& iter : map) {
        const auto& key = iter.first;
        const auto& value = iter.second;
        auto& p = oldValueMap[key];

        std::shared_ptr<VersionTag> versionTag;
        if (versionedObjPartListPtr) {
          const auto& versionTags =
              versionedObjPartListPtr->getVersionedTagptr();
          LOGDEBUG(
              "versionedObjPartListPtr->getVersionedTagptr().size() = %zu ",
              versionTags.size());
          if (versionTags.size() > 0) {
            // Bounds-checked: a tag list shorter than the map yields a null
            // tag for the remaining entries rather than an out-of-range read.
            if (index < versionTags.size()) {
              versionTag = versionTags[index];
            }
            ++index;
          }
        }

        if ((localErr = LocalRegion::putNoThrow(
                 key, value, aCallbackArgument, p.first, p.second,
                 CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
                 versionTag)) == GF_CACHE_ENTRY_UPDATED) {
          LOGFINEST(
              "Region::putAll: did not change local value for key [%s] "
              "since it has been updated by another thread while operation "
              "was "
              "in progress",
              Utils::nullSafeToString(key).c_str());
        } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
          // Remember the listener failure but keep applying the rest.
          LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]",
                   localErr, Utils::nullSafeToString(key).c_str());
          err = localErr;
        } else if (localErr != GF_NOERR) {
          return localErr;
        }
      }
    }
  }

  m_regionStats->incPutAll();
  return err;
}