in k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/K8sSyncServer.java [102:258]
/**
 * Create and start the Kubernetes informers that drive the sync.
 *
 * <p>Builds an {@link ApiClient} (outside-cluster or in-cluster depending on
 * {@code k8sSyncConfig.isOutsideCluster()}), installs it as the global default client,
 * creates shared informers for {@link V1Service} and {@link V1Endpoints} across all
 * namespaces, attaches handlers that forward add/update/delete events to
 * {@code registerService}/{@code unregisterService}/{@code registerInstances}/
 * {@code unregisterInstances}, and finally starts every registered informer.
 *
 * @throws IOException if the Kubernetes API client cannot be built
 */
public void startInformer() throws IOException {
    ApiClient apiClient;
    if (k8sSyncConfig.isOutsideCluster()) {
        apiClient = getOutsideApiClient();
    } else {
        apiClient = ClientBuilder.cluster().build();
    }
    // set the global default api-client; the CoreV1Api created below picks it up
    Configuration.setDefaultApiClient(apiClient);
    final CoreV1Api coreV1Api = new CoreV1Api();
    // Replace the client's OkHttpClient with a fresh instance built from its current settings.
    OkHttpClient httpClient = apiClient.getHttpClient().newBuilder().build();
    apiClient.setHttpClient(httpClient);
    factory = new SharedInformerFactory(apiClient);

    SharedIndexInformer<V1Service> serviceInformer =
            factory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> {
                        CoreV1Api.APIlistServiceForAllNamespacesRequest request =
                                coreV1Api.listServiceForAllNamespaces();
                        request.resourceVersion(params.resourceVersion);
                        request.timeoutSeconds(params.timeoutSeconds);
                        request.watch(params.watch);
                        return request.buildCall(null);
                    },
                    V1Service.class,
                    V1ServiceList.class);
    SharedIndexInformer<V1Endpoints> endpointInformer =
            factory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> {
                        CoreV1Api.APIlistEndpointsForAllNamespacesRequest request =
                                coreV1Api.listEndpointsForAllNamespaces();
                        request.resourceVersion(params.resourceVersion);
                        request.timeoutSeconds(params.timeoutSeconds);
                        request.watch(params.watch);
                        return request.buildCall(null);
                    },
                    V1Endpoints.class,
                    V1EndpointsList.class);

    serviceInformer.addEventHandler(
            new ResourceEventHandler<V1Service>() {
                @Override
                public void onAdd(V1Service service) {
                    if (service.getMetadata() == null || service.getSpec() == null) {
                        return;
                    }
                    String serviceName = service.getMetadata().getName();
                    String namespace = service.getMetadata().getNamespace();
                    List<V1ServicePort> servicePorts = service.getSpec().getPorts();
                    try {
                        registerService(namespace, serviceName, servicePorts, false, endpointInformer);
                        Loggers.MAIN.info("add service, namespace:" + namespace + " serviceName: " + serviceName);
                    } catch (Exception e) {
                        Loggers.MAIN.warn("add service fail, message:" + e.getMessage() + " namespace:"
                                + namespace + " serviceName: " + serviceName);
                    }
                }

                @Override
                public void onUpdate(V1Service oldService, V1Service newService) {
                    if (oldService.getMetadata() == null || oldService.getSpec() == null
                            || newService.getMetadata() == null || newService.getSpec() == null) {
                        return;
                    }
                    List<V1ServicePort> oldServicePorts = oldService.getSpec().getPorts();
                    String serviceName = newService.getMetadata().getName();
                    String namespace = newService.getMetadata().getNamespace();
                    List<V1ServicePort> newServicePorts = newService.getSpec().getPorts();
                    boolean portChanged = compareServicePorts(oldServicePorts, newServicePorts);
                    try {
                        registerService(namespace, serviceName, newServicePorts, portChanged, endpointInformer);
                        Loggers.MAIN.info("update service, namespace: " + namespace + " serviceName: " + serviceName);
                    } catch (Exception e) {
                        Loggers.MAIN.warn("update service fail, message: " + e.getMessage() + " namespace: "
                                + namespace + " serviceName: " + serviceName);
                    }
                }

                @Override
                public void onDelete(V1Service service, boolean deletedFinalStateUnknown) {
                    if (service.getMetadata() == null) {
                        return;
                    }
                    String serviceName = service.getMetadata().getName();
                    String namespace = service.getMetadata().getNamespace();
                    try {
                        unregisterService(namespace, serviceName);
                        Loggers.MAIN.info("delete service, namespace:" + namespace + " serviceName:" + serviceName);
                    } catch (Exception e) {
                        Loggers.MAIN.warn("delete service fail, message: " + e.getMessage()
                                + " namespace:" + namespace + " serviceName:" + serviceName);
                    }
                }
            });

    endpointInformer.addEventHandler(new ResourceEventHandler<V1Endpoints>() {
        @Override
        public void onAdd(V1Endpoints obj) {
            if (obj.getMetadata() == null) {
                return;
            }
            String serviceName = obj.getMetadata().getName();
            String namespace = obj.getMetadata().getNamespace();
            // The two informers sync independently, so the Service may not be cached yet.
            V1Service service = findCachedService(serviceInformer, namespace, serviceName);
            if (service == null) {
                Loggers.MAIN.warn("add instances skipped, service not found in informer cache, namespace:"
                        + namespace + ", serviceName: " + serviceName);
                return;
            }
            Set<String> addIpSet = getIpFromEndpoints(obj);
            List<V1ServicePort> servicePorts = service.getSpec().getPorts();
            try {
                registerInstances(addIpSet, namespace, serviceName, servicePorts);
                Loggers.MAIN.info("add instances, namespace:" + namespace + " serviceName: " + serviceName);
            } catch (NacosException e) {
                Loggers.MAIN.warn("add instances fail, message:" + e.getMessage() + " namespace:" + namespace + ", serviceName: " + serviceName);
            }
        }

        @Override
        public void onUpdate(V1Endpoints oldObj, V1Endpoints newObj) {
            if (newObj.getMetadata() == null) {
                return;
            }
            String serviceName = newObj.getMetadata().getName();
            String namespace = newObj.getMetadata().getNamespace();
            V1Service service = findCachedService(serviceInformer, namespace, serviceName);
            if (service == null) {
                Loggers.MAIN.warn("update instances skipped, service not found in informer cache, namespace:"
                        + namespace + ", serviceName: " + serviceName);
                return;
            }
            List<V1ServicePort> servicePorts = service.getSpec().getPorts();
            try {
                registerService(namespace, serviceName, servicePorts, false, endpointInformer);
                Loggers.MAIN.info("update instances, namespace:" + namespace + " serviceName: " + serviceName);
            } catch (NacosException e) {
                Loggers.MAIN.warn("update instances fail, message:" + e.getMessage() + " namespace:"
                        + namespace + ", serviceName: " + serviceName);
            }
        }

        @Override
        public void onDelete(V1Endpoints obj, boolean deletedFinalStateUnknown) {
            if (obj.getMetadata() == null) {
                return;
            }
            String serviceName = obj.getMetadata().getName();
            String namespace = obj.getMetadata().getNamespace();
            Set<String> deleteIpSet = getIpFromEndpoints(obj);
            try {
                List<? extends Instance> oldInstanceList = instanceOperatorClient.listAllInstances(namespace, serviceName);
                unregisterInstances(deleteIpSet, namespace, serviceName, oldInstanceList);
                Loggers.MAIN.info("delete instances, namespace:" + namespace + ", serviceName: " + serviceName);
            } catch (NacosException e) {
                // Failures are reported at warn level with the cause, like the other handlers.
                Loggers.MAIN.warn("delete instances fail, message:" + e.getMessage() + " namespace:"
                        + namespace + ", serviceName: " + serviceName);
            }
        }
    });

    factory.startAllRegisteredInformers();
}

/**
 * Look up a Service in the service informer's local cache.
 *
 * @param serviceInformer informer whose indexer is queried
 * @param namespace       namespace of the Service
 * @param serviceName     name of the Service
 * @return the cached Service, or {@code null} when it is absent from the cache or has no spec
 */
private V1Service findCachedService(SharedIndexInformer<V1Service> serviceInformer, String namespace,
        String serviceName) {
    // TODO: a new Lister is created per call because it is bound to a namespace; consider
    //  caching one Lister per namespace (e.g. in a map) instead.
    Lister<V1Service> serviceLister = new Lister<>(serviceInformer.getIndexer(), namespace);
    V1Service service = serviceLister.get(serviceName);
    if (service == null || service.getSpec() == null) {
        return null;
    }
    return service;
}