in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java [287:306]
private void restoreState() throws Exception {
LOGGER.info("Restore siddhi state");
final Iterator<byte[]> siddhiState = siddhiRuntimeState.get().iterator();
if (siddhiState.hasNext()) {
this.siddhiRuntime.restore(siddhiState.next());
}
LOGGER.info("Restore queued records state");
final Iterator<byte[]> queueState = queuedRecordsState.get().iterator();
if (queueState.hasNext()) {
final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(queueState.next());
final DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(byteArrayInputStream);
try {
this.priorityQueue = restoreQueuerState(dataInputView);
} finally {
dataInputView.close();
byteArrayInputStream.close();
}
}
}