private void reconnect(boolean forcedDisconnect, String reason)

in geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java [2435:2751]


  /**
   * Repeatedly tries to rejoin the distributed system after this member has been disconnected and,
   * once connected, re-creates the cache from the cache description saved before disconnecting.
   *
   * <p>
   * If {@code forcedDisconnect} is true, the max-num-reconnect-tries limit is ignored and attempts
   * continue until a connection succeeds or reconnect is cancelled. Otherwise (lost-roles
   * reconnect) a {@code CacheClosedException} is thrown once {@code reconnectAttemptCounter}
   * reaches the configured maximum number of tries.
   *
   * <p>
   * On success, {@code reconnectDS} refers to the newly connected system and reconnect listeners
   * are notified. On an unrecoverable failure, {@code reconnectException} records the cause and
   * {@code attemptingToReconnect} is cleared.
   *
   * <p>
   * This method calls {@code reconnectLock.wait(...)}, so the caller must hold the monitor of
   * {@code reconnectLock}.
   *
   * @param forcedDisconnect true when reconnecting after a forced disconnect, false when
   *        reconnecting because of lost roles
   * @param reason the reason passed to {@code disconnect} when tearing down the old system
   */
  private void reconnect(boolean forcedDisconnect, String reason) {

    // Collect all the state for cache
    // Collect all the state for Regions
    // Close the cache,
    // loop trying to connect, waiting before each attempt
    //
    // If reconnecting for lost-roles the reconnected system's cache will decide
    // whether the reconnected system should stay up. After max-tries we will
    // give up.
    //
    // If reconnecting for forced-disconnect we ignore max-tries and keep attempting
    // to join the distributed system until successful

    attemptingToReconnect = true;
    InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
    if (ids == null) {
      ids = this;
    }

    // first save the current cache description. This is created by
    // the membership manager when forced-disconnect starts. If we're
    // reconnecting for lost roles then this will be null
    String cacheXML = null;
    List<CacheServerCreation> cacheServerCreation = null;

    InternalCache cache = GemFireCacheImpl.getInstance();
    if (cache != null) {
      cacheXML = cache.getCacheConfig().getCacheXMLDescription();
      cacheServerCreation = cache.getCacheConfig().getCacheServerCreation();
    }

    DistributionConfig oldConfig = ids.getConfig();
    Properties configProps = config.toProperties();
    configProps.putAll(config.toSecurityProperties());

    int timeOut = oldConfig.getMaxWaitTimeForReconnect();
    int memberTimeout = oldConfig.getMemberTimeout();
    // we need to make sure that a surviving member is able
    // to take over coordination before trying to auto-reconnect.
    // failure detection can take 4 member-timeout intervals
    // so we set that as a minimum. (suspect, check suspect, final check, send new view)
    final int intervalsAllowedForFailureDetection = 4;
    timeOut = Math.max(timeOut, memberTimeout * intervalsAllowedForFailureDetection);

    int maxTries = oldConfig.getMaxNumReconnectTries();

    final boolean isDebugEnabled = logger.isDebugEnabled();

    if (Thread.currentThread().getName().equals("DisconnectThread")) {
      if (isDebugEnabled) {
        logger.debug("changing thread name to ReconnectThread");
      }
      Thread.currentThread().setName("ReconnectThread");
    }

    // get the membership manager for quorum checks
    Distribution mbrMgr = dm.getDistribution();
    quorumChecker = mbrMgr.getQuorumChecker();
    if (isDebugEnabled) {
      if (quorumChecker == null) {
        logger.debug("No quorum checks will be performed during reconnect attempts");
      } else {
        logger.debug("Initialized quorum checking service: {}", quorumChecker);
      }
    }

    // LOG:CLEANUP: deal with reconnect and INHIBIT_DM_BANNER -- this should be ok now
    // These system properties are set only for the duration of reconnect; the original values
    // are restored in the outer finally block.
    String appendToLogFile = System.getProperty(APPEND_TO_LOG_FILE);
    if (appendToLogFile == null) {
      System.setProperty(APPEND_TO_LOG_FILE, "true");
    }
    String inhibitBanner = System.getProperty(InternalLocator.INHIBIT_DM_BANNER);
    if (inhibitBanner == null) {
      System.setProperty(InternalLocator.INHIBIT_DM_BANNER, "true");
    }
    if (forcedDisconnect) {
      systemAttemptingReconnect = this;
    }
    try {
      while (reconnectDS == null || !reconnectDS.isConnected()) {
        if (isReconnectCancelled()) {
          break;
        }

        if (!forcedDisconnect) {
          if (isDebugEnabled) {
            logger.debug("Max number of tries : {} and max time out : {}", maxTries, timeOut);
          }
          if (reconnectAttemptCounter.get() >= maxTries) {
            if (isDebugEnabled) {
              logger.debug(
                  "Stopping the checkrequiredrole thread because reconnect : {} reached the max "
                      + "number of reconnect tries : {}",
                  reconnectAttemptCounter, maxTries);
            }
            InternalCache internalCache = dm.getCache();
            if (internalCache == null) {
              throw new CacheClosedException(
                  "Some required roles missing");
            } else {
              throw internalCache.getCacheClosedException(
                  "Some required roles missing");
            }
          }
        }

        reconnectAttemptCounter.getAndIncrement();

        if (isReconnectCancelled()) {
          return;
        }

        logger.info("Disconnecting old DistributedSystem to prepare for a reconnect attempt");

        try {
          disconnect(true, reason, false);
        } catch (Exception ee) {
          logger.warn("Exception disconnecting for reconnect", ee);
        }

        TypeRegistry.init();

        // Wait up to timeOut ms (or until notified) before attempting to rejoin, giving the
        // surviving members time to detect this member's departure.
        try {
          reconnectLock.wait(timeOut);
        } catch (InterruptedException e) {
          logger.warn("Waiting thread for reconnect got interrupted.");
          Thread.currentThread().interrupt();
          return;
        }

        if (isReconnectCancelled()) {
          return;
        }

        logger.info(
            "Attempting to reconnect to the distributed system.  This is attempt #{}.",
            reconnectAttemptCounter);

        // connectInternal may alter the attempt counter; it is restored in the finally below.
        int saveNumberOfTries = reconnectAttemptCounter.get();
        try {
          // notify listeners of each attempt and then again after successful
          notifyReconnectListeners(this, reconnectDS, true);

          if (locatorDMTypeForced) {
            System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
          }

          configProps.put(DistributionConfig.DS_RECONNECTING_NAME, Boolean.TRUE);
          if (quorumChecker != null) {
            configProps.put(DistributionConfig.DS_QUORUM_CHECKER_NAME, quorumChecker);
          }

          InternalDistributedSystem newDS = null;
          if (isReconnectCancelled()) {
            return;
          }

          try {

            newDS = connectInternal(configProps, null, metricsService.getRebuilder(),
                membershipLocator);

          } catch (CancelException e) {
            if (isReconnectCancelled()) {
              return;
            } else {
              throw e;
            }
          } finally {
            if (newDS == null && quorumChecker != null) {
              // make sure the quorum checker is listening for messages from former members
              quorumChecker.resume();
            }
          }

          if (reconnectCancelled) {
            newDS.disconnect();
            continue;
          }

          reconnectDS = newDS;
        } catch (SystemConnectException e) {
          logger.debug("Attempt to reconnect failed with SystemConnectException");

          // getMessage() may be null; guard so that an NPE cannot escape this handler.
          String message = e.getMessage();
          if (message != null
              && message.contains("Rejecting the attempt of a member using an older version")) {
            logger.warn("Exception occurred while trying to connect the system during reconnect",
                e);
            attemptingToReconnect = false;
            reconnectException = e;
            return;
          }
          logger.warn("Caught SystemConnectException in reconnect", e);
          continue;
        } catch (GemFireConfigException e) {
          logger.warn("Caught GemFireConfigException in reconnect", e);
          continue;
        } catch (Exception e) {
          logger.warn("Exception occurred while trying to connect the system during reconnect",
              e);
          attemptingToReconnect = false;
          reconnectException = e;
          return;
        } finally {
          if (locatorDMTypeForced) {
            System.getProperties().remove(InternalLocator.FORCE_LOCATOR_DM_TYPE);
          }
          reconnectAttemptCounter.set(saveNumberOfTries);
        }

        DistributionManager newDM = reconnectDS.getDistributionManager();
        if (newDM instanceof ClusterDistributionManager) {
          // Admin systems don't carry a cache, but for others we can now create
          // a cache
          if (newDM.getDMType() != ClusterDistributionManager.ADMIN_ONLY_DM_TYPE) {
            boolean retry;
            do {
              retry = false;
              try {
                cache = new InternalCacheBuilder()
                    .setCacheXMLDescription(cacheXML)
                    .create(reconnectDS);

                if (!cache.isClosed()) {
                  createAndStartCacheServers(cacheServerCreation, cache);
                  if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) {
                    reconnectAttemptCounter.set(0);
                  }
                }

              } catch (GemFireConfigException e) {
                if (e.getCause() instanceof ClusterConfigurationNotAvailableException) {
                  retry = true;
                  logger.info("Reconnected to the cluster but the cluster configuration service "
                      + "isn't available - will retry creating the cache");
                  try {
                    Thread.sleep(5000);
                  } catch (InterruptedException e1) {
                    // NOTE(review): the interrupt status is not restored here. Restoring it would
                    // make the stabilization sleep below throw immediately and return before the
                    // cancelled reconnectDS is disconnected at the end of the loop.
                    reconnectCancelled = true;
                    reconnectException = e;
                    break;
                  }
                } else {
                  // Any other configuration failure is fatal, as in the generic case below.
                  // Swallowing it would leave the new system connected without a cache and
                  // the reconnect would then be reported as successful.
                  logger.warn(
                      "Exception occurred while trying to create the cache during reconnect.  "
                          + "Auto-reconnect is terminating.",
                      e);
                  reconnectCancelled = true;
                  reconnectException = e;
                  break;
                }
              } catch (Exception e) {
                // We need to give up because we'll probably get the same exception in
                // the next attempt to build the cache.
                logger.warn(
                    "Exception occurred while trying to create the cache during reconnect.  "
                        + "Auto-reconnect is terminating.",
                    e);
                reconnectCancelled = true;
                reconnectException = e;
                break;
              }
            } while (retry);
          }
        }

        if (reconnectDS != null && reconnectDS.isConnected()) {
          // make sure the new DS and cache are stable before exiting this loop
          try {
            Thread.sleep(config.getMemberTimeout() * 3L);
          } catch (InterruptedException e) {
            logger.info("Reconnect thread has been interrupted - exiting");
            Thread.currentThread().interrupt();
            reconnectCancelled = true;
            reconnectException = e;
            return;
          }
        }
      } // while()

      if (isReconnectCancelled()) {
        if (reconnectDS != null) {
          reconnectDS.disconnect();
        }
      } else {
        reconnectDS.isReconnectingDS = false;
        if (reconnectDS.isConnected()) {
          notifyReconnectListeners(this, reconnectDS, false);
        }
      }

    } finally {
      // Restore the process-wide state changed at the start of this method.
      systemAttemptingReconnect = null;
      attemptingToReconnect = false;
      if (appendToLogFile == null) {
        System.getProperties().remove(APPEND_TO_LOG_FILE);
      } else {
        System.setProperty(APPEND_TO_LOG_FILE, appendToLogFile);
      }
      if (inhibitBanner == null) {
        System.getProperties().remove(InternalLocator.INHIBIT_DM_BANNER);
      } else {
        System.setProperty(InternalLocator.INHIBIT_DM_BANNER, inhibitBanner);
      }
      dm.getDistribution().setReconnectCompleted(true);
      InternalDistributedSystem newds = reconnectDS;
      if (newds != null) {
        newds.getDM().getDistribution().setReconnectCompleted(true);
      }
      if (quorumChecker != null && (reconnectDS == null || !reconnectDS.isConnected())) {
        quorumChecker.close();
      }
    }

    if (isReconnectCancelled()) {
      logger.debug("reconnect can no longer be done because of an explicit disconnect");
      if (reconnectDS != null) {
        reconnectDS.disconnect();
      }
      attemptingToReconnect = false;
    } else if (reconnectDS != null && reconnectDS.isConnected()) {
      logger.info("Reconnect completed.\nNew DistributedSystem is {}\nNew Cache is {}", reconnectDS,
          cache);
    }
  }