in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java [117:154]
/**
 * Polls the consumer once and returns the polled messages grouped by split id, together with
 * the ids of splits that were marked as finished since the previous call.
 *
 * <p>The method works in two phases:
 *
 * <ol>
 *   <li>While holding {@code lock}: reset the {@code wakeup} flag, create a fresh batch and
 *       move every id in {@code this.recordsWithSplitIds.finishedSplits} into it, clearing
 *       the pending set afterwards.
 *   <li>Without the lock: poll the consumer using the {@code POLL_TIMEOUT} option (read as
 *       milliseconds), add each message to the batch under the split id derived from its
 *       topic, broker name and queue id, then call {@code prepareForRead()} on the batch.
 * </ol>
 *
 * <p>Any exception raised in the second phase is logged and not rethrown; the batch is
 * returned in whatever state it reached.
 *
 * @return the batch built by this call
 * @throws IOException declared by the method signature
 */
public RecordsWithSplitIds<MessageView> fetch() throws IOException {
    final RocketMQRecordsWithSplitIds<MessageView> batch;

    // Acquire the lock immediately before the try so that every statement executed while
    // holding it is covered by the finally block; a failure (for example in the constructor)
    // can no longer leave the lock held.
    lock.lock();
    try {
        wakeup = false;
        batch = new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
        // Hand over the finished splits accumulated on the field to this call's batch.
        this.recordsWithSplitIds.finishedSplits.forEach(batch::addFinishedSplit);
        this.recordsWithSplitIds.finishedSplits.clear();
    } finally {
        lock.unlock();
    }

    try {
        Duration pollTimeout =
                Duration.ofMillis(this.configuration.getLong(RocketMQOptions.POLL_TIMEOUT));
        List<MessageView> polledMessages = consumer.poll(pollTimeout);

        // The debug switch does not depend on the message, so read it once per fetch.
        boolean debugEnabled =
                this.configuration.getBoolean(RocketMQSourceOptions.GLOBAL_DEBUG_MODE);

        for (MessageView messageView : polledMessages) {
            MessageQueue queue =
                    new MessageQueue(
                            messageView.getTopic(),
                            messageView.getBrokerName(),
                            messageView.getQueueId());
            String splitId = UtilAll.getSplitId(queue);
            batch.recordsForSplit(splitId).add(messageView);
            if (debugEnabled) {
                LOG.info(
                        "Reader fetch splitId: {}, messageId: {}",
                        splitId,
                        messageView.getMessageId());
            }
        }
        batch.prepareForRead();
    } catch (Exception e) {
        // NOTE(review): on this path prepareForRead() has not been called, yet the batch is
        // still returned. Confirm that RocketMQRecordsWithSplitIds can be read safely in
        // that state, or decide whether the failure should be propagated instead.
        LOG.error("Reader fetch split error", e);
    }
    return batch;
}