public RecordsWithSplitIds fetch()

in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java [107:135]


    /*
     * Polls the consumer once, waiting at most RocketMQOptions.POLL_TIMEOUT milliseconds, and
     * groups the returned messages by the split id derived from each message's topic, broker
     * name and queue id.
     *
     * Failures while polling or grouping are logged and not rethrown (best-effort, unchanged).
     * The batch is finalized with prepareForRead() on every path, so a failure no longer hands
     * back a batch on which that call was skipped; messages grouped before the failure stay in
     * the batch.
     *
     * Returns the batch of records keyed by split id; it is empty when nothing was polled or
     * the poll itself failed.
     */
    public RecordsWithSplitIds<MessageView> fetch() throws IOException {
        wakeup = false;
        RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds =
                new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
        try {
            Duration pollTimeout =
                    Duration.ofMillis(this.configuration.getLong(RocketMQOptions.POLL_TIMEOUT));
            List<MessageView> messageExtList = consumer.poll(pollTimeout);

            // Loop-invariant: look the debug switch up once per batch, not once per message.
            boolean debugEnabled =
                    this.configuration.getBoolean(RocketMQSourceOptions.GLOBAL_DEBUG_MODE);

            for (MessageView messageView : messageExtList) {
                String splitId =
                        UtilAll.getSplitId(
                                new MessageQueue(
                                        messageView.getTopic(),
                                        messageView.getBrokerName(),
                                        messageView.getQueueId()));
                recordsWithSplitIds.recordsForSplit(splitId).add(messageView);
                if (debugEnabled) {
                    LOG.info(
                            "Reader fetch splitId: {}, messageId: {}",
                            splitId,
                            messageView.getMessageId());
                }
            }
        } catch (Exception e) {
            // Deliberately best-effort: report the failure and fall through with whatever was
            // grouped so far instead of failing the fetch.
            LOG.error("Reader fetch split error", e);
        }
        // Finalize outside the try so the success path and the failure path both return a
        // prepared batch. An exception from prepareForRead() itself is allowed to surface.
        recordsWithSplitIds.prepareForRead();
        return recordsWithSplitIds;
    }