in cppcache/src/ThinClientRedundancyManager.cpp [107:468]
GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
bool init, const TcrMessage* request, TcrMessageReply* reply,
ThinClientRegion* region) {
// Preconditions:
// 1. m_redundantEndpoints UNION m_nonredundantEndpoints = All Endpoints
// 2. m_redundantEndpoints INTERSECTION m_nonredundantEndpoints = Empty
GfErrType err = GF_NOTCON;
// Save any fatal errors that occur while maintaining redundancy so that
// they can be returned to the caller rather than being masked by nonfatal
// errors such as a server not being available.
GfErrType fatalError = GF_NOERR;
bool fatal = false;
std::lock_guard<decltype(m_redundantEndpointsLock)> guard(
m_redundantEndpointsLock);
bool isRedundancySatisfied = false;
int secondaryCount = 0;
bool isPrimaryConnected = false;
bool isPrimaryAtBack = false;
// TODO: isPrimaryAtBack can be removed by simplifying
// removeEndpointsInOrder().
std::vector<TcrEndpoint*>::iterator itRedundant =
m_redundantEndpoints.begin();
std::vector<TcrEndpoint*> tempRedundantEndpoints;
std::vector<TcrEndpoint*> tempNonredundantEndpoints;
// Redundancy level is maintained as follows using the following vectors:
// m_redundantEndpoints, m_nonredundantEndpoints, tempRedundantEndpoints,
// tempNonredundantEndpoints.
// m_redundantEndpoints and m_nonredundantEndpoints contain the current
// primary-secondary and non-redundant endpoints respectively.
// tempRedundantEndpoints and tempNonredundantEndpoints are vectors that are
// used to adjust the status of endpoints:
// tempRedundantEndpoints: contains endpoints that have been changed from
// nonredundant to either primary or secondary, i.e. those endpoints whose
// status was changed in order to satisfy the redundancy level.
// tempNonredundantEndpoints: contains endpoints that were previously
// redundant but are now disconnected. These endpoints will be moved from
// the redundant list to the nonredundant list.
// Step 1: Scan all the endpoints in one pass and arrange them according to
// connection status in the following order:
// m_redundantEndpoints, tempRedundantEndpoints, m_nonredundantEndpoints,
// tempNonredundantEndpoints.
// The endpoints are maintained in order in the redundant endpoints lists.
// This order is maintained when the
// endpoints in the temporary list are moved into
// m_redundantEndpoints/m_nonredundantEndpoints.
// Note that although endpoint status may change, the endpoints are not
// deleted from the lists (instead, the endpoints are
// copied to temporary lists).
// Scanning the endpoints is done in two stages:
// 1. First scan the redundant endpoints to build the list of failed
// endpoints and determine whether redundancy is already satisfied.
// 2. If redundancy has not been satisfied, scan the nonredundant list to
// find available endpoints that can be made redundant.
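// Illustrative walk-through (hypothetical endpoints, redundancy level 1):
// if m_redundantEndpoints = {P, S} with secondary S disconnected, and
// m_nonredundantEndpoints = {N1, N2} with N1 reachable, then the scans
// place S in tempNonredundantEndpoints and N1 in tempRedundantEndpoints as
// the new secondary; Step 3 then merges the temporary lists so that
// m_redundantEndpoints = {P, N1} and m_nonredundantEndpoints = {N2, S}.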
LOGDEBUG(
"ThinClientRedundancyManager::maintainRedundancyLevel(): checking "
"redundant list, size = %zu",
m_redundantEndpoints.size());
while (!isRedundancySatisfied && itRedundant != m_redundantEndpoints.end()) {
if (!isPrimaryConnected) {
if (itRedundant == m_redundantEndpoints.begin()) {
if ((*itRedundant)->connected()) {
isPrimaryConnected = true;
if (m_redundancyLevel == 0) isRedundancySatisfied = true;
} else {
tempNonredundantEndpoints.push_back(*itRedundant);
}
} else {
if (sendMakePrimaryMesg(*itRedundant, request, region)) {
isPrimaryConnected = true;
} else {
tempNonredundantEndpoints.push_back(*itRedundant);
}
}
} else {
if ((*itRedundant)->connected()) {
secondaryCount++;
if (secondaryCount == m_redundancyLevel) {
isRedundancySatisfied = true;
}
} else {
tempNonredundantEndpoints.push_back(*itRedundant);
}
}
++itRedundant;
}
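// At this point tempNonredundantEndpoints holds the previously redundant
// endpoints that were found disconnected; Step 3 moves them to the
// nonredundant list.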
// If redundancy is not satisfied, find nonredundant endpoints that can be
// made redundant.
// For queue locators, fetch an initial list of endpoints that can host
// queues.
if (!isRedundancySatisfied && m_poolHADM && !init) {
LOGDEBUG(
"ThinClientRedundancyManager::maintainRedundancyLevel(): building "
"nonredundant list via pool.");
std::list<ServerLocation> outEndpoints;
std::set<ServerLocation> exclEndPts;
for (std::vector<TcrEndpoint*>::iterator itr = m_redundantEndpoints.begin();
itr != m_redundantEndpoints.end(); itr++) {
LOGDEBUG(
"ThinClientRedundancyManager::maintainRedundancyLevel(): excluding "
"endpoint %s from queue list.",
(*itr)->name().c_str());
ServerLocation serverLoc((*itr)->name());
exclEndPts.insert(serverLoc);
}
m_nonredundantEndpoints.clear();
int howMany = -1;
if (m_locators != nullptr && m_locators->length() > 0 &&
m_servers != nullptr && m_servers->length() == 0) {
// If we are using locators only, request just the required number of
// servers.
howMany = m_redundancyLevel - static_cast<int>(exclEndPts.size()) + 1;
}
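// For example (hypothetical values), with redundancy level 1 and no
// excluded endpoints, howMany = 1 - 0 + 1 = 2: one server for the primary
// plus one secondary.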
outEndpoints = selectServers(howMany, exclEndPts);
for (std::list<ServerLocation>::iterator it = outEndpoints.begin();
it != outEndpoints.end(); it++) {
auto ep = m_poolHADM->addEP(*it);
LOGDEBUG(
"ThinClientRedundancyManager::maintainRedundancyLevel(): Adding "
"endpoint %s to nonredundant list.",
ep->name().c_str());
m_nonredundantEndpoints.push_back(ep.get());
}
}
LOGDEBUG(
"ThinClientRedundancyManager::maintainRedundancyLevel(): finding "
"nonredundant endpoints, size = %zu",
m_nonredundantEndpoints.size());
std::vector<TcrEndpoint*>::iterator itNonredundant =
m_nonredundantEndpoints.begin();
while (!isRedundancySatisfied &&
itNonredundant != m_nonredundantEndpoints.end()) {
if (!isPrimaryConnected) {
if (secondaryCount == m_redundancyLevel) {
// 38196: Make the last endpoint from the nonredundant list the primary.
if ((!init || *itNonredundant == m_nonredundantEndpoints.back()) &&
(err = makePrimary(*itNonredundant, request, reply)) == GF_NOERR) {
tempRedundantEndpoints.push_back(*itNonredundant);
isRedundancySatisfied = true;
isPrimaryConnected = true;
isPrimaryAtBack = true;
} else {
if (ThinClientBaseDM::isFatalError(err)) {
fatal = true;
fatalError = err;
}
}
} else {
if ((err = makeSecondary(*itNonredundant, request, reply)) ==
GF_NOERR) {
tempRedundantEndpoints.push_back(*itNonredundant);
secondaryCount++;
} else {
if (ThinClientBaseDM::isFatalError(err)) {
fatal = true;
fatalError = err;
}
}
}
} else {
if ((err = makeSecondary(*itNonredundant, request, reply)) == GF_NOERR) {
tempRedundantEndpoints.push_back(*itNonredundant);
secondaryCount++;
if (secondaryCount == m_redundancyLevel) {
isRedundancySatisfied = true;
}
} else {
if (ThinClientBaseDM::isFatalError(err)) {
fatal = true;
fatalError = err;
}
}
}
++itNonredundant;
}
// Step 2: After one scan of the endpoints, if the redundancy level is
// satisfied by changing the status of endpoints, the primary
// endpoint will be present in either m_redundantEndpoints or
// tempRedundantEndpoints.
// However, when the redundancy level is not satisfied and the following
// conditions hold, a new secondary server (whose status
// changed in Step 1) will be made primary:
// A. No primary server was found, and
// B. secondaryCount <= redundancy level.
// 38196: Prefer converting an old HA endpoint to primary.
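// Illustrative example (hypothetical endpoints): if tempRedundantEndpoints
// = {E1, E2} and only E1 reports an existing queue (getServerQueueStatus()
// != NON_REDUNDANT_SERVER), the loop below pops and skips E2, promotes E1
// to primary, and then restores E2, leaving the converted primary away
// from the back of the list (hence isPrimaryAtBack = false).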
TcrEndpoint* convertedPrimary = nullptr;
if (init && !isRedundancySatisfied && !isPrimaryConnected) {
bool oldHAEndPointPresent = false;
for (std::vector<TcrEndpoint*>::iterator it =
tempRedundantEndpoints.begin();
it != tempRedundantEndpoints.end(); it++) {
if ((*it)->getServerQueueStatus() != NON_REDUNDANT_SERVER) {
oldHAEndPointPresent = true;
break;
}
}
// TODO: After the 38196 fix, simplify durable client initialization by
// removing the constraint on primary position.
// tempSkippedEndpoints holds the endpoints that are skipped by the
// oldHAEndPointPresent check in the loop below.
std::vector<TcrEndpoint*> tempSkippedEndpoints;
// Pop endpoints from the back until one accepts the primary role or none
// remain.
while (!tempRedundantEndpoints.empty()) {
TcrEndpoint* ep = tempRedundantEndpoints.back();
if (oldHAEndPointPresent &&
ep->getServerQueueStatus() == NON_REDUNDANT_SERVER) {
tempSkippedEndpoints.push_back(ep);
tempRedundantEndpoints.pop_back();
continue;
}
if (sendMakePrimaryMesg(ep, request, region)) {
// The primary may end up in the middle if there are older nonredundant
// endpoints in tempRedundantEndpoints.
isPrimaryAtBack = false;
convertedPrimary = ep;
isPrimaryConnected = true;
break;
} else {
tempRedundantEndpoints.pop_back();
}
}
// push back the skipped endpoints into tempRedundantEndpoints
while (!tempSkippedEndpoints.empty()) {
TcrEndpoint* ep = tempSkippedEndpoints.back();
tempSkippedEndpoints.pop_back();
tempRedundantEndpoints.push_back(ep);
}
}
if (!isRedundancySatisfied && !isPrimaryConnected) {
// Pop endpoints from the back until one accepts the primary role or none
// remain.
while (!tempRedundantEndpoints.empty()) {
TcrEndpoint* ep = tempRedundantEndpoints.back();
if (sendMakePrimaryMesg(ep, request, region)) {
isPrimaryAtBack = true;
isPrimaryConnected = true;
break;
} else {
tempRedundantEndpoints.pop_back();
}
}
}
// Step 3: Finally, create the new redundant and nonredundant lists. Remove
// from m_redundantEndpoints all the endpoints that were
// marked as disconnected, and add in order all the new redundant endpoints
// (whose status changed in Step 1) to m_redundantEndpoints.
// If the primary is at the end of the temporary list, move it to the front
// of the redundant list.
// The nonredundant list is adjusted similarly.
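// For example, if addEndpointsInOrder leaves the new primary at the back
// of m_redundantEndpoints, the rotation below moves it to the front so
// that m_redundantEndpoints[0] is always the primary.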
removeEndpointsInOrder(m_redundantEndpoints, tempNonredundantEndpoints);
removeEndpointsInOrder(m_nonredundantEndpoints, tempRedundantEndpoints);
// 38196: For the durable reconnect case the primary may be in the middle;
// move it to the back so the block below relocates it to the front.
if (init && !isPrimaryAtBack && convertedPrimary != nullptr) {
moveEndpointToLast(tempRedundantEndpoints, convertedPrimary);
isPrimaryAtBack = true;
}
addEndpointsInOrder(m_redundantEndpoints, tempRedundantEndpoints);
if (isPrimaryConnected && isPrimaryAtBack) {
TcrEndpoint* primary = m_redundantEndpoints.back();
m_redundantEndpoints.pop_back();
m_redundantEndpoints.insert(m_redundantEndpoints.begin(), primary);
}
// Unregister DM for the new non-redundant endpoints
for (std::vector<TcrEndpoint*>::const_iterator iter =
tempNonredundantEndpoints.begin();
iter != tempNonredundantEndpoints.end(); ++iter) {
(*iter)->unregisterDM(true);
}
addEndpointsInOrder(m_nonredundantEndpoints, tempNonredundantEndpoints);
// Postconditions:
// 1. If the redundancy level is satisfied, m_redundantEndpoints.size = r + 1
// and m_redundantEndpoints[0] is the primary.
// 2. If the redundancy level is not satisfied, m_redundantEndpoints.size <= r.
// 3. If the primary is connected, m_redundantEndpoints[0] is the primary.
// (Not checked; verifying this may require modifying the TcrEndpoint class.)
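// For example, with redundancy level r = 2 fully satisfied,
// m_redundantEndpoints holds three endpoints: the primary at index 0
// followed by two secondaries.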
std::shared_ptr<RemoteQueryService> queryServicePtr;
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_poolHADM);
if (poolDM) {
queryServicePtr = std::dynamic_pointer_cast<RemoteQueryService>(
poolDM->getQueryServiceWithoutCheck());
}
if (queryServicePtr != nullptr) {
if (isPrimaryConnected) {
// Notify CQ status listeners of connect.
LOGDEBUG(
"invoking invokeCqConnectedListeners(true) for CQ status listeners");
queryServicePtr->invokeCqConnectedListeners(poolDM, true);
} else {
// Notify CQ status listeners of disconnect.
LOGDEBUG(
"invoking invokeCqConnectedListeners(false) for CQ status listeners");
queryServicePtr->invokeCqConnectedListeners(poolDM, false);
}
}
// Invariants:
// 1. m_redundantEndpoints UNION m_nonredundantEndpoints = All Endpoints
// 2. m_redundantEndpoints INTERSECTION m_nonredundantEndpoints = Empty
// The global endpoint list never changes for HA, so neither the
// getAllEndpoints result nor redundantEndpoints/nonredundantEndpoints can
// contain stale or deleted endpoints.
if (m_poolHADM) {
m_poolHADM->getStats().setSubsServers(
static_cast<int32_t>(m_redundantEndpoints.size()));
}
if (isRedundancySatisfied) {
m_allEndpointsDisconnected = false;
m_loggedRedundancyWarning = false;
return GF_NOERR;
} else if (isPrimaryConnected) {
if (fatal && err != GF_NOERR) {
return fatalError;
}
m_allEndpointsDisconnected = false;
if (m_redundancyLevel == -1) {
LOGINFO("Current subscription redundancy level is %zu",
m_redundantEndpoints.size() - 1);
return GF_NOERR;
}
if (!m_loggedRedundancyWarning) {
LOGWARN(
"Requested subscription redundancy level %d is not satisfiable with "
"%zu servers available",
m_redundancyLevel, m_redundantEndpoints.size());
m_loggedRedundancyWarning = true;
}
return GF_NOERR;
} else {
// All endpoints, including the primary, are disconnected: notify all
// regions and clear their keys of interest.
if (m_poolHADM && !m_allEndpointsDisconnected) {
m_poolHADM->clearKeysOfInterestAllRegions();
m_poolHADM->sendNotConnectedMessageToAllregions();
m_allEndpointsDisconnected = true;
}
if (fatal && err != GF_NOERR) {
return fatalError;
}
return err;
}
}