in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java [64:84]
/**
 * Looks up the definition of a single stream declared in a Siddhi execution plan.
 *
 * <p>A temporary {@code SiddhiManager} and {@code SiddhiAppRuntime} are created only to
 * obtain the stream definition map of the plan; both are shut down before this method
 * returns, whether it completes normally or with an exception.
 *
 * @param executionPlan the Siddhi execution plan handed to
 *                      {@code SiddhiManager#createSiddhiAppRuntime(String)}
 * @param streamId      the key to look up in the runtime's stream definition map
 * @return the definition registered under {@code streamId}
 * @throws IllegalArgumentException if the plan's stream definition map has no entry
 *                                  for {@code streamId}
 */
public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) {
    final SiddhiManager siddhiManager = new SiddhiManager();
    try {
        final SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(executionPlan);
        try {
            Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap();
            if (!definitionMap.containsKey(streamId)) {
                // Keep a separator between the text and the id so the message stays readable.
                throw new IllegalArgumentException("Unknown stream id: " + streamId);
            }
            return definitionMap.get(streamId);
        } finally {
            runtime.shutdown();
        }
    } finally {
        // Nested finally: the manager is shut down even when the runtime's shutdown throws.
        siddhiManager.shutdown();
    }
}