in cppcache/src/ThinClientRegion.cpp [1174:1474]
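// Attempts a single-hop putAll: keys are grouped by the server owning their
// bucket and written in parallel, falling back to the multi-hop
// implementation when no location metadata is available. On return,
// versionedObjPartList holds the keys and version tags that succeeded;
// GF_NOERR means every sub-map succeeded, while
// GF_PUTALL_PARTIAL_RESULT_EXCEPTION reports a partial failure.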
GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote(
ThinClientPoolDM* tcrdm, const HashMapOfCacheable& map,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
std::chrono::milliseconds timeout,
const std::shared_ptr<Serializable>& aCallbackArgument) {
LOGDEBUG(" ThinClientRegion::singleHopPutAllNoThrow_remote map size = %zu",
map.size());
auto region = shared_from_this();
auto error = GF_NOERR;
  /* Step 1:
   * Populate the key vector from the user map and pass it to
   * getServerToFilterMap() to generate the locationMap. If the locationMap is
   * nullptr, fall back to the existing putAll implementation, which may take
   * multiple network hops.
   */
auto userKeys = std::vector<std::shared_ptr<CacheableKey>>();
for (const auto& iter : map) {
userKeys.push_back(iter.first);
}
  // The last argument to getServerToFilterMap() is true here: putAll must be
  // routed to the primary owner of each bucket.
auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
userKeys, region, true);
if (!locationMap) {
// putAll with multiple hop implementation
LOGDEBUG("locationMap is Null or Empty");
return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
aCallbackArgument);
}
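  // getServerToFilterMap() consults the locally cached partitioned-region
  // metadata to group the keys by owning server; a null map means that
  // metadata is not available yet, so the multi-hop path above is taken
  // instead.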
  // Flag that putAll on this partitioned region was invoked with single hop
  // enabled.
  m_isPRSingleHopEnabled = true;
  /* Step 2:
   * a. Create a vector of PutAllWork workers.
   * b. locationMap maps std::shared_ptr<BucketServerLocation> to
   *    std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>. Build a
   *    server-specific filteredMap/subMap by taking each server's keys
   *    (locationIter.second) and looking up their values in the user map.
   * c. Create a new PutAllWork instance, i.e. a worker with the required
   *    parameters.
   *    // TODO: add details of each parameter later.
   * d. Enqueue the worker so a thread-pool thread runs its execute() method.
   * e. Insert the worker into the vector.
   */
std::vector<std::shared_ptr<PutAllWork>> putAllWorkers;
auto& threadPool = m_cacheImpl->getThreadPool();
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
if (serverLocation == nullptr) {
LOGDEBUG("serverLocation is nullptr");
}
const auto& keys = locationIter.second;
    // Create a server-specific sub-map by iterating over this server's keys.
auto filteredMap = std::make_shared<HashMapOfCacheable>();
    if (keys && !keys->empty()) {
for (const auto& key : *keys) {
const auto& iter = map.find(key);
if (iter != map.end()) {
filteredMap->emplace(iter->first, iter->second);
}
}
}
auto worker = std::make_shared<PutAllWork>(
tcrdm, serverLocation, region, true /*attemptFailover*/,
false /*isBGThread*/, filteredMap, keys, timeout, aCallbackArgument);
threadPool.perform(worker);
putAllWorkers.push_back(worker);
}
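  // All PutAllWork tasks now run concurrently on the thread pool, each
  // sending its sub-map to a single server; getResult() below blocks until
  // the corresponding task has finished.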
  // TODO: check whether the following needs to be set:
  // reply.setMessageType(TcrMessage::RESPONSE);
  /* Step 3:
   * a. Iterate over the vector of PutAllWork workers and record each worker's
   *    outcome:
   *    resultMap<std::shared_ptr<BucketServerLocation>,
   *    std::shared_ptr<Serializable>>, where the value is either a
   *    std::shared_ptr<VersionedCacheableObjectPartList> or a
   *    std::shared_ptr<PutAllPartialResultServerException>;
   *    failedServers<std::shared_ptr<BucketServerLocation>,
   *    std::shared_ptr<CacheableInt32>>, where the value is an error code.
   * b. The workers are released automatically when the shared_ptr vector
   *    goes out of scope.
   */
auto resultMap = ResultMap();
auto failedServers = FailedServersMap();
for (const auto& worker : putAllWorkers) {
    auto err =
        worker->getResult();  // blocks until the worker thread completes
LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);
if (GF_NOERR == err) {
// No Exception from server
resultMap.emplace(worker->getServerLocation(),
worker->getResultCollector()->getList());
} else {
error = err;
if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
resultMap.emplace(worker->getServerLocation(),
worker->getPaPResultException());
} else if (error == GF_NOTCON) {
        // GF_NOTCON (not connected) suggests the cached bucket metadata is
        // stale; refresh it.
tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
region->getFullPath(), 0);
}
failedServers.emplace(worker->getServerLocation(),
CacheableInt32::create(error));
}
LOGDEBUG("worker->getPutAllMap()->size() = %zu ",
worker->getPutAllMap()->size());
LOGDEBUG(
"worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
worker->getResultCollector()->getList()->getVersionedTagsize());
    // TODO: check why the following code is needed:
    // TcrMessage* currentReply = worker->getReply();
    // if (currentReply->getMessageType() != TcrMessage::REPLY) {
    //   reply.setMessageType(currentReply->getMessageType());
    // }
}
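  // At this point resultMap holds, per server, either the versioned result
  // list or a partial-result exception, while failedServers holds an error
  // code for every server whose putAll did not fully succeed.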
  /* Step 4:
   * a. Create a std::shared_ptr<PutAllPartialResult> with total size =
   *    map.size().
   * b. Iterate over resultMap; when the value for a server location is a
   *    VersionedCacheableObjectPartList, add its keys and versions.
   * c. TODO: what if the value in resultMap is a
   *    PutAllPartialResultServerException?
   */
std::recursive_mutex responseLock;
auto result = std::make_shared<PutAllPartialResult>(
static_cast<int>(map.size()), responseLock);
LOGDEBUG(
" TCRegion:: %s:%d "
"result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
__FILE__, __LINE__,
result->getSucceededKeysAndVersions()->getVersionedTagsize());
LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
resultMap.size());
for (const auto& resultMapIter : resultMap) {
const auto& value = resultMapIter.second;
if (const auto papException =
std::dynamic_pointer_cast<PutAllPartialResultServerException>(
value)) {
      // PutAllPartialResultServerException case: the value in resultMap is a
      // PutAllPartialResultServerException.
      // TODO: failedServers.keySet should list all failed servers, i.e. the
      // set view of the keys contained in the failedServers map.
      // TODO: read papException and populate PutAllPartialResult.
result->consolidate(papException->getResult());
} else if (const auto list =
std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
value)) {
// value in resultMap is of type VCOPL.
result->addKeysAndVersions(list);
} else {
// ERROR CASE
if (value) {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value "
            "could not be cast to either VersionedCacheableObjectPartList or "
            "PutAllPartialResultServerException: %s",
            value->toString().c_str());
} else {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value is "
"nullptr");
}
}
}
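  // result now aggregates the succeeded keys and version tags from every
  // server, plus any per-key failures reported via partial-result exceptions.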
  /* Step 5:
   * a. If the PutAllPartialResult does not contain any entries, iterate over
   *    locationMap.
   * b. Build a std::vector<std::shared_ptr<CacheableKey>> succeedKeySet by
   *    adding each key set (locationIter.second) in locationMap for which
   *    failedServers does not contain locationIter.first.
   */
  LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %zu", __FILE__,
           __LINE__, failedServers.size());
  // If the partial result set doesn't already have keys (for tracking version
  // tags), gather up the keys that we know have succeeded so far and add them
  // to the partial result set (see bug Id #955).
if (!failedServers.empty()) {
auto succeedKeySet =
std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
if (result->getSucceededKeysAndVersions()->size() == 0) {
for (const auto& locationIter : *locationMap) {
        // Only servers absent from failedServers are known to have succeeded.
        if (failedServers.find(locationIter.first) == failedServers.end()) {
for (const auto& i : *(locationIter.second)) {
succeedKeySet->push_back(i);
}
}
}
result->addKeys(succeedKeySet);
}
}
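  // The keys gathered here carry no version tags; they are recorded so that a
  // later partial-result report can still list the entries that succeeded.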
  /* Step 6:
   * a. Iterate over the failedServers map.
   * b. If a failed server's error code is GF_PUTALL_PARTIAL_RESULT_EXCEPTION,
   *    continue; do not retry putAll for the corresponding keys.
   * c. Retry every other failed server: generate a newSubMap by finding that
   *    server's keys in locationMap and looking up their values in the user
   *    map.
   */
error = GF_NOERR;
bool oneSubMapRetryFailed = false;
for (const auto& failedServerIter : failedServers) {
    if (failedServerIter.second->value() ==
        GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
      // Do not retry for PutAllPartialResultException, but record that at
      // least one sub-map failed.
oneSubMapRetryFailed = true;
error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
continue;
}
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
nullptr;
const auto& failedSerInLocMapIter =
locationMap->find(failedServerIter.first);
if (failedSerInLocMapIter != locationMap->end()) {
failedKeys = failedSerInLocMapIter->second;
}
    if (failedKeys == nullptr) {
      LOGERROR(
          "TCRegion::singleHopPutAllNoThrow_remote: failedKeys is nullptr, "
          "which is not valid");
    }
auto newSubMap = std::make_shared<HashMapOfCacheable>();
if (failedKeys && !failedKeys->empty()) {
for (const auto& key : *failedKeys) {
const auto& iter = map.find(key);
if (iter != map.end()) {
newSubMap->emplace(iter->first, iter->second);
        } else {
          LOGERROR(
              "TCRegion::singleHopPutAllNoThrow_remote: failed key not found "
              "in the user map");
        }
}
}
std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
GfErrType errCode = multiHopPutAllNoThrow_remote(
*newSubMap, vcopListPtr, timeout, aCallbackArgument);
if (errCode == GF_NOERR) {
result->addKeysAndVersions(vcopListPtr);
} else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
oneSubMapRetryFailed = true;
      // TODO: papResultServerExc is nullptr at this point; uncomment once it
      // is populated:
      // result->consolidate(papResultServerExc->getResult());
error = errCode;
    } else {  // any other error
oneSubMapRetryFailed = true;
const auto& firstKey = newSubMap->begin()->first;
std::shared_ptr<Exception> excptPtr = nullptr;
      // TODO: formulate excptPtr from errCode.
result->saveFailedKey(firstKey, excptPtr);
error = errCode;
}
}
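  // If every retried sub-map succeeded, the earlier per-server failures have
  // been recovered and the whole operation is reported as a success.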
if (!oneSubMapRetryFailed) {
error = GF_NOERR;
}
versionedObjPartList = result->getSucceededKeysAndVersions();
LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
versionedObjPartList->size(), error);
return error;
}