in geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java [2435:2751]
/**
 * Repeatedly tries to rejoin the distributed system after this member has been disconnected,
 * then rebuilds the cache from the cache XML description captured before the old system was
 * torn down.
 *
 * <p>
 * Steps: collect the cache state (cache XML and cache-server definitions), close the old
 * system, then loop trying to connect, waiting before each attempt.
 *
 * <p>
 * If reconnecting for lost roles ({@code forcedDisconnect == false}), the reconnected system's
 * cache decides whether the system should stay up, and after {@code max-num-reconnect-tries}
 * attempts this method gives up. If reconnecting for a forced disconnect, max-tries is ignored
 * and the loop keeps trying to join until it succeeds or reconnect is cancelled.
 *
 * <p>
 * Per attempt: disconnect the old system, wait on {@code reconnectLock} for the computed
 * timeout, then call {@code connectInternal} with {@code DS_RECONNECTING_NAME} set and, if
 * available, the quorum checker. Unless the new distribution manager is admin-only, a cache is
 * then created and its cache servers restarted. The system properties
 * {@code APPEND_TO_LOG_FILE} and {@code INHIBIT_DM_BANNER} are overridden for the duration of
 * the attempt and restored in the outer {@code finally}.
 *
 * <p>
 * Fatal failures are recorded in {@code reconnectException}. Giving up on cache creation sets
 * {@code reconnectCancelled}.
 *
 * <p>
 * NOTE(review): {@code reconnectLock.wait(...)} requires the calling thread to own
 * {@code reconnectLock}'s monitor; presumably the caller holds it — verify at the call site.
 *
 * @param forcedDisconnect {@code true} if reconnecting after a forced disconnect (retry until
 *        success or cancellation), {@code false} if reconnecting for lost roles (bounded by
 *        max-tries)
 * @param reason the reason passed to {@code disconnect} when tearing down the old system
 * @throws CacheClosedException if reconnecting for lost roles and the maximum number of
 *         reconnect tries has been reached
 */
