in oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java [64:140]
public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) {
this.config = config;
serviceNameFormatter = new ServiceNameFormatter(config.getK8sServiceNameRule());
final CacheBuilder<Object, Object> cacheBuilder =
CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(3));
nodeIPs = cacheBuilder.build(CacheLoader.from(() -> {
try (final var kubernetesClient = new KubernetesClientBuilder().build()) {
return kubernetesClient
.nodes()
.list()
.getItems()
.stream()
.map(Node::getStatus)
.map(NodeStatus::getAddresses)
.flatMap(it -> it.stream().map(NodeAddress::getAddress)
.filter(StringUtil::isNotBlank))
.collect(toSet());
} catch (Exception e) {
log.error("Failed to list Nodes.", e);
return Collections.emptySet();
}
}));
ipServiceMetaInfoMap = cacheBuilder.build(new CacheLoader<>() {
@Override
public ServiceMetaInfo load(String ip) {
final Optional<Pod> pod = KubernetesPods.INSTANCE.findByIP(ip);
if (pod.isEmpty()) {
log.debug("No corresponding Pod for IP: {}", ip);
return config.serviceMetaInfoFactory().unknown();
}
final Optional<ObjectID> serviceID =
KubernetesEndpoints.INSTANCE
.list()
.stream()
.filter(endpoints -> endpoints.getMetadata() != null)
.filter(endpoints -> endpoints.getSubsets() != null)
.map(endpoints -> {
final ObjectMeta metadata = endpoints.getMetadata();
if (endpoints
.getSubsets()
.stream()
.filter(subset -> subset.getAddresses() != null)
.flatMap(subset -> subset.getAddresses().stream())
.anyMatch(address -> Objects.equals(ip, address.getIp()))) {
return ObjectID
.builder()
.name(metadata.getName())
.namespace(metadata.getNamespace())
.build();
}
return null;
})
.filter(Objects::nonNull)
.findFirst();
if (serviceID.isEmpty()) {
log.debug("No corresponding endpoint for IP: {}", ip);
return config.serviceMetaInfoFactory().unknown();
}
final Optional<Service> service =
KubernetesServices.INSTANCE.findByID(serviceID.get());
if (service.isEmpty()) {
log.debug("No service for namespace and name: {}", serviceID.get());
return config.serviceMetaInfoFactory().unknown();
}
log.debug(
"Composing service meta info from service and pod for IP: {}", ip);
return composeServiceMetaInfo(service.get(), pod.get());
}
});
}