in computer-k8s/src/main/java/org/apache/hugegraph/computer/k8s/driver/KubernetesDriver.java [315:372]
private Watch initWatch() {
return this.operation.watch(new Watcher<HugeGraphComputerJob>() {
@Override
public void eventReceived(Action action,
HugeGraphComputerJob computerJob) {
if (computerJob == null) {
return;
}
if (action == Action.ERROR) {
return;
}
String jobId = computerJob.getSpec().getJobId();
if (StringUtils.isBlank(jobId)) {
return;
}
Pair<CompletableFuture<Void>,
JobObserver> pair = KubernetesDriver.this.waits.get(jobId);
if (pair != null) {
CompletableFuture<?> future = pair.getLeft();
JobObserver observer = pair.getRight();
@SuppressWarnings("resource")
KubernetesDriver driver = KubernetesDriver.this;
JobState jobState = driver.buildJobState(computerJob);
observer.onJobStateChanged(jobState);
if (JobStatus.finished(jobState.jobStatus())) {
future.complete(null);
driver.cancelWait(jobId);
}
}
}
@Override
public void onClose(WatcherException cause) {
for (Pair<CompletableFuture<Void>, JobObserver> pair :
KubernetesDriver.this.waits.values()) {
if (pair != null) {
CompletableFuture<Void> future = pair.getLeft();
future.completeExceptionally(cause);
}
}
synchronized (KubernetesDriver.this.watchActive) {
KubernetesDriver.this.waits.clear();
Watch watch = KubernetesDriver.this.watch;
if (watch != null) {
watch.close();
}
KubernetesDriver.this.watchActive.setFalse();
}
}
});
}