in evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java [702:845]
/**
 * Determines whether the clients held for {@code serverGroup} are out of sync with the
 * hosts currently reported by discovery.
 *
 * <p>Each client of the server group is inspected in order. The detailed checks are only
 * run for a client when a reconcile is due, when its active connection count differs from
 * the discovery count, when it has inactive connections, or when the node locator size
 * matches neither the discovery count nor the active count. The first check that fails
 * records its reason in the {@code POOL_CHANGED} gauge and returns {@code true}:
 * <ul>
 *   <li>1 - a discovered host has neither an active nor an inactive connection</li>
 *   <li>2 - a host inactive for more than 20 minutes is no longer in discovery</li>
 *   <li>3 - a node in the locator is not active and is not in discovery</li>
 *   <li>4 - the node locator size differs from both discovery and active counts</li>
 *   <li>5 - an actively connected host has been missing from discovery for 20 minutes</li>
 *   <li>9 - discovery is empty and there are no (or mostly inactive) connections</li>
 * </ul>
 * A client that passes all of its checks sets the gauge to 0.
 *
 * <p>{@code reportPoolConifg()} is invoked only on the path that returns {@code false}.
 *
 * @param serverGroup the server group whose clients are being verified
 * @param discoveredHostsInServerGroup the addresses currently reported by discovery for
 *        {@code serverGroup}
 * @return {@code true} if there is no client list for the server group or any check above
 *         detects a difference; {@code false} otherwise
 */
