in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java [138:187]
/**
 * Applies a split change to the underlying consumer.
 *
 * <p>Only {@link SplitsAddition} changes are accepted. Every split in the change is mapped to a
 * {@link MessageQueue} built from its topic, broker name and queue id, and that queue is
 * registered with the reader metrics. The consumer is then assigned the resulting set of queues
 * and is asked to seek each queue to the starting offset carried by its split.
 *
 * <p>The stopping offset of each split is stored next to the starting offset in the local
 * table, but this method itself only reads the starting offset.
 *
 * @param splitsChange the change to apply; must be a {@link SplitsAddition}
 * @throws UnsupportedOperationException if {@code splitsChange} is any other kind of change
 * @throws FlinkRuntimeException if seeking a queue to its starting offset fails
 */
public void handleSplitsChanges(SplitsChange<RocketMQSourceSplit> splitsChange) {
    // Anything other than an addition of splits is rejected up front.
    if (!(splitsChange instanceof SplitsAddition)) {
        throw new UnsupportedOperationException(
                String.format(
                        "The SplitChange type of %s is not supported.",
                        splitsChange.getClass()));
    }

    // Queue -> (starting offset, stopping offset) for the splits carried by this change.
    ConcurrentMap<MessageQueue, Tuple2<Long, Long>> offsetsByQueue = new ConcurrentHashMap<>();

    for (RocketMQSourceSplit split : splitsChange.splits()) {
        MessageQueue queue =
                new MessageQueue(split.getTopic(), split.getBrokerName(), split.getQueueId());
        Tuple2<Long, Long> offsetRange =
                new Tuple2<>(split.getStartingOffset(), split.getStoppingOffset());
        offsetsByQueue.put(queue, offsetRange);
        rocketmqSourceReaderMetrics.registerNewMessageQueue(queue);
    }

    // todo: log message queue change

    // NOTE(review): per the original author's comment, assign() replaces the previous
    // assignment rather than adding to it - confirm against the consumer implementation.
    Set<MessageQueue> queuesToAssign = offsetsByQueue.keySet();
    consumer.assign(queuesToAssign);

    // Position the consumer at the starting offset of every newly assigned queue.
    offsetsByQueue.forEach(
            (assignedQueue, offsetRange) -> {
                Long seekOffset = offsetRange.f0;
                try {
                    consumer.seek(assignedQueue, seekOffset);
                } catch (Exception e) {
                    // Wrap the failure, keeping the queue and target offset as context.
                    throw new FlinkRuntimeException(
                            String.format(
                                    "messageQueue:%s, seek to starting offset:%s",
                                    assignedQueue, seekOffset),
                            e);
                }
            });
}