in computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java [98:151]
/**
 * Starts the operator and blocks until the first controller task finishes
 * or a startup step fails.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Create the Kubernetes client; unless the configured watch namespace
 *       equals {@code Constants.ALL_NAMESPACE}, create that namespace and
 *       scope the client to it.</li>
 *   <li>Add the health check and register the controllers.</li>
 *   <li>Register the informer exception listener, then start all
 *       registered informers.</li>
 *   <li>Run every controller on its own thread of a fixed-size pool.</li>
 *   <li>Asynchronously wait on the startup latch handed to every
 *       controller, then add the ready check.</li>
 * </ol>
 *
 * <p>Any {@link Throwable} raised here is logged rather than propagated,
 * and {@code shutdown()} is always invoked before this method returns.
 */
public void start() {
    try {
        this.kubeClient = new DefaultKubernetesClient();
        String watchNamespace = this.config.get(
                                OperatorOptions.WATCH_NAMESPACE);
        if (!Objects.equals(watchNamespace, Constants.ALL_NAMESPACE)) {
            this.createNamespace(watchNamespace);
            this.kubeClient = this.kubeClient.inNamespace(watchNamespace);
        }
        this.informerFactory = this.kubeClient.informers();
        LOG.info("Watch namespace: " + watchNamespace);

        this.addHealthCheck();
        this.registerControllers();

        /*
         * Register the exception listener BEFORE starting the informers,
         * otherwise a failure raised while the informers are starting
         * could occur before any listener is attached and go unnoticed.
         */
        this.informerFactory.addSharedInformerEventListener(exception -> {
            LOG.error("Informer event listener exception occurred",
                      exception);
            OperatorEntrypoint.this.shutdown();
        });
        this.informerFactory.startAllRegisteredInformers();

        // Start all controllers, one pool thread per controller
        this.controllerPool = ExecutorUtil.newFixedThreadPool(
                              this.controllers.size(), "controllers-%d");
        CountDownLatch latch = new CountDownLatch(this.controllers.size());
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (AbstractController<?> controller : this.controllers) {
            futures.add(CompletableFuture.runAsync(() -> {
                controller.run(latch);
            }, this.controllerPool));
        }

        // Wait (off the calling thread) until the startup latch is released
        CompletableFuture.runAsync(() -> {
            try {
                latch.await();
                this.addReadyCheck();
                LOG.info("The Operator has been ready");
            } catch (Throwable e) {
                // Keep the cause: without it the failure is undiagnosable
                LOG.error("Failed to set up ready check", e);
                OperatorEntrypoint.this.shutdown();
            }
        });

        // Block the caller until the first controller task completes
        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
                         .get();
    } catch (Throwable throwable) {
        LOG.error("Failed to start Operator: ", throwable);
    } finally {
        this.shutdown();
    }
}