in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [496:527]
public void cancel() {
    // Teardown sequence: mark the source as not running first, then stop the
    // timer, the executor and the consumer (in that order), and finally empty
    // and release the three offset collections. Any field that is already
    // null is skipped.
    // NOTE(review): the fields are reset to null here; whether other threads
    // can still read them after cancel() is not visible from this method --
    // confirm against the rest of the class.
    log.debug("cancel ...");
    runningChecker.setRunning(false);

    shutdownWorkersOnCancel();
    shutdownConsumerOnCancel();
    releaseOffsetsOnCancel();
}

/** Shuts down the timer and then the executor, dropping each reference afterwards. */
private void shutdownWorkersOnCancel() {
    if (null != timer) {
        timer.shutdown();
        timer = null;
    }
    if (null != executor) {
        executor.shutdown();
        executor = null;
    }
}

/** Shuts down the consumer, if one is held, and drops the reference. */
private void shutdownConsumerOnCancel() {
    if (consumer == null) {
        return;
    }
    consumer.shutdown();
    consumer = null;
}

/**
 * Empties and then drops {@code offsetTable}, {@code restoredOffsets} and
 * {@code pendingOffsetsToCommit}, in that order.
 */
private void releaseOffsetsOnCancel() {
    if (null != offsetTable) {
        offsetTable.clear();
        offsetTable = null;
    }
    if (null != restoredOffsets) {
        restoredOffsets.clear();
        restoredOffsets = null;
    }
    if (null != pendingOffsetsToCommit) {
        pendingOffsetsToCommit.clear();
        pendingOffsetsToCommit = null;
    }
}