in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java [226:247]
/**
 * Builds and starts the Siddhi runtime used by this operator.
 *
 * <p>Steps, in order: create the manager from {@code siddhiPlan}, register every extension
 * the plan exposes, parse {@code executionExpression} into a Siddhi app, attach a
 * {@code Name} annotation carrying {@code operatorName}, create the runtime from that app,
 * start it, and finally pass it to {@code registerInputAndOutput}.
 *
 * @throws IllegalStateException if {@code siddhiRuntime} is already set when this is called
 */
private void startSiddhiRuntime() {
    // Guard: this method may only run while no runtime has been created yet.
    if (siddhiRuntime != null) {
        throw new IllegalStateException("Siddhi has already been initialized");
    }

    siddhiManager = siddhiPlan.createSiddhiManager();
    for (Map.Entry<String, Class<?>> extension : siddhiPlan.getExtensions().entrySet()) {
        siddhiManager.setExtension(extension.getKey(), extension.getValue());
    }

    SiddhiApp parsedApp = SiddhiCompiler.parse(executionExpression);

    // Tag the parsed app with the operator name. The element is built with a null first
    // argument (presumably the element key -- confirm against the Siddhi query API).
    Annotation operatorNameAnnotation = new Annotation("Name");
    List<Element> nameElements = new ArrayList<>();
    nameElements.add(new Element(null, operatorName));
    operatorNameAnnotation.setElements(nameElements);
    parsedApp.getAnnotations().add(operatorNameAnnotation);

    siddhiRuntime = siddhiManager.createSiddhiAppRuntime(parsedApp);
    // The runtime is started first; inputs and outputs are registered afterwards.
    siddhiRuntime.start();
    registerInputAndOutput(siddhiRuntime);
    LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
}