in core/src/main/java/com/alibaba/nacos/core/remote/NacosRuntimeConnectionEjector.java [59:138]
/**
 * Finds connections that look dead, probes each of them with a {@link ClientDetectionRequest},
 * and unregisters every one that does not answer successfully in time.
 *
 * <p>A connection is treated as a candidate when either:
 * <ul>
 *   <li>its last active time is at least {@code KEEP_ALIVE_TIME} older than "now", or</li>
 *   <li>{@code pushQueueBlockTimesLastOver(300 * 1000)} returns {@code true} for it
 *       (presumably: its push queue has been blocked for more than 300 s; confirm against
 *       the connection meta info implementation).</li>
 * </ul>
 *
 * <p>Probes are sent asynchronously and the task waits at most 5 seconds for all of them.
 * Candidates with a successful response get their active time refreshed and are kept; all
 * other candidates (failed, timed out, missing or already closed) are unregistered.
 *
 * <p>The method never propagates an exception: every failure is logged. If the thread is
 * interrupted while waiting for probe results, the interrupt flag is restored and no
 * connection is unregistered in that run.
 */
private void ejectOutdatedConnection() {
    // One timeout for both the per-request callback and the overall wait, so they cannot drift apart.
    final long detectionTimeoutMillis = 5000L;
    // Threshold handed to pushQueueBlockTimesLastOver (kept as int, exactly as before).
    final int pushQueueBlockThresholdMillis = 300 * 1000;
    try {
        Loggers.CONNECTION.info("Connection check task start");
        Map<String, Connection> connections = connectionManager.connections;
        int totalCount = connections.size();
        int currentSdkClientCount = connectionManager.currentSdkClientCount();
        Loggers.CONNECTION.info("Long connection metrics detail ,Total count ={}, sdkCount={},clusterCount={}",
                totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount));

        // Collect the ids of all connections that look outdated.
        Set<String> outDatedConnections = new HashSet<>();
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Connection> entry : connections.entrySet()) {
            Connection client = entry.getValue();
            if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                outDatedConnections.add(client.getMetaInfo().getConnectionId());
            } else if (client.getMetaInfo().pushQueueBlockTimesLastOver(pushQueueBlockThresholdMillis)) {
                outDatedConnections.add(client.getMetaInfo().getConnectionId());
            }
        }

        // Actively probe every candidate before ejecting it.
        Loggers.CONNECTION.info("Out dated connection ,size={}", outDatedConnections.size());
        if (CollectionUtils.isNotEmpty(outDatedConnections)) {
            // Written from response callbacks (other threads) and read here, possibly after the
            // latch wait timed out, so it has to be a thread-safe set.
            Set<String> successConnections = java.util.concurrent.ConcurrentHashMap.newKeySet();
            final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
            for (String outDateConnectionId : outDatedConnections) {
                try {
                    Connection connection = connectionManager.getConnection(outDateConnectionId);
                    if (connection != null) {
                        ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                        connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                            @Override
                            public Executor getExecutor() {
                                return null;
                            }

                            @Override
                            public long getTimeout() {
                                return detectionTimeoutMillis;
                            }

                            @Override
                            public void onResponse(Response response) {
                                try {
                                    if (response != null && response.isSuccess()) {
                                        connection.freshActiveTime();
                                        successConnections.add(outDateConnectionId);
                                    }
                                } finally {
                                    // Count down only after the result is recorded; otherwise the
                                    // waiting thread could wake up, miss this id, and unregister a
                                    // connection that has just proven to be alive.
                                    latch.countDown();
                                }
                            }

                            @Override
                            public void onException(Throwable e) {
                                latch.countDown();
                            }
                        });
                        Loggers.CONNECTION.info("[{}]send connection active request ", outDateConnectionId);
                    } else {
                        // Connection is already gone; nothing to wait for.
                        latch.countDown();
                    }
                } catch (ConnectionAlreadyClosedException e) {
                    // Already closed: no response will arrive, the id stays a removal candidate.
                    latch.countDown();
                } catch (Exception e) {
                    Loggers.CONNECTION.error("[{}]Error occurs when check client active detection ,error={}",
                            outDateConnectionId, e);
                    latch.countDown();
                }
            }

            // Bounded wait: probes that have not answered by now are treated as failed.
            latch.await(detectionTimeoutMillis, TimeUnit.MILLISECONDS);
            Loggers.CONNECTION.info("Out dated connection check successCount={}", successConnections.size());

            for (String outDateConnectionId : outDatedConnections) {
                if (!successConnections.contains(outDateConnectionId)) {
                    Loggers.CONNECTION.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                    connectionManager.unregister(outDateConnectionId);
                }
            }
        }
        Loggers.CONNECTION.info("Connection check task end");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the owning executor can observe the interruption.
        Thread.currentThread().interrupt();
        Loggers.CONNECTION.error("Error occurs during connection check... ", e);
    } catch (Throwable e) {
        Loggers.CONNECTION.error("Error occurs during connection check... ", e);
    }
}