cppcache/src/ThinClientRedundancyManager.cpp (996 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "ThinClientRedundancyManager.hpp" #include <algorithm> #include <set> #include <geode/AuthenticatedView.hpp> #include <geode/SystemProperties.hpp> #include "CacheImpl.hpp" #include "ClientProxyMembershipID.hpp" #include "FunctionExpiryTask.hpp" #include "RemoteQueryService.hpp" #include "ServerLocation.hpp" #include "TcrHADistributionManager.hpp" #include "ThinClientPoolHADM.hpp" #include "ThinClientRegion.hpp" #include "UserAttributes.hpp" namespace apache { namespace geode { namespace client { const int MIN_RETRY_ATTEMPTS = 5; const char* ThinClientRedundancyManager::NC_PerodicACK = "NC PerodicACK"; ThinClientRedundancyManager::ThinClientRedundancyManager( TcrConnectionManager* theConnManager, int redundancyLevel, ThinClientPoolHADM* poolHADM, bool sentReadyForEvents, bool globalProcessedMarker) : m_globalProcessedMarker(globalProcessedMarker), m_allEndpointsDisconnected(false), m_server(0), m_sentReadyForEvents(sentReadyForEvents), m_redundancyLevel(redundancyLevel), m_loggedRedundancyWarning(false), m_poolHADM(poolHADM), m_theTcrConnManager(theConnManager), m_locators(nullptr), m_servers(nullptr), m_periodicAckTask(nullptr), periodic_ack_semaphore_(1), m_HAenabled(false) {} std::list<ServerLocation> ThinClientRedundancyManager::selectServers( int howMany, std::set<ServerLocation> exclEndPts) { LOGFINE("Selecting %d servers with %d in exclude list", howMany, exclEndPts.size()); std::list<ServerLocation> outEndpoints; if (m_locators->length() > 0) { try { std::string additionalLoc; ClientProxyMembershipID* m_proxyID = m_poolHADM->getMembershipId(); m_poolHADM->getLocatorHelper()->getEndpointForNewCallBackConn( *m_proxyID, outEndpoints, additionalLoc, howMany, exclEndPts, m_poolHADM->getServerGroup()); } catch (const AuthenticationRequiredException&) { return outEndpoints; } catch (const NoAvailableLocatorsException&) { LOGFINE("No locators available"); return outEndpoints; } } else if (m_servers->length() > 0) { if (howMany == -1) howMany = m_servers->length(); for (int attempts = 0; attempts < m_servers->length() && howMany > 0; attempts++) { if (m_servers->length() < 0 || m_server >= static_cast<std::size_t>(m_servers->length())) { m_server = 0; } ServerLocation location(Utils::convertHostToCanonicalForm( (*m_servers)[m_server++]->value().c_str()) .c_str()); if (exclEndPts.find(location) != exclEndPts.end()) { // exclude this one continue; } outEndpoints.push_back(location); howMany--; } } else { throw IllegalStateException( "No locators or servers available to select from"); } return outEndpoints; } GfErrType ThinClientRedundancyManager::maintainRedundancyLevel( bool init, const TcrMessage* request, TcrMessageReply* reply, ThinClientRegion* region) { // Preconditions: // 1. m_redundantEndpoints UNION m_nonredundantEndpionts = All Endpoints // 2. m_redundantEndpoints INTERSECTION m_nonredundantEndpoints = Empty GfErrType err = GF_NOTCON; // save any fatal errors that occur during maintain redundancy so // that we can send it back to the caller, to avoid missing out due // to nonfatal errors such as server not available GfErrType fatalError = GF_NOERR; bool fatal = false; std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); bool isRedundancySatisfied = false; int secondaryCount = 0; bool isPrimaryConnected = false; bool isPrimaryAtBack = false; // TODO: isPrimaryAtBack can be removed by simplifying // removeEndpointsInOrder(). std::vector<TcrEndpoint*>::iterator itRedundant = m_redundantEndpoints.begin(); std::vector<TcrEndpoint*> tempRedundantEndpoints; std::vector<TcrEndpoint*> tempNonredundantEndpoints; // Redundancy level is maintained as follows using the following vectors: // m_redundantEndpoints, m_nonredundantEndpoints, tempRedundantEndpoints, // tempNonredundantEndpoints. // m_redundantEndpoints and m_nonredundantEndpoints contain the current // primary-secondary and non-redundant endpoints respectively. // tempRedundantEndpoints and tempNonredundantEndpoints are vectors that are // used to adjust the status of endpoints: // tempRedundantEndpoints: contains endpoints which have been changed from // nonredundant to either primary or secondary, i.e. // those endpoints whose status has been changed in order to satisfy // redundancy level. // tempNonredundantEndpoints: contains endpoints which were earlier redundant // but are now not connected. These endpoints will // be moved from the redundant list to the non redundant list. // Step 1: Scan all the endpoints in one pass and arrange them according to // connection status in the following order: // m_redundantEndpoints,tempRedundantEndpoints,m_nonredundantEndpoints,tempNonredundantEndpoints // The endpoints are maintained in order in the redundant endpoints lists. // This order is maintained when the // endpoints in the temporary list are moved into // m_redundantEndpoints/m_nonredundantEndpoints. // Note that although endpoint status may change, the endpoints are not // deleted from the lists (instead, the endpoints are // copied to temporary lists). // Scanning the endpoints is done in two stages: // 1. First scan the redundant endpoints to find the failed endpoints list, // whether redundancy has been satisfied. // 2. If redundancy has not been satisfied, scan the nonredundant list to find // available endpoints that can be made redundant. LOGDEBUG( "ThinClientRedundancyManager::maintainRedundancyLevel(): checking " "redundant list, size = %zu", m_redundantEndpoints.size()); while (!isRedundancySatisfied && itRedundant != m_redundantEndpoints.end()) { if (!isPrimaryConnected) { if (itRedundant == m_redundantEndpoints.begin()) { if ((*itRedundant)->connected()) { isPrimaryConnected = true; if (m_redundancyLevel == 0) isRedundancySatisfied = true; } else { tempNonredundantEndpoints.push_back(*itRedundant); } } else { if (sendMakePrimaryMesg(*itRedundant, request, region)) { isPrimaryConnected = true; } else { tempNonredundantEndpoints.push_back(*itRedundant); } } } else { if ((*itRedundant)->connected()) { secondaryCount++; if (secondaryCount == m_redundancyLevel) { isRedundancySatisfied = true; } } else { tempNonredundantEndpoints.push_back(*itRedundant); } } ++itRedundant; } // If redundancy is not satisfied, find nonredundant endpoints that can be // made redundant. // For queue locators, fetch an initial list of endpoints which can host // queues. if (!isRedundancySatisfied && m_poolHADM && !init) { LOGDEBUG( "ThinClientRedundancyManager::maintainRedundancyLevel(): building " "nonredundant list via pool."); std::list<ServerLocation> outEndpoints; std::set<ServerLocation> exclEndPts; for (std::vector<TcrEndpoint*>::iterator itr = m_redundantEndpoints.begin(); itr != m_redundantEndpoints.end(); itr++) { LOGDEBUG( "ThinClientRedundancyManager::maintainRedundancyLevel(): excluding " "endpoint %s from queue list.", (*itr)->name().c_str()); ServerLocation serverLoc((*itr)->name()); exclEndPts.insert(serverLoc); } m_nonredundantEndpoints.clear(); int howMany = -1; if (m_locators != nullptr && m_locators->length() > 0 && m_servers != nullptr && m_servers->length() == 0) { // if we are using locators only request the required number of servers. howMany = m_redundancyLevel - static_cast<int>(exclEndPts.size()) + 1; } outEndpoints = selectServers(howMany, exclEndPts); for (std::list<ServerLocation>::iterator it = outEndpoints.begin(); it != outEndpoints.end(); it++) { auto ep = m_poolHADM->addEP(*it); LOGDEBUG( "ThinClientRedundancyManager::maintainRedundancyLevel(): Adding " "endpoint %s to nonredundant list.", ep->name().c_str()); m_nonredundantEndpoints.push_back(ep.get()); } } LOGDEBUG( "ThinClientRedundancyManager::maintainRedundancyLevel(): finding " "nonredundant endpoints, size = %zu", m_nonredundantEndpoints.size()); std::vector<TcrEndpoint*>::iterator itNonredundant = m_nonredundantEndpoints.begin(); while (!isRedundancySatisfied && itNonredundant != m_nonredundantEndpoints.end()) { if (!isPrimaryConnected) { if (secondaryCount == m_redundancyLevel) { // 38196:Make last endpoint from the non redundant list as primary. if ((!init || *itNonredundant == m_nonredundantEndpoints.back()) && (err = makePrimary(*itNonredundant, request, reply)) == GF_NOERR) { tempRedundantEndpoints.push_back(*itNonredundant); isRedundancySatisfied = true; isPrimaryConnected = true; isPrimaryAtBack = true; } else { if (ThinClientBaseDM::isFatalError(err)) { fatal = true; fatalError = err; } } } else { if ((err = makeSecondary(*itNonredundant, request, reply)) == GF_NOERR) { tempRedundantEndpoints.push_back(*itNonredundant); secondaryCount++; } else { if (ThinClientBaseDM::isFatalError(err)) { fatal = true; fatalError = err; } } } } else { if ((err = makeSecondary(*itNonredundant, request, reply)) == GF_NOERR) { tempRedundantEndpoints.push_back(*itNonredundant); secondaryCount++; if (secondaryCount == m_redundancyLevel) { isRedundancySatisfied = true; } } else { if (ThinClientBaseDM::isFatalError(err)) { fatal = true; fatalError = err; } } } ++itNonredundant; } // Step 2: After one scan of the endpoints, if the redundancy level is // satisifed by changing status of endpoints, the primary // endpoint will be present in either m_redundantEndpoints or // tempRedundantEndpoints. // However, when redundancy level is not satisifed and following condition // holds true, a new secondary server (whose status // changed in Step 1) will be made primary: // A. No primary server was found, and // B. secondaryCount <= redundancy level. // 38196: Prefer Primary conversion from oldHA . TcrEndpoint* convertedPrimary = nullptr; if (init && !isRedundancySatisfied && !isPrimaryConnected) { bool oldHAEndPointPresent = false; for (std::vector<TcrEndpoint*>::iterator it = tempRedundantEndpoints.begin(); it != tempRedundantEndpoints.end(); it++) { if ((*it)->getServerQueueStatus() != NON_REDUNDANT_SERVER) { oldHAEndPointPresent = true; break; } } // TODO: Post-38196fix, simplify durable client initialization by removing // constraint on primary position. // holds the endpoints that are skipped by the oldHAEndPointPresent // check in the loop back std::vector<TcrEndpoint*> tempSkippedEndpoints; // warning: do not use unsigned type for index since .size() can return 0 while (!tempRedundantEndpoints.empty()) { TcrEndpoint* ep = tempRedundantEndpoints.back(); if (oldHAEndPointPresent && ep->getServerQueueStatus() == NON_REDUNDANT_SERVER) { tempSkippedEndpoints.push_back(ep); tempRedundantEndpoints.pop_back(); continue; } if (sendMakePrimaryMesg(ep, request, region)) { // Primary may be in middle If there are older nonredundant // ep in tempRedundantEndpoints isPrimaryAtBack = false; convertedPrimary = ep; isPrimaryConnected = true; break; } else { tempRedundantEndpoints.pop_back(); } } // push back the skipped endpoints into tempRedundantEndpoints while (!tempSkippedEndpoints.empty()) { TcrEndpoint* ep = tempSkippedEndpoints.back(); tempSkippedEndpoints.pop_back(); tempRedundantEndpoints.push_back(ep); } } if (!isRedundancySatisfied && !isPrimaryConnected) { // warning: do not use unsigned type for index since .size() can return 0 while (!tempRedundantEndpoints.empty()) { TcrEndpoint* ep = tempRedundantEndpoints.back(); if (sendMakePrimaryMesg(ep, request, region)) { isPrimaryAtBack = true; isPrimaryConnected = true; break; } else { tempRedundantEndpoints.pop_back(); } } } // Step 3: Finally, create the new redundant and nonredundant lists. Copy from // m_redundantEndpointsList all the endpoints that were // marked as disconnected. Add in order all the new redundant endpoints (whose // status changed in Step 2) to m_redundantEndpoints. // If primary was at end of temporary list, move it to front on redundant // list. // Similarly, adjust the nonredundant list. removeEndpointsInOrder(m_redundantEndpoints, tempNonredundantEndpoints); removeEndpointsInOrder(m_nonredundantEndpoints, tempRedundantEndpoints); // 38196:for DurableReconnect case, primary may be in between, put it @ start. if (init && !isPrimaryAtBack && convertedPrimary != nullptr) { moveEndpointToLast(tempRedundantEndpoints, convertedPrimary); isPrimaryAtBack = true; } addEndpointsInOrder(m_redundantEndpoints, tempRedundantEndpoints); if (isPrimaryConnected && isPrimaryAtBack) { TcrEndpoint* primary = m_redundantEndpoints.back(); m_redundantEndpoints.pop_back(); m_redundantEndpoints.insert(m_redundantEndpoints.begin(), primary); } // Unregister DM for the new non-redundant endpoints for (std::vector<TcrEndpoint*>::const_iterator iter = tempNonredundantEndpoints.begin(); iter != tempNonredundantEndpoints.end(); ++iter) { (*iter)->unregisterDM(true); } addEndpointsInOrder(m_nonredundantEndpoints, tempNonredundantEndpoints); // Postconditions: // 1. If redundancy level is satisifed, m_redundantEndpoints.size = r + 1, // m_redundantEndpoints[0] is primary. // 2. If redundancy level is not satisifed, m_redundantEndpoints.size <= r. // 3. If primary is connected, m_redundantEndpoints[0] is primary. ( Not // checked. To verify, We may have to modify // TcrEndpoint class.) std::shared_ptr<RemoteQueryService> queryServicePtr; ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_poolHADM); if (poolDM) { queryServicePtr = std::dynamic_pointer_cast<RemoteQueryService>( poolDM->getQueryServiceWithoutCheck()); } if (queryServicePtr != nullptr) { if (isPrimaryConnected) { // call CqStatusListener connect LOGDEBUG( "invoke invokeCqConnectedListeners for connected for CQ status " "listener"); queryServicePtr->invokeCqConnectedListeners(poolDM, true); } else { // call CqStatusListener disconnect LOGDEBUG( "invoke invokeCqDisConnectedListeners for disconnected for CQ status " "listener"); queryServicePtr->invokeCqConnectedListeners(poolDM, false); } } // Invariants: // 1. m_redundantEndpoints UNION m_nonredundantEndpionts = All Endpoints // 2. m_redundantEndpoints INTERSECTION m_nonredundantEndpoints = Empty // The global endpoint list does not change ever for HA so getAllEndpoints // result or redundantEndpoints/nonredundantEndpoints cannot have stale or // deleted endpoints if (m_poolHADM) { m_poolHADM->getStats().setSubsServers( static_cast<int32_t>(m_redundantEndpoints.size())); } if (isRedundancySatisfied) { m_allEndpointsDisconnected = false; m_loggedRedundancyWarning = false; return GF_NOERR; } else if (isPrimaryConnected) { if (fatal && err != GF_NOERR) { return fatalError; } m_allEndpointsDisconnected = false; if (m_redundancyLevel == -1) { LOGINFO("Current subscription redundancy level is %zu", m_redundantEndpoints.size() - 1); return GF_NOERR; } if (!m_loggedRedundancyWarning) { LOGWARN( "Requested subscription redundancy level %d is not satisfiable with " "%zu servers available", m_redundancyLevel, m_redundantEndpoints.size()); m_loggedRedundancyWarning = true; } return GF_NOERR; } else { // save any fatal errors that occur during maintain redundancy so // that we can send it back to the caller, to avoid missing out due // to nonfatal errors such as server not available if (m_poolHADM && !m_allEndpointsDisconnected) { m_poolHADM->clearKeysOfInterestAllRegions(); m_poolHADM->sendNotConnectedMessageToAllregions(); m_allEndpointsDisconnected = true; } if (fatal && err != GF_NOERR) { return fatalError; } return err; } } void ThinClientRedundancyManager::removeEndpointsInOrder( std::vector<TcrEndpoint*>& destVector, const std::vector<TcrEndpoint*>& srcVector) { std::vector<TcrEndpoint*> tempDestVector; std::vector<TcrEndpoint*>::iterator itDest; std::vector<TcrEndpoint*>::const_iterator itSrc; itSrc = srcVector.begin(); while ((itDest = destVector.begin()) != destVector.end()) { if ((itSrc != srcVector.end()) && (*itDest == *itSrc)) { destVector.erase(itDest); ++itSrc; } else { tempDestVector.push_back(*itDest); destVector.erase(itDest); } } destVector = tempDestVector; // Postconditions: // 1. size of destVector decreases by the size of srcVector } void ThinClientRedundancyManager::addEndpointsInOrder( std::vector<TcrEndpoint*>& destVector, const std::vector<TcrEndpoint*>& srcVector) { destVector.insert(destVector.end(), srcVector.begin(), srcVector.end()); // Postconditions: // 1. Length of destVector increases by the length of srcVector } GfErrType ThinClientRedundancyManager::createQueueEP(TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply, bool isPrimary) { LOGFINE("Recovering subscriptions on endpoint [%s]", ep->name().c_str()); GfErrType err = GF_NOERR; if ((err = ep->registerDM(true, !isPrimary)) == GF_NOERR) { if ((err = m_theTcrConnManager->registerInterestAllRegions( ep, request, reply)) != GF_NOERR || !readyForEvents(ep)) { ep->unregisterDM(true); if (err == GF_NOERR) { err = GF_NOTCON; } } else { // recover CQs CacheImpl* cache = m_theTcrConnManager->getCacheImpl(); auto rqsService = std::dynamic_pointer_cast<RemoteQueryService>( cache->getQueryService(true)); if (rqsService != nullptr) { try { err = rqsService->executeAllCqs(ep); } catch (const Exception& excp) { LOGFINE("Failed to recover CQs on endpoint[%s]: %s", ep->name().c_str(), excp.what()); ep->unregisterDM(true); err = GF_NOTCON; } catch (...) { LOGFINE("Failed to recover CQs on endpoint[%s]", ep->name().c_str()); ep->unregisterDM(true); err = GF_NOTCON; } } } } LOGFINE("Done subscription recovery"); return err; } GfErrType ThinClientRedundancyManager::createPoolQueueEP( TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply, bool isPrimary) { LOGFINE("Recovering subscriptions on endpoint [" + ep->name() + "] from pool " + m_poolHADM->getName()); GfErrType err = GF_NOERR; if ((err = ep->registerDM(false, !isPrimary, false, m_poolHADM)) == GF_NOERR) { if ((err = m_poolHADM->registerInterestAllRegions(ep, request, reply)) != GF_NOERR || !readyForEvents(ep)) { ep->unregisterDM(false); if (err == GF_NOERR) { err = GF_NOTCON; } } else { // recover CQs auto rqsService = std::dynamic_pointer_cast<RemoteQueryService>( m_poolHADM->getQueryServiceWithoutCheck()); if (rqsService != nullptr) { try { err = rqsService->executeAllCqs(ep); } catch (const Exception& excp) { LOGFINE("Failed to recover CQs on endpoint[" + ep->name() + "]: %s" + excp.getMessage()); ep->unregisterDM(false); // Argument is useless err = GF_NOTCON; } catch (...) { LOGFINE("Failed to recover CQs on endpoint[" + ep->name() + "]"); ep->unregisterDM(false); // Argument is useless err = GF_NOTCON; } } } } LOGFINE("Done subscription recovery"); return err; } GfErrType ThinClientRedundancyManager::makePrimary(TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply) { if (m_poolHADM) { return createPoolQueueEP(ep, request, reply, true); } else { return createQueueEP(ep, request, reply, true); } } GfErrType ThinClientRedundancyManager::makeSecondary(TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply) { if (m_poolHADM) { return createPoolQueueEP(ep, request, reply, false); } else { return createQueueEP(ep, request, reply, false); } } void ThinClientRedundancyManager::initialize(int redundancyLevel) { LOGDEBUG( "ThinClientRedundancyManager::initialize(): initializing redundancy " "manager."); LOGFINE("Subscription redundancy level set to %d %s %s", redundancyLevel, m_poolHADM != nullptr ? "for pool" : "", m_poolHADM != nullptr ? m_poolHADM->getName().c_str() : ""); m_redundancyLevel = redundancyLevel; m_HAenabled = (redundancyLevel > 0 || m_theTcrConnManager->isDurable() || ThinClientBaseDM::isDeltaEnabledOnServer()); auto& sysProp = m_theTcrConnManager->getCacheImpl() ->getDistributedSystem() .getSystemProperties(); if (m_poolHADM) { m_eventidmap.init(m_poolHADM->getSubscriptionMessageTrackingTimeout()); } else { m_eventidmap.init(sysProp.notifyDupCheckLife()); } if (m_HAenabled) { std::chrono::milliseconds interval; if (m_poolHADM) { // Set periodic ack interval in seconds. interval = m_poolHADM->getSubscriptionAckInterval(); } else { interval = sysProp.notifyAckInterval(); } if (interval < std::chrono::milliseconds(100)) { interval = std::chrono::milliseconds(100); } next_ack_inc_ = interval; m_nextAck = clock::now() + interval; } if (m_poolHADM) { m_locators = m_poolHADM->getLocators(); if (m_locators->length() == 0) m_servers = m_poolHADM->getServers(); if (m_locators->length() > 0) { for (int item = 0; item < m_locators->length(); item++) { LOGDEBUG("ThinClientRedundancyManager::initialize: adding locator %s", (*m_locators)[item]->value().c_str()); } } else if (m_servers->length() > 0) { RandGen randgen; m_server = randgen(m_servers->length()); } else { throw IllegalStateException( "The redundancy manager's pool does not have locators or servers " "specified"); } } getAllEndpoints(m_nonredundantEndpoints); } void ThinClientRedundancyManager::sendNotificationCloseMsgs() { std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); for (auto&& endpoint : m_redundantEndpoints) { LOGDEBUG( "ThinClientRedundancyManager::sendNotificationCloseMsgs(): closing " "notification for endpoint %s", endpoint->name().c_str()); endpoint->stopNoBlock(); } for (auto&& endpoint : m_redundantEndpoints) { LOGDEBUG( "ThinClientRedundancyManager::sendNotificationCloseMsgs(): closing " "receiver for endpoint %s", endpoint->name().c_str()); endpoint->stopNotifyReceiverAndCleanup(); } } void ThinClientRedundancyManager::close() { LOGDEBUG("ThinClientRedundancyManager::close(): closing redundancy manager."); if (m_periodicAckTask) { auto& manager = m_theTcrConnManager->getCacheImpl()->getExpiryTaskManager(); manager.cancel(process_event_id_map_task_id_); m_periodicAckTask->stopNoblock(); periodic_ack_semaphore_.release(); m_periodicAckTask->wait(); m_periodicAckTask = nullptr; } std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); for (auto&& endpoint : m_redundantEndpoints) { LOGDEBUG( "ThinClientRedundancyManager::close(): unregistering from endpoint %s", endpoint->name().c_str()); endpoint->unregisterDM(true); } m_redundantEndpoints.clear(); m_nonredundantEndpoints.clear(); m_theTcrConnManager = nullptr; m_globalProcessedMarker = false; m_sentReadyForEvents = false; } bool ThinClientRedundancyManager::readyForEvents( TcrEndpoint* primaryCandidate) { if (!m_theTcrConnManager->isDurable() || !m_sentReadyForEvents) { return true; } TcrMessageClientReady request( new DataOutput(m_theTcrConnManager->getCacheImpl()->createDataOutput())); TcrMessageReply reply(true, nullptr); GfErrType err = GF_NOTCON; if (m_poolHADM) { err = m_poolHADM->sendRequestToEP(request, reply, primaryCandidate); } else { err = ThinClientBaseDM::sendRequestToEndPoint(request, reply, primaryCandidate); } if (err == GF_NOERR) { return true; } else { return false; } } void ThinClientRedundancyManager::moveEndpointToLast( std::vector<TcrEndpoint*>& epVector, TcrEndpoint* targetEp) { // Pre-condition // Remove Ep for (std::vector<TcrEndpoint*>::iterator it = epVector.begin(); it != epVector.end(); it++) { if (targetEp == *it) { epVector.erase(it); break; } } // Push it @ end. epVector.push_back(targetEp); } bool ThinClientRedundancyManager::sendMakePrimaryMesg(TcrEndpoint* ep, const TcrMessage*, ThinClientRegion*) { if (!ep->connected()) { return false; } TcrMessageReply reply(false, nullptr); const TcrMessageMakePrimary makePrimaryRequest( new DataOutput(m_theTcrConnManager->getCacheImpl()->createDataOutput()), ThinClientRedundancyManager::m_sentReadyForEvents); LOGFINE("Making primary subscription endpoint %s", ep->name().c_str()); GfErrType err = GF_NOTCON; if (m_poolHADM) { err = m_poolHADM->sendRequestToEP(makePrimaryRequest, reply, ep); } else { err = ThinClientBaseDM::sendRequestToEndPoint(makePrimaryRequest, reply, ep); } if (err == GF_NOERR) { /* this causes keys to be added to the region even if the reg interest * is supposed to fail due to notauthorized exception then causing * subsequent maintainredundancy calls to fail for other ops like CQ if ( request != nullptr && region != nullptr ) { const std::vector<std::shared_ptr<CacheableKey>> * keys = request->getKeys( ); bool isDurable = request->isDurable( ); if ( keys == nullptr || keys->empty( ) ) { const std::string& regex = request->getRegex( ); if ( !regex.empty( ) ) { region->addRegex( regex, isDurable ); } } else { region->addKeys( *keys, isDurable ); } } */ return true; } else { return false; } } GfErrType ThinClientRedundancyManager::sendSyncRequestCq( TcrMessage& request, TcrMessageReply& reply, ThinClientBaseDM* theHADM) { LOGDEBUG("ThinClientRedundancyManager::sendSyncRequestCq msgType[%d]", request.getMessageType()); std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); GfErrType err = GF_NOERR; GfErrType opErr; TcrEndpoint* primaryEndpoint = nullptr; if (m_redundantEndpoints.size() >= 1) { LOGDEBUG( "ThinClientRedundancyManager::sendSyncRequestCq: to secondary " "size[%zu]", m_redundantEndpoints.size()); std::vector<TcrEndpoint*>::iterator iter = m_redundantEndpoints.begin(); LOGDEBUG("endpoint[%s]", (*iter)->name().c_str()); for (++iter; iter != m_redundantEndpoints.end(); ++iter) { LOGDEBUG("endpoint[%s]", (*iter)->name().c_str()); LOGDEBUG( "msgType[%d] ThinClientRedundancyManager::sendSyncRequestCq: to " "secondary", request.getMessageType()); if (request.getMessageType() == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) { // Timeout for this message type is set like so... reply.setTimeout( dynamic_cast<ThinClientPoolDM*>(theHADM)->getReadTimeout()); opErr = theHADM->sendRequestToEP(request, reply, *iter); } else { opErr = theHADM->sendRequestToEP(request, reply, *iter); } if (err == GF_NOERR) { err = opErr; } } primaryEndpoint = m_redundantEndpoints[0]; LOGDEBUG("primary endpoint[%s]", primaryEndpoint->name().c_str()); } int32_t attempts = static_cast<int32_t>(m_redundantEndpoints.size()) + static_cast<int32_t>(m_nonredundantEndpoints.size()); attempts = attempts < MIN_RETRY_ATTEMPTS ? MIN_RETRY_ATTEMPTS : attempts; // at least 5 attempts if ep lists are small. AuthenticatedView* authenticatedView = nullptr; while (attempts--) { if (err != GF_NOERR || m_redundantEndpoints.empty()) { auto userAttr = UserAttributes::threadLocalUserAttributes; if (userAttr) { authenticatedView = userAttr->getAuthenticatedView(); } err = maintainRedundancyLevel(); // we continue on fatal error because MRL only tries a handshake without // sending a request (no params passed) so no need to check // isFatalClientError. if (theHADM->isFatalError(err) && m_redundantEndpoints.empty()) { continue; } } if (m_redundantEndpoints.empty()) { err = GF_NOTCON; } else { primaryEndpoint = m_redundantEndpoints[0]; LOGDEBUG( "ThinClientRedundancyManager::sendSyncRequestCq: to primary [%s]", primaryEndpoint->name().c_str()); GuardUserAttributes gua; if (authenticatedView != nullptr) { gua.setAuthenticatedView(authenticatedView); } err = theHADM->sendRequestToEP(request, reply, primaryEndpoint); if (err == GF_NOERR || err == GF_TIMEOUT || ThinClientBaseDM::isFatalClientError(err)) { break; } } } // top level should only get NotConnectedException if (err == GF_IOERR) { err = GF_NOTCON; } return err; } GfErrType ThinClientRedundancyManager::sendSyncRequestRegisterInterest( TcrMessage& request, TcrMessageReply& reply, bool, TcrEndpoint* endpoint, ThinClientBaseDM* theHADM, ThinClientRegion* region) { LOGDEBUG("ThinClientRedundancyManager::sendSyncRequestRegisterInterest "); if (!endpoint) { std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); auto err = GF_NOERR; auto opErr = GF_NOERR; TcrEndpoint* primaryEndpoint = nullptr; if (!m_redundantEndpoints.empty()) { primaryEndpoint = m_redundantEndpoints[0]; for (auto& redundantEndpoint : m_redundantEndpoints) { if (redundantEndpoint == primaryEndpoint) { continue; } redundantEndpoint->setDM(request.getDM()); opErr = theHADM->sendSyncRequestRegisterInterestEP( request, reply, false, redundantEndpoint); if (err == GF_NOERR) { err = opErr; } } } if ((request.getMessageType() == TcrMessage::REGISTER_INTEREST_LIST || request.getMessageType() == TcrMessage::REGISTER_INTEREST || request.getMessageType() == TcrMessage::UNREGISTER_INTEREST_LIST || request.getMessageType() == TcrMessage::UNREGISTER_INTEREST) && (err != GF_NOERR || static_cast<int>(m_redundantEndpoints.size()) <= m_redundancyLevel)) { err = maintainRedundancyLevel(false, &request, &reply, region); if (theHADM->isFatalClientError(err)) { return err; } } // We need to ensure that primary registration is done at the end. // So we skip the primary registration when maintainRedundancyLevel() // has already done that, i.e. primary endpoint not the same as that // before calling maintainRedundancyLevel() if (m_redundantEndpoints.empty()) { err = GF_NOTCON; } else if (primaryEndpoint && primaryEndpoint == m_redundantEndpoints[0]) { for (size_t count = 0; count < m_redundantEndpoints.size() + m_nonredundantEndpoints.size(); count++) { primaryEndpoint->setDM(request.getDM()); opErr = theHADM->sendSyncRequestRegisterInterestEP( request, reply, false, primaryEndpoint); if (opErr == GF_NOERR) { break; } else { err = maintainRedundancyLevel(false, &request, &reply, region); primaryEndpoint = m_redundantEndpoints[0]; } } if (theHADM->isFatalError(opErr)) { err = opErr; } } return err; } else { return theHADM->sendSyncRequestRegisterInterestEP(request, reply, false, endpoint); } } synchronized_map<std::unordered_map<std::string, std::shared_ptr<TcrEndpoint>>, std::recursive_mutex>& ThinClientRedundancyManager::updateAndSelectEndpoints() { // 38196 Fix: For durable clients reconnect // 1. Get list of endpoints which have HA queue. // 2. Get HA endpoint with max queuesize; // 3. Move other HA endpoints at start of list in ascending order of // queuesize. // 4. Add HA endpoint with Max queueSize at the end. // Primary : Secondaries in ascending order of QSiz : Nonredundant : Secondary // with QSizemax // e.g. R[#70]:Old Primary, R [#80] , R[#90], NR.... , R[#100]:New Primary // Exception: For R =0 ( or when no EP with Max queuesize ), // Old primary would be considered as new. Hence it would be at the end if (m_poolHADM) { // fetch queue servers // send queue servers for sorting std::set<ServerLocation> exclEndPts; std::list<ServerLocation> outEndpoints; outEndpoints = selectServers(-1, exclEndPts); for (auto& it : outEndpoints) { m_poolHADM->addEP(it); } return m_poolHADM->m_endpoints; } else { return m_theTcrConnManager->m_endpoints; } } void ThinClientRedundancyManager::getAllEndpoints( std::vector<TcrEndpoint*>& endpoints) { std::shared_ptr<TcrEndpoint> maxQEp, primaryEp; auto& selectedEndpoints = updateAndSelectEndpoints(); for (const auto& currItr : selectedEndpoints) { if (isDurable()) { auto ep = currItr.second; int32_t queueSize = 0; TcrConnection* statusConn = nullptr; auto status = ep->getFreshServerQueueStatus(queueSize, !m_poolHADM, statusConn); if (m_poolHADM && status != NON_REDUNDANT_SERVER) { m_poolHADM->addConnection(statusConn); } if (status == REDUNDANT_SERVER) { if (!maxQEp) { maxQEp = ep; } else if (ep->getServerQueueSize() > maxQEp->getServerQueueSize()) { insertEPInQueueSizeOrder(maxQEp.get(), endpoints); maxQEp = ep; } else { insertEPInQueueSizeOrder(ep.get(), endpoints); } LOGDEBUG( "ThinClientRedundancyManager::getAllEndpoints(): sorting " "endpoints, found redundant endpoint."); } else if (status == PRIMARY_SERVER) { // Primary should be unique primaryEp = ep; LOGDEBUG( "ThinClientRedundancyManager::getAllEndpoints(): sorting " "endpoints, found primary endpoint."); } else { endpoints.push_back(currItr.second.get()); LOGDEBUG( "ThinClientRedundancyManager::getAllEndpoints(): sorting " "endpoints, found nonredundant endpoint."); } } else { endpoints.push_back(currItr.second.get()); } //(*currItr)++; } // Add Endpoint with Max Queuesize at the last and Primary at first position if (isDurable()) { if (maxQEp) { endpoints.push_back(maxQEp.get()); LOGDEBUG( "ThinClientRedundancyManager::getAllEndpoints(): sorting endpoints, " "pushing max-q endpoint at back."); } if (primaryEp) { if (m_redundancyLevel == 0 || !maxQEp) { endpoints.push_back(primaryEp.get()); LOGDEBUG( "ThinClientRedundancyManager::getAllEndpoints(): sorting " "endpoints, pushing primary at back."); } else { endpoints.insert(endpoints.begin(), primaryEp.get()); LOGDEBUG( "ThinClientRedundancyManager::getAllEndpoints(): sorting " "endpoints, inserting primary at head."); } } } else { RandGen randgen; std::shuffle(endpoints.begin(), endpoints.end(), randgen); } } void ThinClientRedundancyManager::insertEPInQueueSizeOrder( TcrEndpoint* inputEp, std::vector<TcrEndpoint*>& endpoints) { // need to sort out redundant ep in ascending order. other eps will be at // back. std::vector<TcrEndpoint*>::iterator it = endpoints.begin(); while (it != endpoints.end()) { TcrEndpoint* thisEp = *it; if ((thisEp->getServerQueueSize() >= inputEp->getServerQueueSize()) || (thisEp->getServerQueueStatus() != REDUNDANT_SERVER)) { break; } it++; } endpoints.insert(it, inputEp); } bool ThinClientRedundancyManager::isDurable() { return m_theTcrConnManager->isDurable(); } void ThinClientRedundancyManager::readyForEvents() { TcrMessageClientReady request( new DataOutput(m_theTcrConnManager->getCacheImpl()->createDataOutput())); TcrMessageReply reply(true, nullptr); GfErrType result = GF_NOTCON; unsigned int epCount = 0; std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); if (m_sentReadyForEvents) { throw IllegalStateException("Already called readyForEvents"); } TcrEndpoint* primary = nullptr; if (m_redundantEndpoints.size() > 0) { primary = m_redundantEndpoints[0]; if (m_poolHADM) { result = m_poolHADM->sendRequestToEP(request, reply, primary); } else { result = primary->send(request, reply); } } epCount++; while (result != GF_NOERR && epCount <= m_redundantEndpoints.size() + m_nonredundantEndpoints.size()) { TcrMessageReply tcrMessageReply(true, nullptr); maintainRedundancyLevel(); if (m_redundantEndpoints.size() > 0) { primary = m_redundantEndpoints[0]; if (m_poolHADM) { result = m_poolHADM->sendRequestToEP(request, tcrMessageReply, primary); } else { result = primary->send(request, tcrMessageReply); } } epCount++; } if (result != GF_NOERR) { throw NotConnectedException("No endpoints available"); } m_sentReadyForEvents = true; } void ThinClientRedundancyManager::periodicAck(std::atomic<bool>& isRunning) { periodic_ack_semaphore_.acquire(); while (isRunning) { doPeriodicAck(); periodic_ack_semaphore_.acquire(); } } void ThinClientRedundancyManager::doPeriodicAck() { LOGDEBUG( "ThinClientRedundancyManager::processEventIdMap( ): Examining eventid " "map."); // do periodic ack if HA is enabled and the time has come if (m_HAenabled && (m_nextAck < clock::now())) { LOGFINER("Doing periodic ack"); m_nextAck += next_ack_inc_; auto entries = m_eventidmap.getUnAcked(); auto count = entries.size(); if (count > 0) { bool acked = false; std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); auto endpoint = m_redundantEndpoints.begin(); if (endpoint != m_redundantEndpoints.end()) { TcrMessagePeriodicAck request( new DataOutput( m_theTcrConnManager->getCacheImpl()->createDataOutput()), entries); TcrMessageReply reply(true, nullptr); GfErrType result = GF_NOERR; if (m_poolHADM) { result = m_poolHADM->sendRequestToEP(request, reply, *endpoint); } else { result = (*endpoint)->send(request, reply); } if (result == GF_NOERR && reply.getMessageType() == TcrMessage::REPLY) { LOGFINE( "Sent subscription ack message for %zu sources to endpoint %s", count, (*endpoint)->name().c_str()); acked = true; } else { LOGWARN( "Failure sending subscription ack message for %zu sources to " "endpoint %s", count, (*endpoint)->name().c_str()); LOGFINER("Ack result is %d and reply message type is %d", result, reply.getMessageType()); } } else { LOGWARN( "No subscription servers available for periodic ack for %zu " "sources", count); } if (!acked) { // clear entries' acked flag for next periodic ack m_eventidmap.clearAckedFlags(entries); } } } // check the event id map for expiry uint32_t expired = m_eventidmap.expire(m_HAenabled /* onlyacked */); if (expired > 0) { LOGFINE("Expired %d sources from subscription map", expired); } } void ThinClientRedundancyManager::startPeriodicAck() { m_periodicAckTask = std::unique_ptr<Task<ThinClientRedundancyManager>>( new Task<ThinClientRedundancyManager>( this, &ThinClientRedundancyManager::periodicAck, NC_PerodicACK)); m_periodicAckTask->start(); const auto& props = m_theTcrConnManager->getCacheImpl() ->getDistributedSystem() .getSystemProperties(); // start the periodic ACK task handler auto& manager = m_theTcrConnManager->getCacheImpl()->getExpiryTaskManager(); auto task = std::make_shared<FunctionExpiryTask>( manager, [this] { periodic_ack_semaphore_.release(); }); process_event_id_map_task_id_ = manager.schedule(std::move(task), next_ack_inc_, next_ack_inc_); LOGFINE( "Registered subscription event " "periodic ack task with id = %ld, notify-ack-interval = %ld, " "notify-dupcheck-life = %ld, periodic ack is %sabled", process_event_id_map_task_id_, (m_poolHADM ? m_poolHADM->getSubscriptionAckInterval() : props.notifyAckInterval()) .count(), (m_poolHADM ? m_poolHADM->getSubscriptionMessageTrackingTimeout() : props.notifyDupCheckLife()) .count(), m_HAenabled ? "en" : "dis"); } // notification dup check with the help of eventidmap - called by // ThinClientRegion bool ThinClientRedundancyManager::checkDupAndAdd( std::shared_ptr<EventId> eventid) { EventIdMapEntry entry = EventIdMap::make(eventid); return m_eventidmap.put(entry.first, entry.second, true); } void ThinClientRedundancyManager::netDown() { std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); if (!m_poolHADM) { m_nonredundantEndpoints.insert(m_nonredundantEndpoints.end(), m_redundantEndpoints.begin(), m_redundantEndpoints.end()); } m_redundantEndpoints.clear(); } void ThinClientRedundancyManager::removeCallbackConnection(TcrEndpoint* ep) { std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); ep->unregisterDM(false, nullptr, true); } GfErrType ThinClientRedundancyManager::sendRequestToPrimary( TcrMessage& request, TcrMessageReply& reply) { std::lock_guard<decltype(m_redundantEndpointsLock)> guard( m_redundantEndpointsLock); GfErrType err = GF_NOTCON; for (size_t count = 0; count <= m_redundantEndpoints.size() + m_nonredundantEndpoints.size(); count++) { if (m_poolHADM) { err = m_poolHADM->sendRequestToEP(request, reply, m_redundantEndpoints[0]); } else { err = m_redundantEndpoints[0]->send(request, reply); } if (err == GF_NOERR) break; maintainRedundancyLevel(); } return err; } } // namespace client } // namespace geode } // namespace apache