protected boolean haveInstancesInServerGroupChanged(ServerGroup serverGroup, Set<InetSocketAddress> discoveredHostsInServerGroup) {
    final List<EVCacheClient> clients = memcachedInstancesByServerGroup.get(serverGroup);

    // 1. if we have discovered instances in zone but not in our map then
    // return immediately
    if (clients == null) return true;

    // Grace period (20 minutes, in milliseconds) before a host that has dropped out of
    // discovery forces a refresh. Used by checks 2 and 5 below.
    final long staleThresholdMillis = 20L * 60L * 1000L;

    // 2. Do a quick check based on count (active, inactive and discovered)
    for (int i = 0; i < clients.size(); i++) {
        final int size = clients.size();
        final EVCacheClient client = clients.get(i);
        final EVCacheConnectionObserver connectionObserver = client.getConnectionObserver();
        final int activeServerCount = connectionObserver.getActiveServerCount();
        final int inActiveServerCount = connectionObserver.getInActiveServerCount();
        final int sizeInDiscovery = discoveredHostsInServerGroup.size();
        final int sizeInHashing = client.getNodeLocator().getAll().size();

        // The discovery size is the same for every client, so publish it once per pass.
        if (i == 0) {
            getConfigGauge("sizeInDiscovery", serverGroup).set(Long.valueOf(sizeInDiscovery));
        }

        if (log.isDebugEnabled()) {
            log.debug("\n\tApp : " + _appName + "\n\tServerGroup : " + serverGroup + "\n\tActive Count : " + activeServerCount
                    + "\n\tInactive Count : " + inActiveServerCount + "\n\tDiscovery Count : " + sizeInDiscovery + "\n\tsizeInHashing : " + sizeInHashing);
        }

        // Force a full verification once per reconcile interval. Note that lastReconcileTime
        // is advanced as soon as one client triggers the reconcile, so the remaining clients
        // in this pass will typically see reconcile == false and set the gauge back to 0.
        final long currentTime = System.currentTimeMillis();
        boolean reconcile = false;
        if (currentTime - lastReconcileTime > reconcileInterval.get()) {
            reconcile = true;
            lastReconcileTime = currentTime;
            getConfigGauge(EVCacheMetricsFactory.POOL_RECONCILE, serverGroup).set(Long.valueOf(1));
        } else {
            getConfigGauge(EVCacheMetricsFactory.POOL_RECONCILE, serverGroup).set(Long.valueOf(0));
        }

        // The locator is considered out of sync only when its size matches neither the
        // discovery count nor the active connection count.
        final boolean hashingSizeDiff = (sizeInHashing != sizeInDiscovery && sizeInHashing != activeServerCount);

        if (reconcile || activeServerCount != sizeInDiscovery || inActiveServerCount > 0 || hashingSizeDiff) {
            if (log.isDebugEnabled()) {
                log.debug("\n\t" + _appName + " & " + serverGroup
                        + " experienced an issue.\n\tActive Server Count : " + activeServerCount);
            }
            if (log.isDebugEnabled()) {
                log.debug("\n\tInActive Server Count : " + inActiveServerCount
                        + "\n\tDiscovered Instances : " + sizeInDiscovery);
            }

            // 1. If a host is in discovery and we don't have an active or
            // inActive connection to it then we will have to refresh our
            // list. Typical case is we have replaced an existing node or
            // expanded the cluster.
            for (InetSocketAddress instance : discoveredHostsInServerGroup) {
                if (!connectionObserver.getActiveServers().containsKey(instance) && !connectionObserver.getInActiveServers().containsKey(instance)) {
                    if (log.isDebugEnabled()) {
                        log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
                                + "; instance : " + instance
                                + " not found and will shutdown the client and init it again.");
                    }
                    getConfigGauge(EVCacheMetricsFactory.POOL_CHANGED, serverGroup).set(Long.valueOf(1));
                    return true;
                }
            }

            // 2. If a host is not in discovery and is
            // inActive for more than 20 mins then we will have to refresh our
            // list. Typical case is we have replaced an existing node or
            // decreasing the cluster. Replacing an instance should not take
            // more than 20 mins (http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/monitoring-system-instance-status-check.html#types-of-instance-status-checks).
            // Even if it does then we will refresh the client twice which
            // should be ok.
            // NOTE : For a zombie instance this will mean that it will take
            // 20 mins after detaching and taking it OOS to be removed
            // unless we force a refresh
            // 12/5/2015 - Should we even do this anymore
            for (Entry<InetSocketAddress, Long> entry : connectionObserver.getInActiveServers().entrySet()) {
                if ((currentTime - entry.getValue().longValue()) > staleThresholdMillis && !discoveredHostsInServerGroup.contains(entry.getKey())) {
                    if (log.isDebugEnabled()) {
                        log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup + "; instance : " + entry.getKey()
                                + " not found in discovery and will shutdown the client and init it again.");
                    }
                    getConfigGauge(EVCacheMetricsFactory.POOL_CHANGED, serverGroup).set(Long.valueOf(2));
                    return true;
                }
            }

            // 3. Check to see if there are any inactive connections. If we
            // find inactive connections and this node is not in discovery
            // then we will refresh the client.
            final Collection<MemcachedNode> allNodes = client.getNodeLocator().getAll();
            for (MemcachedNode node : allNodes) {
                if (node instanceof EVCacheNode) {
                    final EVCacheNode evcNode = ((EVCacheNode) node);
                    // If the connection to a node is not active then we
                    // will reconnect the client.
                    if (!evcNode.isActive() && !discoveredHostsInServerGroup.contains(evcNode.getSocketAddress())) {
                        if (log.isDebugEnabled()) {
                            log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
                                    + "; Node : " + node + " is not active. Will shutdown the client and init it again.");
                        }
                        getConfigGauge(EVCacheMetricsFactory.POOL_CHANGED, serverGroup).set(Long.valueOf(3));
                        return true;
                    }
                }
            }

            // 4. if there is a difference in the number of nodes in the
            // KetamaHashingMap then refresh
            if (hashingSizeDiff) {
                if (log.isDebugEnabled()) {
                    log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
                            + "; PoolSize : " + size + "; ActiveConnections : " + activeServerCount
                            + "; InactiveConnections : " + inActiveServerCount + "; InDiscovery : " + sizeInDiscovery
                            + "; InHashing : " + sizeInHashing + "; hashingSizeDiff : " + hashingSizeDiff
                            + ". Since there is a diff in hashing size will shutdown the client and init it again.");
                }
                getConfigGauge(EVCacheMetricsFactory.POOL_CHANGED, serverGroup).set(Long.valueOf(4));
                return true;
            }

            // 5. If a host is not in discovery and we have had an active connection to it for more than 20 mins then we will refresh.
            // Typical case is we have replaced an existing node but it has zombie. We are able to connect to it (hypervisor) but not talk to it
            // or prana has shutdown successfully but not memcached. In such scenario we will refresh the cluster
            for (InetSocketAddress instance : connectionObserver.getActiveServers().keySet()) {
                if (!discoveredHostsInServerGroup.contains(instance)) {
                    if (!evCacheDiscoveryConnectionLostSet.containsKey(instance)) {
                        // First time this host is seen missing from discovery: start the clock.
                        evCacheDiscoveryConnectionLostSet.put(instance, Long.valueOf(currentTime));
                        if (log.isDebugEnabled()) {
                            log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
                                    + "; instance : " + instance + " not found in discovery. We will add to our list and monitor it.");
                        }
                    } else {
                        long lostDur = (currentTime - evCacheDiscoveryConnectionLostSet.get(instance).longValue());
                        if (lostDur >= staleThresholdMillis) {
                            if (log.isDebugEnabled()) {
                                log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
                                        + "; instance : " + instance + " not found in discovery for the past 20 mins and will shutdown the client and init it again.");
                            }
                            getConfigGauge(EVCacheMetricsFactory.POOL_CHANGED, serverGroup).set(Long.valueOf(5));
                            evCacheDiscoveryConnectionLostSet.remove(instance);
                            return true;
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
                                        + "; instance : " + instance + " not found in discovery for " + lostDur + " msec.");
                            }
                        }
                    }
                }
            }

            // 9. If we have removed all instances or took them OOS in a
            // ServerGroup then shutdown the client
            if (sizeInDiscovery == 0) {
                if (activeServerCount == 0 || inActiveServerCount > activeServerCount) {
                    if (log.isDebugEnabled()) {
                        log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
                                + "; Will shutdown the client since there are no active servers and no servers for this ServerGroup in disocvery.");
                    }
                    getConfigGauge(EVCacheMetricsFactory.POOL_CHANGED, serverGroup).set(Long.valueOf(9));
                    return true;
                }
            }
        }

        // This client passed every check (or needed none).
        getConfigGauge(EVCacheMetricsFactory.POOL_CHANGED, serverGroup).set(Long.valueOf(0));
    }

    reportPoolConifg();
    return false;
}