GfErrType ThinClientRedundancyManager::maintainRedundancyLevel()

in cppcache/src/ThinClientRedundancyManager.cpp [107:468]


// Re-evaluates the redundant (primary + secondaries) and nonredundant endpoint
// lists and tries to bring the number of connected secondaries up to
// m_redundancyLevel, electing a new primary if the current one is not
// connected. m_redundantEndpointsLock is held for the whole call.
//
// Parameters:
//   init    - when true: the nonredundant list is NOT rebuilt through the pool,
//             makePrimary() is only attempted on the last nonredundant
//             endpoint, and the "prefer old HA endpoint" primary conversion
//             (bug 38196) is enabled.
//   request - forwarded to sendMakePrimaryMesg()/makePrimary()/makeSecondary().
//   reply   - forwarded to makePrimary()/makeSecondary().
//   region  - forwarded to sendMakePrimaryMesg().
//
// Returns:
//   GF_NOERR   - redundancy is satisfied, or a primary is connected and no
//                fatal error is reported (see the gating note at the returns).
//   fatalError - the last fatal error recorded while converting endpoints, but
//                only when the most recent err is also not GF_NOERR.
//   err        - otherwise, when no primary is connected: the result of the
//                last makePrimary()/makeSecondary() call, or the initial
//                GF_NOTCON if none was made.
GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
    bool init, const TcrMessage* request, TcrMessageReply* reply,
    ThinClientRegion* region) {
  // Preconditions:
  // 1. m_redundantEndpoints UNION m_nonredundantEndpoints = All Endpoints
  // 2. m_redundantEndpoints INTERSECTION m_nonredundantEndpoints = Empty
  GfErrType err = GF_NOTCON;
  // save any fatal errors that occur during maintain redundancy so
  // that we can send it back to the caller, to avoid missing out due
  // to nonfatal errors such as server not available
  GfErrType fatalError = GF_NOERR;
  bool fatal = false;
  std::lock_guard<decltype(m_redundantEndpointsLock)> guard(
      m_redundantEndpointsLock);
  bool isRedundancySatisfied = false;
  int secondaryCount = 0;
  bool isPrimaryConnected = false;
  // True when the chosen primary sits at the back of tempRedundantEndpoints
  // and therefore has to be rotated to the front of m_redundantEndpoints in
  // Step 3.
  bool isPrimaryAtBack = false;
  // TODO: isPrimaryAtBack can be removed by simplifying
  // removeEndpointsInOrder().

  std::vector<TcrEndpoint*>::iterator itRedundant =
      m_redundantEndpoints.begin();
  std::vector<TcrEndpoint*> tempRedundantEndpoints;
  std::vector<TcrEndpoint*> tempNonredundantEndpoints;

  // Redundancy level is maintained as follows using the following vectors:
  // m_redundantEndpoints, m_nonredundantEndpoints, tempRedundantEndpoints,
  // tempNonredundantEndpoints.
  // m_redundantEndpoints and m_nonredundantEndpoints contain the current
  // primary-secondary and non-redundant endpoints respectively.
  // tempRedundantEndpoints and tempNonredundantEndpoints are vectors that are
  // used to adjust the status of endpoints:
  // tempRedundantEndpoints: contains endpoints which have been changed from
  // nonredundant to either primary or secondary, i.e.
  // those endpoints whose status has been changed in order to satisfy
  // redundancy level.
  // tempNonredundantEndpoints: contains endpoints which were earlier redundant
  // but are now not connected. These endpoints will
  // be moved from the redundant list to the non redundant list.

  // Step 1: Scan all the endpoints in one pass and arrange them according to
  // connection status in the following order:
  // m_redundantEndpoints,tempRedundantEndpoints,m_nonredundantEndpoints,tempNonredundantEndpoints
  // The endpoints are maintained in order in the redundant endpoints lists.
  // This order is maintained when the
  // endpoints in the temporary list are moved into
  // m_redundantEndpoints/m_nonredundantEndpoints.
  // Note that although endpoint status may change, the endpoints are not
  // deleted from the lists (instead, the endpoints are
  // copied to temporary lists).
  // Scanning the endpoints is done in two stages:
  // 1. First scan the redundant endpoints to find the failed endpoints list,
  // whether redundancy has been satisfied.
  // 2. If redundancy has not been satisfied, scan the nonredundant list to find
  // available endpoints that can be made redundant.

  LOGDEBUG(
      "ThinClientRedundancyManager::maintainRedundancyLevel(): checking "
      "redundant list, size = %zu",
      m_redundantEndpoints.size());
  while (!isRedundancySatisfied && itRedundant != m_redundantEndpoints.end()) {
    if (!isPrimaryConnected) {
      if (itRedundant == m_redundantEndpoints.begin()) {
        // The front of m_redundantEndpoints is the current primary; it only
        // needs to still be connected.
        if ((*itRedundant)->connected()) {
          isPrimaryConnected = true;
          // With redundancy level 0 a connected primary is all that is needed.
          if (m_redundancyLevel == 0) isRedundancySatisfied = true;
        } else {
          tempNonredundantEndpoints.push_back(*itRedundant);
        }
      } else {
        // Every endpoint before this one has been queued in
        // tempNonredundantEndpoints, so try to promote this former secondary.
        // NOTE(review): unlike the branch above, a successful promotion here
        // does not set isRedundancySatisfied when m_redundancyLevel == 0 —
        // confirm this asymmetry is intended.
        if (sendMakePrimaryMesg(*itRedundant, request, region)) {
          isPrimaryConnected = true;
        } else {
          tempNonredundantEndpoints.push_back(*itRedundant);
        }
      }
    } else {
      // A primary exists; remaining redundant endpoints count as secondaries
      // as long as they are still connected.
      if ((*itRedundant)->connected()) {
        secondaryCount++;
        if (secondaryCount == m_redundancyLevel) {
          isRedundancySatisfied = true;
        }
      } else {
        tempNonredundantEndpoints.push_back(*itRedundant);
      }
    }
    ++itRedundant;
  }

  // If redundancy is not satisfied, find nonredundant endpoints that can be
  // made redundant.
  // For queue locators, fetch an initial list of endpoints which can host
  // queues.
  // This rebuild of m_nonredundantEndpoints is skipped when init is true or
  // when there is no pool (m_poolHADM is null).
  if (!isRedundancySatisfied && m_poolHADM && !init) {
    LOGDEBUG(
        "ThinClientRedundancyManager::maintainRedundancyLevel(): building "
        "nonredundant list via pool.");

    std::list<ServerLocation> outEndpoints;
    // Every endpoint currently in the redundant list (connected or not) is
    // excluded from the server selection below.
    std::set<ServerLocation> exclEndPts;
    for (std::vector<TcrEndpoint*>::iterator itr = m_redundantEndpoints.begin();
         itr != m_redundantEndpoints.end(); itr++) {
      LOGDEBUG(
          "ThinClientRedundancyManager::maintainRedundancyLevel(): excluding "
          "endpoint %s from queue list.",
          (*itr)->name().c_str());
      ServerLocation serverLoc((*itr)->name());
      exclEndPts.insert(serverLoc);
    }

    m_nonredundantEndpoints.clear();
    // NOTE(review): -1 is presumably interpreted by selectServers() as "no
    // limit" — confirm against selectServers().
    int howMany = -1;
    if (m_locators != nullptr && m_locators->length() > 0 &&
        m_servers != nullptr && m_servers->length() == 0) {
      // if we are using locators only request the required number of servers.
      // (redundancy level + 1 for the primary, minus the excluded endpoints)
      howMany = m_redundancyLevel - static_cast<int>(exclEndPts.size()) + 1;
    }
    outEndpoints = selectServers(howMany, exclEndPts);
    for (std::list<ServerLocation>::iterator it = outEndpoints.begin();
         it != outEndpoints.end(); it++) {
      auto ep = m_poolHADM->addEP(*it);
      LOGDEBUG(
          "ThinClientRedundancyManager::maintainRedundancyLevel(): Adding "
          "endpoint %s to nonredundant list.",
          ep->name().c_str());
      // Only the raw pointer is stored. NOTE(review): this assumes the pool
      // behind addEP() keeps the endpoint alive for as long as it is listed
      // here — confirm.
      m_nonredundantEndpoints.push_back(ep.get());
    }
  }

  LOGDEBUG(
      "ThinClientRedundancyManager::maintainRedundancyLevel(): finding "
      "nonredundant endpoints, size = %zu",
      m_nonredundantEndpoints.size());
  std::vector<TcrEndpoint*>::iterator itNonredundant =
      m_nonredundantEndpoints.begin();
  while (!isRedundancySatisfied &&
         itNonredundant != m_nonredundantEndpoints.end()) {
    if (!isPrimaryConnected) {
      if (secondaryCount == m_redundancyLevel) {
        // Enough secondaries were collected; the only thing missing is a
        // primary.
        // 38196:Make last endpoint from the non redundant list as primary.
        // When init is true and this is not the last endpoint, makePrimary()
        // is not called, so err below still holds the value from an earlier
        // iteration (or the initial GF_NOTCON).
        if ((!init || *itNonredundant == m_nonredundantEndpoints.back()) &&
            (err = makePrimary(*itNonredundant, request, reply)) == GF_NOERR) {
          tempRedundantEndpoints.push_back(*itNonredundant);
          isRedundancySatisfied = true;
          isPrimaryConnected = true;
          isPrimaryAtBack = true;
        } else {
          if (ThinClientBaseDM::isFatalError(err)) {
            fatal = true;
            fatalError = err;
          }
        }
      } else {
        // No primary yet and not enough secondaries: collect this endpoint as
        // a secondary; a primary is elected from the collected ones in Step 2.
        if ((err = makeSecondary(*itNonredundant, request, reply)) ==
            GF_NOERR) {
          tempRedundantEndpoints.push_back(*itNonredundant);
          secondaryCount++;
        } else {
          if (ThinClientBaseDM::isFatalError(err)) {
            fatal = true;
            fatalError = err;
          }
        }
      }
    } else {
      // A primary is connected: add secondaries until the level is reached.
      if ((err = makeSecondary(*itNonredundant, request, reply)) == GF_NOERR) {
        tempRedundantEndpoints.push_back(*itNonredundant);
        secondaryCount++;
        if (secondaryCount == m_redundancyLevel) {
          isRedundancySatisfied = true;
        }
      } else {
        if (ThinClientBaseDM::isFatalError(err)) {
          fatal = true;
          fatalError = err;
        }
      }
    }
    ++itNonredundant;
  }
  // Step 2: After one scan of the endpoints, if the redundancy level is
  // satisfied by changing status of endpoints, the primary
  // endpoint will be present in either m_redundantEndpoints or
  // tempRedundantEndpoints.
  // However, when redundancy level is not satisfied and following condition
  // holds true, a new secondary server (whose status
  // changed in Step 1) will be made primary:
  // A. No primary server was found, and
  // B. secondaryCount <= redundancy level.

  // 38196: Prefer Primary conversion from oldHA .
  // convertedPrimary remembers the endpoint promoted by this init-only pass so
  // that Step 3 can reposition it.
  TcrEndpoint* convertedPrimary = nullptr;
  if (init && !isRedundancySatisfied && !isPrimaryConnected) {
    // An "old HA" endpoint is one whose server queue status is anything other
    // than NON_REDUNDANT_SERVER.
    bool oldHAEndPointPresent = false;
    for (std::vector<TcrEndpoint*>::iterator it =
             tempRedundantEndpoints.begin();
         it != tempRedundantEndpoints.end(); it++) {
      if ((*it)->getServerQueueStatus() != NON_REDUNDANT_SERVER) {
        oldHAEndPointPresent = true;
        break;
      }
    }
    // TODO: Post-38196fix, simplify durable client initialization by removing
    // constraint on primary position.

    //  holds the endpoints that are skipped by the oldHAEndPointPresent
    // check in the loop back
    std::vector<TcrEndpoint*> tempSkippedEndpoints;
    // Walk the candidates from the back of tempRedundantEndpoints. Candidates
    // that fail promotion are dropped from the temp list; candidates skipped
    // because an old HA endpoint is preferred are parked and restored below.
    while (!tempRedundantEndpoints.empty()) {
      TcrEndpoint* ep = tempRedundantEndpoints.back();
      if (oldHAEndPointPresent &&
          ep->getServerQueueStatus() == NON_REDUNDANT_SERVER) {
        tempSkippedEndpoints.push_back(ep);
        tempRedundantEndpoints.pop_back();
        continue;
      }
      if (sendMakePrimaryMesg(ep, request, region)) {
        // Primary may be in middle If there are older nonredundant
        // ep in tempRedundantEndpoints
        isPrimaryAtBack = false;
        convertedPrimary = ep;
        isPrimaryConnected = true;
        break;
      } else {
        tempRedundantEndpoints.pop_back();
      }
    }
    //  push back the skipped endpoints into tempRedundantEndpoints
    // (popping from the back of tempSkippedEndpoints restores their original
    // relative order, after the converted primary if one was found)
    while (!tempSkippedEndpoints.empty()) {
      TcrEndpoint* ep = tempSkippedEndpoints.back();
      tempSkippedEndpoints.pop_back();
      tempRedundantEndpoints.push_back(ep);
    }
  }
  // Generic fallback (also reached when the init-only pass above found no
  // primary): promote the last new secondary that accepts the request,
  // dropping from the temp list every candidate that refuses.
  if (!isRedundancySatisfied && !isPrimaryConnected) {
    while (!tempRedundantEndpoints.empty()) {
      TcrEndpoint* ep = tempRedundantEndpoints.back();
      if (sendMakePrimaryMesg(ep, request, region)) {
        isPrimaryAtBack = true;
        isPrimaryConnected = true;
        break;
      } else {
        tempRedundantEndpoints.pop_back();
      }
    }
  }

  // Step 3: Finally, create the new redundant and nonredundant lists. Remove
  // from m_redundantEndpoints all the endpoints that were
  // marked as disconnected. Add in order all the new redundant endpoints (whose
  // status changed in Steps 1 and 2) to m_redundantEndpoints.
  // If primary was at end of temporary list, move it to front on redundant
  // list.
  // Similarly, adjust the nonredundant list.

  removeEndpointsInOrder(m_redundantEndpoints, tempNonredundantEndpoints);
  removeEndpointsInOrder(m_nonredundantEndpoints, tempRedundantEndpoints);

  // 38196:for DurableReconnect case, primary may be in between. It is first
  // moved to the back of tempRedundantEndpoints (presumably what
  // moveEndpointToLast() does — confirm) so that the back-to-front rotation
  // below places it at the start of m_redundantEndpoints.
  if (init && !isPrimaryAtBack && convertedPrimary != nullptr) {
    moveEndpointToLast(tempRedundantEndpoints, convertedPrimary);
    isPrimaryAtBack = true;
  }

  addEndpointsInOrder(m_redundantEndpoints, tempRedundantEndpoints);

  // The newly elected primary was appended last; rotate it to index 0.
  if (isPrimaryConnected && isPrimaryAtBack) {
    TcrEndpoint* primary = m_redundantEndpoints.back();
    m_redundantEndpoints.pop_back();
    m_redundantEndpoints.insert(m_redundantEndpoints.begin(), primary);
  }

  // Unregister DM for the new non-redundant endpoints
  for (std::vector<TcrEndpoint*>::const_iterator iter =
           tempNonredundantEndpoints.begin();
       iter != tempNonredundantEndpoints.end(); ++iter) {
    (*iter)->unregisterDM(true);
  }
  addEndpointsInOrder(m_nonredundantEndpoints, tempNonredundantEndpoints);

  // Postconditions:
  // 1. If redundancy level is satisfied, m_redundantEndpoints.size = r + 1,
  // m_redundantEndpoints[0] is primary.
  // 2. If redundancy level is not satisfied, m_redundantEndpoints.size <= r.
  // 3. If primary is connected, m_redundantEndpoints[0] is primary. ( Not
  // checked. To verify, We may have to modify
  //    TcrEndpoint class.)

  // Notify CQ status listeners about the primary's state. This only happens
  // when m_poolHADM is a ThinClientPoolDM and it has a RemoteQueryService.
  std::shared_ptr<RemoteQueryService> queryServicePtr;
  ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_poolHADM);
  if (poolDM) {
    queryServicePtr = std::dynamic_pointer_cast<RemoteQueryService>(
        poolDM->getQueryServiceWithoutCheck());
  }
  if (queryServicePtr != nullptr) {
    if (isPrimaryConnected) {
      // call CqStatusListener connect
      LOGDEBUG(
          "invoke invokeCqConnectedListeners for connected for CQ status "
          "listener");
      queryServicePtr->invokeCqConnectedListeners(poolDM, true);
    } else {
      // call CqStatusListener disconnect
      LOGDEBUG(
          "invoke invokeCqDisConnectedListeners for disconnected for CQ status "
          "listener");
      queryServicePtr->invokeCqConnectedListeners(poolDM, false);
    }
  }

  // Invariants:
  // 1. m_redundantEndpoints UNION m_nonredundantEndpoints = All Endpoints
  // 2. m_redundantEndpoints INTERSECTION m_nonredundantEndpoints = Empty

  // The global endpoint list does not change ever for HA so getAllEndpoints
  // result or redundantEndpoints/nonredundantEndpoints cannot have stale or
  // deleted endpoints

  // Publish the current number of subscription (redundant) endpoints to the
  // pool statistics.
  if (m_poolHADM) {
    m_poolHADM->getStats().setSubsServers(
        static_cast<int32_t>(m_redundantEndpoints.size()));
  }

  if (isRedundancySatisfied) {
    // Fully satisfied: reset the "all disconnected" and "warning already
    // logged" latches.
    m_allEndpointsDisconnected = false;
    m_loggedRedundancyWarning = false;
    return GF_NOERR;
  } else if (isPrimaryConnected) {
    // Degraded: a primary exists but there are fewer secondaries than
    // requested.
    // NOTE(review): fatalError is returned only if the most recent err is
    // also not GF_NOERR, so a fatal error followed by a later successful
    // makePrimary()/makeSecondary() call is dropped — confirm this is
    // intended.
    if (fatal && err != GF_NOERR) {
      return fatalError;
    }
    m_allEndpointsDisconnected = false;
    // NOTE(review): a redundancy level of -1 presumably means "no fixed level
    // requested", so the achieved level is only logged — confirm.
    if (m_redundancyLevel == -1) {
      LOGINFO("Current subscription redundancy level is %zu",
              m_redundantEndpoints.size() - 1);
      return GF_NOERR;
    }
    // Warn only once until redundancy is satisfied again.
    if (!m_loggedRedundancyWarning) {
      LOGWARN(
          "Requested subscription redundancy level %d is not satisfiable with "
          "%zu servers available",
          m_redundancyLevel, m_redundantEndpoints.size());
      m_loggedRedundancyWarning = true;
    }
    return GF_NOERR;
  } else {
    // No primary is connected. The first time this is detected (guarded by
    // m_allEndpointsDisconnected), clear the keys of interest and send the
    // not-connected message to all regions.
    if (m_poolHADM && !m_allEndpointsDisconnected) {
      m_poolHADM->clearKeysOfInterestAllRegions();
      m_poolHADM->sendNotConnectedMessageToAllregions();
      m_allEndpointsDisconnected = true;
    }
    // Prefer the saved fatal error over a later nonfatal one such as server
    // not available (same gating on err as in the branch above).
    if (fatal && err != GF_NOERR) {
      return fatalError;
    }
    // NOTE(review): err can be GF_NOERR here if the last makeSecondary()
    // succeeded but every subsequent sendMakePrimaryMesg() failed, in which
    // case success is reported without a connected primary — confirm callers
    // tolerate this.
    return err;
  }
}