in dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java [380:434]
/**
 * Builds the list of {@link ServiceInstance}s backing {@code serviceName} from a Kubernetes
 * {@link Endpoints} object.
 *
 * <p>Pods are looked up in {@code namespace} using the selector returned by
 * {@code getServiceSelector(serviceName)}; each endpoint address is then matched to a Pod through the
 * name carried by its target reference. For every matched Pod whose annotations contain
 * {@code KUBERNETES_PROPERTIES_KEY}, one instance is created per collected port, and the JSON stored
 * in that annotation is copied into the instance metadata.
 *
 * <p>Addresses that have no target reference, that do not match any listed Pod, or whose Pod lacks the
 * properties annotation are skipped with a warning instead of failing the whole conversion.
 *
 * @param endpoints   the Kubernetes Endpoints whose subsets are converted
 * @param serviceName the service name used both for the selector lookup and for the created instances
 * @return the resolved instances; empty when the service selector is unavailable or nothing matches
 */
private List<ServiceInstance> toServiceInstance(Endpoints endpoints, String serviceName) {
    Map<String, String> serviceSelector = getServiceSelector(serviceName);
    if (serviceSelector == null) {
        return new LinkedList<>();
    }

    // Index the Pods selected by the service by name so addresses can be matched in O(1).
    Map<String, Pod> pods =
            kubernetesClient.pods().inNamespace(namespace).withLabels(serviceSelector).list().getItems().stream()
                    .collect(Collectors.toMap(pod -> pod.getMetadata().getName(), pod -> pod));

    List<ServiceInstance> instances = new LinkedList<>();

    // NOTE(review): ports are unioned across ALL subsets and then applied to every address, which
    // mirrors the existing behavior; confirm whether per-subset pairing is intended instead.
    Set<Integer> instancePorts = new HashSet<>();
    for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
        instancePorts.addAll(endpointSubset.getPorts().stream()
                .map(EndpointPort::getPort)
                .collect(Collectors.toSet()));
    }

    for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
        for (EndpointAddress address : endpointSubset.getAddresses()) {
            // targetRef is optional on an endpoint address; without it there is no Pod name to match.
            String podName = address.getTargetRef() == null
                    ? null
                    : address.getTargetRef().getName();
            Pod pod = podName == null ? null : pods.get(podName);
            if (pod == null) {
                logger.warn(
                        REGISTRY_UNABLE_MATCH_KUBERNETES,
                        "",
                        "",
                        "Unable to match Kubernetes Endpoint address with Pod. " + "EndpointAddress Hostname: "
                                + podName);
                continue;
            }

            // A Pod may carry no annotations at all; treat that the same as a missing properties entry.
            Map<String, String> annotations = pod.getMetadata().getAnnotations();
            String properties = annotations == null ? null : annotations.get(KUBERNETES_PROPERTIES_KEY);
            if (StringUtils.isEmpty(properties)) {
                // Logged once per address rather than once per port.
                logger.warn(
                        REGISTRY_UNABLE_FIND_SERVICE_KUBERNETES,
                        "",
                        "",
                        "Unable to find Service Instance metadata in Pod Annotations. "
                                + "Possibly cause: provider has not been initialized successfully. "
                                + "EndpointAddress Hostname: "
                                + podName);
                continue;
            }

            // Parse the annotation once per Pod; putAll copies the entries into each instance.
            Map<String, String> podMetadata = JsonUtils.toJavaObject(properties, Map.class);
            String ip = address.getIp();
            for (Integer port : instancePorts) {
                ServiceInstance serviceInstance = new DefaultServiceInstance(
                        serviceName, ip, port, ScopeModelUtil.getApplicationModel(getUrl().getScopeModel()));
                serviceInstance.getMetadata().putAll(podMetadata);
                instances.add(serviceInstance);
            }
        }
    }
    return instances;
}