in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java [213:255]
StatusRecorder<CR, S> create(
KubernetesClient kubernetesClient,
MetricManager<CR> metricManager,
Collection<FlinkResourceListener> listeners) {
BiConsumer<CR, S> consumer =
(resource, previousStatus) -> {
var now = Instant.now();
var ctx =
new FlinkResourceListener.StatusUpdateContext() {
@Override
public S getPreviousStatus() {
return previousStatus;
}
@Override
public AbstractFlinkResource<?, S> getFlinkResource() {
return resource;
}
@Override
public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
@Override
public Instant getTimestamp() {
return now;
}
};
listeners.forEach(
listener -> {
if (resource instanceof FlinkDeployment) {
listener.onDeploymentStatusUpdate(ctx);
} else {
listener.onSessionJobStatusUpdate(ctx);
}
});
AuditUtils.logContext(ctx);
};
return new StatusRecorder<>(kubernetesClient, metricManager, consumer);
}