private void reconnect(boolean forcedDisconnect, String reason) {
  attemptingToReconnect = true;
  InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
  if (ids == null) {
    ids = this;
  }

  // First save the current cache description. This is created by the membership manager when
  // forced-disconnect starts. If we're reconnecting for lost roles this will be null.
  String cacheXML = null;
  List<CacheServerCreation> cacheServerCreation = null;
  InternalCache cache = GemFireCacheImpl.getInstance();
  if (cache != null) {
    cacheXML = cache.getCacheConfig().getCacheXMLDescription();
    cacheServerCreation = cache.getCacheConfig().getCacheServerCreation();
  }

  DistributionConfig oldConfig = ids.getConfig();
  Properties configProps = config.toProperties();
  configProps.putAll(config.toSecurityProperties());

  int timeOut = oldConfig.getMaxWaitTimeForReconnect();
  int memberTimeout = oldConfig.getMemberTimeout();
  // We need to make sure that a surviving member is able to take over coordination before
  // trying to auto-reconnect. Failure detection can take 4 member-timeout intervals
  // (suspect, check suspect, final check, send new view), so we use that as a minimum.
  final int intervalsAllowedForFailureDetection = 4;
  timeOut = Math.max(timeOut, memberTimeout * intervalsAllowedForFailureDetection);

  int maxTries = oldConfig.getMaxNumReconnectTries();

  // Delay between cache-creation retries while the cluster configuration service is
  // unavailable.
  final long clusterConfigRetryDelayMillis = 5000;

  final boolean isDebugEnabled = logger.isDebugEnabled();

  if (Thread.currentThread().getName().equals("DisconnectThread")) {
    if (isDebugEnabled) {
      logger.debug("changing thread name to ReconnectThread");
    }
    Thread.currentThread().setName("ReconnectThread");
  }

  // get the membership manager for quorum checks
  Distribution mbrMgr = dm.getDistribution();
  quorumChecker = mbrMgr.getQuorumChecker();
  if (isDebugEnabled) {
    if (quorumChecker == null) {
      logger.debug("No quorum checks will be performed during reconnect attempts");
    } else {
      logger.debug("Initialized quorum checking service: {}", quorumChecker);
    }
  }

  // LOG:CLEANUP: deal with reconnect and INHIBIT_DM_BANNER -- this should be ok now
  // Remember the caller's values so they can be restored in the finally block below.
  String appendToLogFile = System.getProperty(APPEND_TO_LOG_FILE);
  if (appendToLogFile == null) {
    System.setProperty(APPEND_TO_LOG_FILE, "true");
  }
  String inhibitBanner = System.getProperty(InternalLocator.INHIBIT_DM_BANNER);
  if (inhibitBanner == null) {
    System.setProperty(InternalLocator.INHIBIT_DM_BANNER, "true");
  }

  if (forcedDisconnect) {
    systemAttemptingReconnect = this;
  }
  try {
    while (reconnectDS == null || !reconnectDS.isConnected()) {
      if (isReconnectCancelled()) {
        break;
      }

      if (!forcedDisconnect) {
        if (isDebugEnabled) {
          logger.debug("Max number of tries : {} and max time out : {}", maxTries, timeOut);
        }
        if (reconnectAttemptCounter.get() >= maxTries) {
          if (isDebugEnabled) {
            logger.debug(
                "Stopping the checkrequiredrole thread because reconnect : {} reached the max "
                    + "number of reconnect tries : {}",
                reconnectAttemptCounter, maxTries);
          }
          InternalCache internalCache = dm.getCache();
          if (internalCache == null) {
            throw new CacheClosedException(
                "Some required roles missing");
          } else {
            throw internalCache.getCacheClosedException(
                "Some required roles missing");
          }
        }
      }

      reconnectAttemptCounter.getAndIncrement();

      if (isReconnectCancelled()) {
        return;
      }

      logger.info("Disconnecting old DistributedSystem to prepare for a reconnect attempt");

      try {
        disconnect(true, reason, false);
      } catch (Exception ee) {
        logger.warn("Exception disconnecting for reconnect", ee);
      }

      TypeRegistry.init();

      // Give surviving members time to detect the failure before rejoining. Note that
      // Object.wait(0) waits indefinitely; timeOut is assumed to be positive here.
      try {
        reconnectLock.wait(timeOut);
      } catch (InterruptedException e) {
        logger.warn("Waiting thread for reconnect got interrupted.");
        Thread.currentThread().interrupt();
        return;
      }

      if (isReconnectCancelled()) {
        return;
      }

      logger.info(
          "Attempting to reconnect to the distributed system. This is attempt #{}.",
          reconnectAttemptCounter);

      // connectInternal may reset the counter; it is restored in the finally below.
      int saveNumberOfTries = reconnectAttemptCounter.get();
      try {
        // notify listeners of each attempt and then again after successful
        notifyReconnectListeners(this, reconnectDS, true);
        if (locatorDMTypeForced) {
          System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
        }
        configProps.put(DistributionConfig.DS_RECONNECTING_NAME, Boolean.TRUE);
        if (quorumChecker != null) {
          configProps.put(DistributionConfig.DS_QUORUM_CHECKER_NAME, quorumChecker);
        }

        InternalDistributedSystem newDS = null;
        if (isReconnectCancelled()) {
          return;
        }

        try {
          newDS = connectInternal(configProps, null, metricsService.getRebuilder(),
              membershipLocator);
        } catch (CancelException e) {
          if (isReconnectCancelled()) {
            return;
          } else {
            throw e;
          }
        } finally {
          if (newDS == null && quorumChecker != null) {
            // make sure the quorum checker is listening for messages from former members
            quorumChecker.resume();
          }
        }

        if (isReconnectCancelled()) {
          // Cancelled while connecting: discard the new system; the loop head will exit.
          newDS.disconnect();
          continue;
        }

        reconnectDS = newDS;
      } catch (SystemConnectException e) {
        logger.debug("Attempt to reconnect failed with SystemConnectException");

        // getMessage() may be null; only an explicit version rejection is fatal.
        String message = e.getMessage();
        if (message != null
            && message.contains("Rejecting the attempt of a member using an older version")) {
          logger.warn("Exception occurred while trying to connect the system during reconnect",
              e);
          attemptingToReconnect = false;
          reconnectException = e;
          return;
        }
        logger.warn("Caught SystemConnectException in reconnect", e);
        continue;
      } catch (GemFireConfigException e) {
        logger.warn("Caught GemFireConfigException in reconnect", e);
        continue;
      } catch (Exception e) {
        logger.warn("Exception occurred while trying to connect the system during reconnect",
            e);
        attemptingToReconnect = false;
        reconnectException = e;
        return;
      } finally {
        if (locatorDMTypeForced) {
          System.getProperties().remove(InternalLocator.FORCE_LOCATOR_DM_TYPE);
        }
        reconnectAttemptCounter.set(saveNumberOfTries);
      }

      DistributionManager newDM = reconnectDS.getDistributionManager();
      if (newDM instanceof ClusterDistributionManager) {
        // Admin systems don't carry a cache, but for others we can now create
        // a cache
        if (newDM.getDMType() != ClusterDistributionManager.ADMIN_ONLY_DM_TYPE) {
          boolean retry;
          do {
            retry = false;
            try {
              cache = new InternalCacheBuilder()
                  .setCacheXMLDescription(cacheXML)
                  .create(reconnectDS);

              if (!cache.isClosed()) {
                createAndStartCacheServers(cacheServerCreation, cache);
                if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) {
                  reconnectAttemptCounter.set(0);
                }
              }
            } catch (GemFireConfigException e) {
              if (!(e.getCause() instanceof ClusterConfigurationNotAvailableException)) {
                // Any other configuration problem will most likely recur on the next
                // attempt, so give up instead of silently leaving the reconnected system
                // without a cache.
                logger.warn(
                    "Exception occurred while trying to create the cache during reconnect. "
                        + "Auto-reconnect is terminating.",
                    e);
                reconnectCancelled = true;
                reconnectException = e;
                break;
              }
              retry = true;
              logger.info("Reconnected to the cluster but the cluster configuration service "
                  + "isn't available - will retry creating the cache");
              try {
                Thread.sleep(clusterConfigRetryDelayMillis);
              } catch (InterruptedException e1) {
                // Preserve the interrupt for our caller, then abandon the reconnect.
                Thread.currentThread().interrupt();
                reconnectCancelled = true;
                reconnectException = e;
                break;
              }
            } catch (Exception e) {
              // We need to give up because we'll probably get the same exception in
              // the next attempt to build the cache.
              logger.warn(
                  "Exception occurred while trying to create the cache during reconnect. "
                      + "Auto-reconnect is terminating.",
                  e);
              reconnectCancelled = true;
              reconnectException = e;
              break;
            }
          } while (retry);
        }
      }

      // Make sure the new DS and cache are stable before exiting this loop. This is skipped
      // when the reconnect has been cancelled, because the new system is about to be
      // disconnected anyway.
      if (!isReconnectCancelled() && reconnectDS != null && reconnectDS.isConnected()) {
        try {
          Thread.sleep(config.getMemberTimeout() * 3L);
        } catch (InterruptedException e) {
          logger.info("Reconnect thread has been interrupted - exiting");
          Thread.currentThread().interrupt();
          reconnectCancelled = true;
          reconnectException = e;
          // NOTE(review): this returns without disconnecting reconnectDS, unlike the
          // cancelled path after the loop — confirm this is intended.
          return;
        }
      }
    } // while()

    if (isReconnectCancelled()) {
      if (reconnectDS != null) {
        reconnectDS.disconnect();
      }
    } else {
      reconnectDS.isReconnectingDS = false;
      if (reconnectDS.isConnected()) {
        notifyReconnectListeners(this, reconnectDS, false);
      }
    }

  } finally {
    systemAttemptingReconnect = null;
    attemptingToReconnect = false;
    if (appendToLogFile == null) {
      System.getProperties().remove(APPEND_TO_LOG_FILE);
    } else {
      System.setProperty(APPEND_TO_LOG_FILE, appendToLogFile);
    }
    if (inhibitBanner == null) {
      System.getProperties().remove(InternalLocator.INHIBIT_DM_BANNER);
    } else {
      System.setProperty(InternalLocator.INHIBIT_DM_BANNER, inhibitBanner);
    }
    dm.getDistribution().setReconnectCompleted(true);
    InternalDistributedSystem newds = reconnectDS;
    if (newds != null) {
      newds.getDM().getDistribution().setReconnectCompleted(true);
    }
    // Only shut down the quorum checker if it was not handed off to a connected new system.
    if (quorumChecker != null && (reconnectDS == null || !reconnectDS.isConnected())) {
      quorumChecker.close();
    }
  }

  if (isReconnectCancelled()) {
    logger.debug("reconnect can no longer be done because of an explicit disconnect");
    if (reconnectDS != null) {
      reconnectDS.disconnect();
    }
    attemptingToReconnect = false;
  } else if (reconnectDS != null && reconnectDS.isConnected()) {
    logger.info("Reconnect completed.\nNew DistributedSystem is {}\nNew Cache is {}", reconnectDS,
        cache);
  }
}