public K8SServiceRegistry()

in oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java [64:140]


    public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) {
        this.config = config;

        serviceNameFormatter = new ServiceNameFormatter(config.getK8sServiceNameRule());

        final CacheBuilder<Object, Object> cacheBuilder =
            CacheBuilder.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(3));

        nodeIPs = cacheBuilder.build(CacheLoader.from(() -> {
            try (final var kubernetesClient = new KubernetesClientBuilder().build()) {
                return kubernetesClient
                    .nodes()
                    .list()
                    .getItems()
                    .stream()
                    .map(Node::getStatus)
                    .map(NodeStatus::getAddresses)
                    .flatMap(it -> it.stream().map(NodeAddress::getAddress)
                        .filter(StringUtil::isNotBlank))
                    .collect(toSet());
            } catch (Exception e) {
                log.error("Failed to list Nodes.", e);
                return Collections.emptySet();
            }
        }));

        ipServiceMetaInfoMap = cacheBuilder.build(new CacheLoader<>() {
            @Override
            public ServiceMetaInfo load(String ip) {
                final Optional<Pod> pod = KubernetesPods.INSTANCE.findByIP(ip);
                if (pod.isEmpty()) {
                    log.debug("No corresponding Pod for IP: {}", ip);
                    return config.serviceMetaInfoFactory().unknown();
                }

                final Optional<ObjectID> serviceID =
                    KubernetesEndpoints.INSTANCE
                        .list()
                        .stream()
                        .filter(endpoints -> endpoints.getMetadata() != null)
                        .filter(endpoints -> endpoints.getSubsets() != null)
                        .map(endpoints -> {
                            final ObjectMeta metadata = endpoints.getMetadata();
                            if (endpoints
                                .getSubsets()
                                .stream()
                                .filter(subset -> subset.getAddresses() != null)
                                .flatMap(subset -> subset.getAddresses().stream())
                                .anyMatch(address -> Objects.equals(ip, address.getIp()))) {
                                return ObjectID
                                    .builder()
                                    .name(metadata.getName())
                                    .namespace(metadata.getNamespace())
                                    .build();
                            }
                            return null;
                        })
                        .filter(Objects::nonNull)
                        .findFirst();
                if (serviceID.isEmpty()) {
                    log.debug("No corresponding endpoint for IP: {}", ip);
                    return config.serviceMetaInfoFactory().unknown();
                }

                final Optional<Service> service =
                    KubernetesServices.INSTANCE.findByID(serviceID.get());
                if (service.isEmpty()) {
                    log.debug("No service for namespace and name: {}", serviceID.get());
                    return config.serviceMetaInfoFactory().unknown();
                }
                log.debug(
                    "Composing service meta info from service and pod for IP: {}", ip);
                return composeServiceMetaInfo(service.get(), pod.get());
            }
        });
    }