in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java [221:278]
public static EventRecorder create(
KubernetesClient client, Collection<FlinkResourceListener> listeners) {
BiConsumer<AbstractFlinkResource<?, ?>, Event> biConsumerFlinkResource =
(resource, event) -> {
var ctx =
new FlinkResourceListener.ResourceEventContext() {
@Override
public Event getEvent() {
return event;
}
@Override
public AbstractFlinkResource<?, ?> getFlinkResource() {
return resource;
}
@Override
public KubernetesClient getKubernetesClient() {
return client;
}
};
listeners.forEach(
listener -> {
if (resource instanceof FlinkDeployment) {
listener.onDeploymentEvent(ctx);
} else {
listener.onSessionJobEvent(ctx);
}
});
AuditUtils.logContext(ctx);
};
BiConsumer<FlinkStateSnapshot, Event> biConsumerFlinkStateSnapshot =
(resource, event) -> {
var ctx =
new FlinkResourceListener.FlinkStateSnapshotEventContext() {
@Override
public Event getEvent() {
return event;
}
@Override
public FlinkStateSnapshot getStateSnapshot() {
return resource;
}
@Override
public KubernetesClient getKubernetesClient() {
return client;
}
};
listeners.forEach(listener -> listener.onStateSnapshotEvent(ctx));
AuditUtils.logContext(ctx);
};
return new EventRecorder(biConsumerFlinkResource, biConsumerFlinkStateSnapshot);
}