in cppcache/src/ThinClientRegion.cpp [1564:1808]
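// Single-hop removeAll: keys are grouped by the server that owns their
// bucket, each group is removed in parallel via RemoveAllWork, and failed
// groups are retried through the multi-hop path.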
GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote(
ThinClientPoolDM* tcrdm,
const std::vector<std::shared_ptr<CacheableKey>>& keys,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
const std::shared_ptr<Serializable>& aCallbackArgument) {
LOGDEBUG(
" ThinClientRegion::singleHopRemoveAllNoThrow_remote keys size = %zu",
keys.size());
auto region = shared_from_this();
GfErrType error = GF_NOERR;
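  // Ask the client metadata service to group the keys by the server that
  // hosts each key's bucket; a null map means single-hop metadata is not
  // available, so we fall back to multi-hop below.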
auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
keys, region, true);
if (!locationMap) {
    // Fall back to the multi-hop removeAll implementation.
    LOGDEBUG("locationMap is null or empty");
return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
aCallbackArgument);
}
  // Set this flag to indicate that removeAll on a partitioned region was
  // invoked with single-hop enabled.
  m_isPRSingleHopEnabled = true;
LOGDEBUG("locationMap.size() = %zu ", locationMap->size());
  /*Step-2
   * a. Create a vector of RemoveAllWork workers.
   * b. locationMap<std::shared_ptr<BucketServerLocation>,
   *    std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>> holds,
   *    for each server, the subset of keys that server owns.
   * c. Create a RemoveAllWork instance per server with the required
   *    parameters. //TODO: document each parameter.
   * d. Enqueue the worker on the thread pool so its execute method runs on a
   *    pool thread.
   * e. Insert the worker into the vector so its result can be collected
   *    later.
   */
std::vector<std::shared_ptr<RemoveAllWork>> removeAllWorkers;
auto& threadPool = m_cacheImpl->getThreadPool();
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
if (serverLocation == nullptr) {
LOGDEBUG("serverLocation is nullptr");
}
const auto& mappedkeys = locationIter.second;
auto worker = std::make_shared<RemoveAllWork>(
tcrdm, serverLocation, region, true /*attemptFailover*/,
false /*isBGThread*/, mappedkeys, aCallbackArgument);
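    // Run the worker asynchronously on the pool; its result is collected in
    // the loop below via getResult().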
threadPool.perform(worker);
removeAllWorkers.push_back(worker);
}
  // TODO: check whether the following needs to be set:
// reply.setMessageType(TcrMessage::RESPONSE);
  /**
   * Step::3
   * a. Iterate over the vector of removeAllWorkers and populate, per worker,
   *    resultMap<std::shared_ptr<BucketServerLocation>,
   *    std::shared_ptr<Serializable>>, where the value is either a
   *    std::shared_ptr<VersionedCacheableObjectPartList> or a
   *    std::shared_ptr<PutAllPartialResultServerException>, and
   *    failedServers<std::shared_ptr<BucketServerLocation>,
   *    std::shared_ptr<CacheableInt32>>, where the value is an error code.
   * b. The workers are released automatically when the shared_ptr vector
   *    goes out of scope.
   */
auto resultMap = ResultMap();
auto failedServers = FailedServersMap();
for (const auto& worker : removeAllWorkers) {
    // getResult() blocks until the worker thread has finished.
    auto err = worker->getResult();
LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);
if (GF_NOERR == err) {
// No Exception from server
resultMap.emplace(worker->getServerLocation(),
worker->getResultCollector()->getList());
} else {
error = err;
if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
resultMap.emplace(worker->getServerLocation(),
worker->getPaPResultException());
} else if (error == GF_NOTCON) {
// Refresh the metadata in case of GF_NOTCON.
tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
region->getFullPath(), 0);
}
failedServers.emplace(worker->getServerLocation(),
CacheableInt32::create(error));
}
LOGDEBUG(
"worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
worker->getResultCollector()->getList()->getVersionedTagsize());
}
  /**
   * Step:4
   * a. Create a std::shared_ptr<PutAllPartialResult> sized to the total
   *    number of keys.
   * b. Iterate over resultMap; when the value for a server location is a
   *    VersionedCacheableObjectPartList, add its keys and versions.
   * c. When the value is a PutAllPartialResultServerException, consolidate
   *    its partial result into the overall result.
   */
std::recursive_mutex responseLock;
auto result = std::make_shared<PutAllPartialResult>(
static_cast<int>(keys.size()), responseLock);
LOGDEBUG(
" TCRegion:: %s:%d "
"result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
__FILE__, __LINE__,
result->getSucceededKeysAndVersions()->getVersionedTagsize());
  LOGDEBUG(" TCRegion:: %s:%d resultMap.size() = %zu ", __FILE__, __LINE__,
           resultMap.size());
for (const auto& resultMapIter : resultMap) {
const auto& value = resultMapIter.second;
if (const auto papException =
std::dynamic_pointer_cast<PutAllPartialResultServerException>(
value)) {
      // PutAllPartialResultServerException case: the value in resultMap is a
      // PutAllPartialResultServerException.
      // TODO: failedServers.keySet = all failed servers, i.e. list all keys
      // contained in the failedServers map (its key set view).
      // TODO: read papException and fully populate PutAllPartialResult.
result->consolidate(papException->getResult());
} else if (const auto list =
std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
value)) {
// value in resultMap is of type VCOPL.
result->addKeysAndVersions(list);
} else {
// ERROR CASE
if (value) {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
            "could not be cast to either VersionedCacheableObjectPartList or "
            "PutAllPartialResultServerException: %s",
            value->toString().c_str());
} else {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
"is nullptr");
}
}
}
  /**
   * a. If the PutAllPartialResult does not contain any entries yet, iterate
   *    over locationMap.
   * b. Build a std::vector<std::shared_ptr<CacheableKey>> succeedKeySet by
   *    adding the keys (locationIter.second) of every server location that
   *    is not present in failedServers.
   */
  LOGDEBUG("ThinClientRegion:: %s:%d failedServers.size() = %zu", __FILE__,
           __LINE__, failedServers.size());
// if the partial result set doesn't already have keys (for tracking version
// tags)
// then we need to gather up the keys that we know have succeeded so far and
// add them to the partial result set (See bug Id #955)
if (!failedServers.empty()) {
auto succeedKeySet =
std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
if (result->getSucceededKeysAndVersions()->size() == 0) {
for (const auto& locationIter : *locationMap) {
        // Only keys that went to servers absent from failedServers are known
        // to have succeeded.
        if (failedServers.find(locationIter.first) == failedServers.end()) {
for (const auto& i : *(locationIter.second)) {
succeedKeySet->push_back(i);
}
}
}
result->addKeys(succeedKeySet);
}
}
  /**
   * a. Iterate over the failedServers map.
   * b. If the error code for a failed server is
   *    GF_PUTALL_PARTIAL_RESULT_EXCEPTION, skip it; do not retry removeAll
   *    for the corresponding keys.
   * c. Retry every other failed server: look up its keys in locationMap and
   *    resend them through the multi-hop path.
   */
error = GF_NOERR;
bool oneSubMapRetryFailed = false;
for (const auto& failedServerIter : failedServers) {
    if (failedServerIter.second->value() ==
        GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
      // Do not retry for PutAllPartialResultException, but record that at
      // least one sub-map failed.
oneSubMapRetryFailed = true;
error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
continue;
}
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
nullptr;
const auto& failedSerInLocMapIter =
locationMap->find(failedServerIter.first);
if (failedSerInLocMapIter != locationMap->end()) {
failedKeys = failedSerInLocMapIter->second;
}
    if (failedKeys == nullptr) {
      LOGERROR(
          "TCRegion::singleHopRemoveAllNoThrow_remote :: failedKeys are "
          "nullptr, which is not valid");
      // Skip this server to avoid dereferencing a null pointer below.
      continue;
    }
    std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
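    // Retry this server's keys through the multi-hop path.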
GfErrType errCode = multiHopRemoveAllNoThrow_remote(
*failedKeys, vcopListPtr, aCallbackArgument);
if (errCode == GF_NOERR) {
result->addKeysAndVersions(vcopListPtr);
} else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
oneSubMapRetryFailed = true;
error = errCode;
} else /*if(errCode != GF_NOERR)*/ {
oneSubMapRetryFailed = true;
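      // Record a representative failed key; no server exception object is
      // available at this point, so a null exception pointer is saved with it.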
std::shared_ptr<Exception> excptPtr = nullptr;
result->saveFailedKey(failedKeys->at(0), excptPtr);
error = errCode;
}
}
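  // If every retried sub-map succeeded and none reported a partial-result
  // exception, report overall success to the caller.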
if (!oneSubMapRetryFailed) {
error = GF_NOERR;
}
versionedObjPartList = result->getSucceededKeysAndVersions();
  LOGDEBUG("singlehop versionedObjPartList->size() = %d error = %d",
           versionedObjPartList->size(), error);
return error;
}