in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java [157:221]
public void handleSplitsChanges(SplitsChange<RocketMQSourceSplit> splitsChange) {
// Only split additions are supported; any other type of SplitsChange is rejected.
if (!(splitsChange instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChange.getClass()));
}
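// On the first splits change, report the assigned splits back to the source coordinator via a
// SourceInitAssignEvent, presumably so the enumerator can track this reader's initial
// assignment. This is done only once per reader instance.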
if (!initFinish) {
LOG.info("Starting to initialize the reader");
SourceInitAssignEvent sourceEvent = new SourceInitAssignEvent();
sourceEvent.setSplits(splitsChange.splits());
sourceReaderContext.sendSourceEventToCoordinator(sourceEvent);
initFinish = true;
}
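// The lock presumably guards currentOffsetTable and recordsWithSplitIds, which are also
// accessed by the fetch thread while it polls records.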
lock.lock();
LOG.info("Received splits change: {}", splitsChange.splits());
// Starting and stopping offsets of the message queues newly assigned by this change.
ConcurrentMap<MessageQueue, Tuple2<Long, Long>> newOffsetTable = new ConcurrentHashMap<>();
try {
// Finish splits that are being revoked and collect offsets for newly added queues.
splitsChange
.splits()
.forEach(
split -> {
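// getIsIncrease() == false presumably marks a split that is being revoked
// from this reader, so stop consuming it at its stopping offset.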
if (!split.getIsIncrease()) {
finishSplitAtRecord(
split.getMessageQueue(),
split.getStoppingOffset(),
recordsWithSplitIds);
} else {
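// Skip queues that are already being consumed; otherwise register the split
// and remember its offsets for the seek below.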
if (!currentOffsetTable.containsKey(split.getMessageQueue())) {
registerSplits(split);
newOffsetTable.put(
split.getMessageQueue(),
new Tuple2<>(
split.getStartingOffset(),
split.getStoppingOffset()));
}
}
});
} finally {
lock.unlock();
}
// assign() replaces the consumer's previous assignment with the full current set of queues.
consumer.assign(currentOffsetTable.keySet());
// Seek each newly added message queue to its starting offset.
for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : newOffsetTable.entrySet()) {
MessageQueue messageQueue = entry.getKey();
Long startingOffset = entry.getValue().f0;
try {
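// A starting offset of -1 is presumably an "unset" sentinel; fall back to offset 0.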
consumer.seek(messageQueue, startingOffset == -1L ? 0L : startingOffset);
} catch (Exception e) {
String info =
String.format(
"Failed to seek messageQueue %s to starting offset %s",
messageQueue, startingOffset);
throw new FlinkRuntimeException(info, e);
}
}
}
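// Example (hypothetical) call site: the enclosing split fetcher delivers newly assigned splits
// wrapped in a SplitsAddition, e.g.
//   splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(newSplit)));
// where splitReader and newSplit are placeholder names.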