in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java [198:239]
/**
 * Reconciles the message queues tracked by the split reader with the assignment carried by a
 * {@link SourceCheckEvent}.
 *
 * <p>Queues that appear in the event but not in the reader's current offset table are passed to
 * {@code reader.handleSplitsChanges} as splits whose last constructor argument is {@code true},
 * built from the {@code (f0, f1)} offset pair the event holds for that queue. Queues that the
 * reader still tracks but that are missing from the event are passed as splits whose last
 * constructor argument is {@code false}.
 *
 * @param sourceEvent event whose {@code getAssignedMq()} maps each assigned queue to its offset
 *     pair
 */
private void handleSourceCheckEvent(SourceCheckEvent sourceEvent) {
    Map<MessageQueue, Tuple2<Long, Long>> checkMap = sourceEvent.getAssignedMq();
    Set<MessageQueue> assignedMq = checkMap.keySet();
    Set<MessageQueue> currentMq = reader.getCurrentOffsetTable().keySet();
    Set<MessageQueue> increaseSet = Sets.difference(assignedMq, currentMq);
    Set<MessageQueue> decreaseSet = Sets.difference(currentMq, assignedMq);

    if (increaseSet.isEmpty() && decreaseSet.isEmpty()) {
        LOG.info("No need to checkpoint, current assigned mq is same as before.");
        // Nothing to add or remove, so there is no split change to forward.
        return;
    }

    if (!increaseSet.isEmpty()) {
        SplitsAddition<RocketMQSourceSplit> increase =
                new SplitsAddition<>(
                        increaseSet.stream()
                                .map(
                                        mq -> {
                                            // Every queue in increaseSet is a key of checkMap,
                                            // so one lookup yields both offsets.
                                            Tuple2<Long, Long> offsets = checkMap.get(mq);
                                            return new RocketMQSourceSplit(
                                                    mq, offsets.f0, offsets.f1, true);
                                        })
                                .collect(Collectors.toList()));
        reader.handleSplitsChanges(increase);
    }

    if (!decreaseSet.isEmpty()) {
        // Queues in decreaseSet are, by construction, NOT keys of checkMap (assignedMq is
        // checkMap.keySet()), so checkMap.get(mq) is always null here and dereferencing it
        // would throw a NullPointerException. Use a placeholder offset instead.
        // NOTE(review): this assumes handleSplitsChanges does not rely on the offsets of a
        // split flagged false; confirm against the split reader implementation.
        final long placeholderOffset = 0L;
        SplitsAddition<RocketMQSourceSplit> decrease =
                new SplitsAddition<>(
                        decreaseSet.stream()
                                .map(
                                        mq ->
                                                new RocketMQSourceSplit(
                                                        mq,
                                                        placeholderOffset,
                                                        placeholderOffset,
                                                        false))
                                .collect(Collectors.toList()));
        reader.handleSplitsChanges(decrease);
    }
}