private void startSiddhiRuntime()

in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java [226:247]


    /**
     * Creates and starts the Siddhi runtime for this operator.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>obtains a Siddhi manager from {@code siddhiPlan} and registers every extension
     *       returned by {@code siddhiPlan.getExtensions()} on it;</li>
     *   <li>parses {@code executionExpression} into a Siddhi app and appends a {@code "Name"}
     *       annotation whose single element carries {@code operatorName};</li>
     *   <li>creates the app runtime from that app, starts it, and then calls
     *       {@code registerInputAndOutput} with it.</li>
     * </ol>
     *
     * @throws IllegalStateException if {@code siddhiRuntime} has already been assigned
     */
    private void startSiddhiRuntime() {
        // Refuse to proceed when a runtime is already present.
        if (this.siddhiRuntime != null) {
            throw new IllegalStateException("Siddhi has already been initialized");
        }

        this.siddhiManager = this.siddhiPlan.createSiddhiManager();
        for (Map.Entry<String, Class<?>> extension : this.siddhiPlan.getExtensions().entrySet()) {
            String extensionName = extension.getKey();
            Class<?> extensionClass = extension.getValue();
            this.siddhiManager.setExtension(extensionName, extensionClass);
        }

        SiddhiApp parsedApp = SiddhiCompiler.parse(executionExpression);

        // Build a "Name" annotation holding the operator name (element key is null)
        // and attach it to the parsed app before the runtime is created.
        Annotation appNameAnnotation = new Annotation("Name");
        List<Element> nameElements = new ArrayList<>();
        nameElements.add(new Element(null, operatorName));
        appNameAnnotation.setElements(nameElements);
        parsedApp.getAnnotations().add(appNameAnnotation);

        this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(parsedApp);
        this.siddhiRuntime.start();
        // Inputs and outputs are registered after start(), matching the original call order.
        registerInputAndOutput(this.siddhiRuntime);
        LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
    }