in apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java [101:172]
/**
 * Periodic task body: while the {@code reconnect} flag is set, (re)select a backend
 * collector address and make sure a usable gRPC channel exists for it.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>If periodic DNS resolution is enabled and a reconnect is pending, rebuild
 *       {@code grpcServers} by resolving every {@code host:port} entry of
 *       {@code Config.Collector.BACKEND_SERVICE} into one {@code ip:port} entry per
 *       resolved address (malformed or unresolvable entries are logged and skipped).</li>
 *   <li>If a reconnect is pending and at least one server is known, pick one at random.
 *       When the pick differs from the currently selected index (or no channel exists yet),
 *       the old channel is shut down and a new one is built; otherwise the existing channel
 *       is only checked for connectivity.</li>
 *   <li>On success the reconnect state is cleared and listeners are notified with
 *       {@code GRPCChannelStatus.CONNECTED}.</li>
 * </ol>
 *
 * <p>Any failure while selecting or building the channel is logged and swallowed, so the
 * next invocation can retry.
 */
public void run() {
    LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
    if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
        // Re-resolve the configured backend addresses so DNS changes are picked up.
        grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
                            .filter(StringUtil::isNotBlank)
                            .map(eachBackendService -> eachBackendService.split(":"))
                            .filter(domainPortPairs -> {
                                // Entries without a port part cannot be used; drop them.
                                if (domainPortPairs.length < 2) {
                                    LOGGER.debug("Service address [{}] format error. The expected format is IP:port", domainPortPairs[0]);
                                    return false;
                                }
                                return true;
                            })
                            .flatMap(domainPortPairs -> {
                                // One host name may resolve to several addresses; expand each to ip:port.
                                try {
                                    return Arrays.stream(InetAddress.getAllByName(domainPortPairs[0]))
                                                 .map(InetAddress::getHostAddress)
                                                 .map(ip -> String.format("%s:%s", ip, domainPortPairs[1]));
                                } catch (Throwable t) {
                                    LOGGER.error(t, "Failed to resolve {} of backend service.", domainPortPairs[0]);
                                }
                                // Resolution failed: contribute nothing for this entry.
                                return Stream.empty();
                            })
                            .distinct()
                            .collect(Collectors.toList());
    }
    if (reconnect) {
        if (!grpcServers.isEmpty()) {
            String server = "";
            try {
                // Use the bounded overload: Math.abs(Integer.MIN_VALUE) is still negative,
                // so "Math.abs(nextInt()) % size" could yield a negative (out-of-range) index.
                // NOTE(review): assumes `random` is a java.util.Random - confirm against the field declaration.
                int index = random.nextInt(grpcServers.size());
                // Resolve the address up front so the error log below always names the server.
                server = grpcServers.get(index);
                // Also rebuild when no channel exists yet (e.g. a previous build attempt threw
                // after selectedIdx was updated); otherwise the else-branch would dereference null.
                if (index != selectedIdx || managedChannel == null) {
                    selectedIdx = index;
                    String[] ipAndPort = server.split(":");
                    if (managedChannel != null) {
                        managedChannel.shutdownNow();
                    }
                    managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
                                                .addManagedChannelBuilder(new StandardChannelBuilder())
                                                .addManagedChannelBuilder(new TLSChannelBuilder())
                                                .addChannelDecorator(new AgentIDDecorator())
                                                .addChannelDecorator(new AuthenticationDecorator())
                                                .build();
                    reconnectCount = 0;
                    reconnect = false;
                    notify(GRPCChannelStatus.CONNECTED);
                } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
                    // Reconnect to the same server is automatically done by GRPC,
                    // therefore we are responsible to check the connectivity and
                    // set the state and notify listeners.
                    // The boolean argument becomes true once the number of consecutive checks
                    // exceeds FORCE_RECONNECTION_PERIOD (presumably requesting a forced
                    // connection attempt - verify against GRPCChannel#isConnected).
                    reconnectCount = 0;
                    reconnect = false;
                    notify(GRPCChannelStatus.CONNECTED);
                }
                // Either connected or still waiting on the same server: nothing more to do this round.
                return;
            } catch (Throwable t) {
                LOGGER.error(t, "Create channel to {} fail.", server);
            }
        }
        // Reached when no server is known or channel selection/creation failed.
        LOGGER.debug(
            "Selected collector grpc service is not available. Wait {} seconds to retry",
            Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL
        );
    }
}