in cppcache/src/ExecutionImpl.cpp [97:348]
std::shared_ptr<ResultCollector> ExecutionImpl::execute(
const std::string& func, std::chrono::milliseconds timeout) {
LOGDEBUG("ExecutionImpl::execute: ");
GuardUserAttributes gua;
if (m_authenticatedView != nullptr) {
LOGDEBUG("ExecutionImpl::execute function on authenticated cache");
gua.setAuthenticatedView(m_authenticatedView);
}
bool serverHasResult = false;
bool serverIsHA = false;
bool serverOptimizeForWrite = false;
auto&& attr = getFunctionAttributes(func);
{
if (attr == nullptr) {
std::lock_guard<decltype(m_func_attrs_lock)> _guard(m_func_attrs_lock);
GfErrType err = GF_NOERR;
attr = getFunctionAttributes(func);
if (attr == nullptr) {
if (m_region != nullptr) {
err = dynamic_cast<ThinClientRegion*>(m_region.get())
->getFuncAttributes(func, &attr);
} else if (m_pool != nullptr) {
err = getFuncAttributes(func, &attr);
}
if (err != GF_NOERR) {
throwExceptionIfError("Execute::GET_FUNCTION_ATTRIBUTES", err);
}
if (!attr->empty() && err == GF_NOERR) {
m_func_attrs[func] = attr;
}
}
}
}
serverHasResult = ((attr->at(0) == 1) ? true : false);
serverIsHA = ((attr->at(1) == 1) ? true : false);
serverOptimizeForWrite = ((attr->at(2) == 1) ? true : false);
LOGDEBUG(
"ExecutionImpl::execute got functionAttributes from server for function "
"= %s serverHasResult = %d serverIsHA = %d serverOptimizeForWrite = %d ",
func.c_str(), serverHasResult, serverIsHA, serverOptimizeForWrite);
if (serverHasResult == false) {
m_rc = std::make_shared<NoResult>();
} else if (m_rc == nullptr) {
m_rc = std::make_shared<DefaultResultCollector>();
}
uint8_t isHAHasResultOptimizeForWrite = 0;
if (serverIsHA) {
isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 1;
}
if (serverHasResult) {
isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 2;
}
if (serverOptimizeForWrite) {
isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 4;
}
LOGDEBUG("ExecutionImpl::execute: isHAHasResultOptimizeForWrite = %d",
isHAHasResultOptimizeForWrite);
TXState* txState = TSSTXStateWrapper::get().getTXState();
if (txState != nullptr && m_allServer == true) {
throw UnsupportedOperationException(
"Execution::execute: Transaction function execution on all servers is "
"not supported");
}
if (m_region != nullptr) {
int32_t retryAttempts = 3;
if (m_pool != nullptr) {
retryAttempts = m_pool->getRetryAttempts();
}
if (m_pool != nullptr && m_pool->getPRSingleHopEnabled()) {
auto tcrdm = std::dynamic_pointer_cast<ThinClientPoolDM>(m_pool);
if (!tcrdm) {
throw IllegalArgumentException(
"Execute: pool cast to ThinClientPoolDM failed");
}
auto cms = tcrdm->getClientMetaDataService();
auto failedNodes = CacheableHashSet::create();
if ((!m_routingObj || m_routingObj->empty()) &&
txState == nullptr) { // For transactions we should not create
// multiple threads
LOGDEBUG("ExecutionImpl::execute: m_routingObj is empty");
auto serverToBucketsMap = cms->groupByServerToAllBuckets(
m_region,
/*serverOptimizeForWrite*/ (isHAHasResultOptimizeForWrite & 4) ==
4);
if (!serverToBucketsMap || serverToBucketsMap->empty()) {
LOGDEBUG(
"ExecutionImpl::execute: m_routingObj is empty and locationMap "
"is also empty so use old FE onRegion");
std::dynamic_pointer_cast<ThinClientRegion>(m_region)
->executeFunction(
func, m_args, m_routingObj, isHAHasResultOptimizeForWrite,
m_rc, (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
timeout);
dynamic_cast<ThinClientRegion*>(m_region.get())
->setMetaDataRefreshed(false);
cms->enqueueForMetadataRefresh(m_region->getFullPath(), 0);
} else {
// convert server to bucket map to server to key map where bucket id
// is key.
auto serverToKeysMap =
std::make_shared<ClientMetadataService::ServerToKeysMap>(
serverToBucketsMap->size());
for (const auto& entry : *serverToBucketsMap) {
auto keys = std::make_shared<CacheableHashSet>(
static_cast<int32_t>(entry.second->size()));
for (const auto& bucket : *(entry.second)) {
keys->insert(CacheableInt32::create(bucket));
}
serverToKeysMap->emplace(entry.first, keys);
}
LOGDEBUG(
"ExecutionImpl::execute: withoutFilter and locationMap is not "
"empty");
bool reExecute = std::dynamic_pointer_cast<ThinClientRegion>(m_region)
->executeFunctionSH(
func, m_args, isHAHasResultOptimizeForWrite,
m_rc, serverToKeysMap, failedNodes, timeout,
/*allBuckets*/ true);
if (reExecute) { // Fallback to old FE onREgion
if (isHAHasResultOptimizeForWrite & 1) { // isHA = true
m_rc->clearResults();
auto rs =
std::dynamic_pointer_cast<ThinClientRegion>(m_region)
->reExecuteFunction(func, m_args, m_routingObj,
isHAHasResultOptimizeForWrite, m_rc,
(isHAHasResultOptimizeForWrite & 1)
? retryAttempts
: 0,
failedNodes, timeout);
}
}
}
} else if (m_routingObj != nullptr && m_routingObj->size() == 1) {
LOGDEBUG("executeFunction onRegion WithFilter size equal to 1 ");
dynamic_cast<ThinClientRegion*>(m_region.get())
->executeFunction(
func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, m_rc,
(isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
timeout);
} else {
if (txState == nullptr) {
auto serverToKeysMap = cms->getServerToFilterMapFESHOP(
m_routingObj, m_region, /*serverOptimizeForWrite*/
(isHAHasResultOptimizeForWrite & 4) == 4);
if (!serverToKeysMap || serverToKeysMap->empty()) {
LOGDEBUG(
"ExecutionImpl::execute: withFilter but locationMap is empty "
"so use old FE onRegion");
dynamic_cast<ThinClientRegion*>(m_region.get())
->executeFunction(
func, m_args, m_routingObj, isHAHasResultOptimizeForWrite,
m_rc,
(isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
timeout);
cms->enqueueForMetadataRefresh(m_region->getFullPath(), 0);
} else {
LOGDEBUG(
"ExecutionImpl::execute: withFilter and locationMap is not "
"empty");
bool reExecute =
dynamic_cast<ThinClientRegion*>(m_region.get())
->executeFunctionSH(func, m_args,
isHAHasResultOptimizeForWrite, m_rc,
serverToKeysMap, failedNodes, timeout,
/*allBuckets*/ false);
if (reExecute) { // Fallback to old FE onREgion
if (isHAHasResultOptimizeForWrite & 1) { // isHA = true
m_rc->clearResults();
auto rs =
dynamic_cast<ThinClientRegion*>(m_region.get())
->reExecuteFunction(func, m_args, m_routingObj,
isHAHasResultOptimizeForWrite, m_rc,
(isHAHasResultOptimizeForWrite & 1)
? retryAttempts
: 0,
failedNodes, timeout);
}
}
}
} else { // For transactions use old way
dynamic_cast<ThinClientRegion*>(m_region.get())
->executeFunction(
func, m_args, m_routingObj, isHAHasResultOptimizeForWrite,
m_rc, (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0,
timeout);
}
}
} else { // w/o single hop, Fallback to old FE onREgion
dynamic_cast<ThinClientRegion*>(m_region.get())
->executeFunction(
func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, m_rc,
(isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, timeout);
}
/* } catch (TransactionDataNodeHasDepartedException e) {
if(txState == nullptr)
{
GfErrTypeThrowException("Transaction is nullptr",
GF_CACHE_ILLEGAL_STATE_EXCEPTION);
}
if(!txState->isReplay())
txState->replay(false);
} catch(TransactionDataRebalancedException e) {
if(txState == nullptr)
{
GfErrTypeThrowException("Transaction is nullptr",
GF_CACHE_ILLEGAL_STATE_EXCEPTION);
}
if(!txState->isReplay())
txState->replay(true);
}
*/
if (serverHasResult == true) {
// ExecutionImpl::addResults(m_rc, rs);
m_rc->endResults();
}
return m_rc;
} else if (m_pool != nullptr) {
if (txState != nullptr) {
throw UnsupportedOperationException(
"Execution::execute: Transaction function execution on pool is not "
"supported");
}
if (m_allServer == false) {
executeOnPool(
func, isHAHasResultOptimizeForWrite,
(isHAHasResultOptimizeForWrite & 1) ? m_pool->getRetryAttempts() : 0,
timeout);
if (serverHasResult == true) {
// ExecutionImpl::addResults(m_rc, rs);
m_rc->endResults();
}
return m_rc;
}
executeOnAllServers(func, isHAHasResultOptimizeForWrite, timeout);
} else {
throw IllegalStateException("Execution::execute: should not be here");
}
return m_rc;
}