public List poll()

in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java [416:480]


    /**
     * Pulls one batch of messages from the pull consumer and converts it into connect records.
     *
     * <p>For every pulled message the queue offset is registered through
     * {@code putPulledQueueOffset} before the message is converted. A message whose conversion
     * yields {@code null} is removed from the bookkeeping again and the resulting offset is
     * committed right away. If registering an offset fails, the bookkeeping of the messages
     * already handled in this batch is rolled back and the whole batch is dropped.
     *
     * @return the converted records of the pulled batch (possibly empty when every message
     *         converted to {@code null}); {@code null} when the un-acked threshold is exceeded,
     *         the rate limiter reports overflow, nothing was pulled, offset registration failed,
     *         or pulling/converting threw an exception
     * @throws InterruptedException if the thread is interrupted during the back-off sleep
     */
    public List<ConnectRecord> poll() throws InterruptedException {
        // Back off briefly while the un-acked counter is above the limit instead of pulling more.
        if (unAckCounter.get() > MAX_UNACK) {
            Thread.sleep(2);
            // Only every 1000th occurrence is logged to keep the log quiet.
            // NOTE(review): the message hard-codes 10000 -- confirm it matches MAX_UNACK.
            if (pollCounter.incrementAndGet() % 1000 == 0) {
                log.info("poll unAckCount > 10000 sleep 2ms");
            }
            return null;
        }
        // sync wait for rate limit
        // NOTE(review): overflow is checked under the task name while addPv below counts under the
        // connector id -- confirm that the two keys are meant to differ.
        boolean overflow = TpsLimiter.isOverFlow(sourceTaskContext.getTaskName(), tpsLimit);
        if (overflow) {
            if (rateCounter.incrementAndGet() % 1000 == 0) {
                log.info("rateLimiter occur.");
            }
            return null;
        }
        try {
            List<MessageExt> messageExts = pullConsumer.poll();
            if (null == messageExts || messageExts.isEmpty()) {
                // The random jitter spreads the "no new message" log line over time.
                if ((noMessageCounter.incrementAndGet() + random.nextInt(10)) % printLogThreshold == 0) {
                    log.info("no new message");
                }
                return null;
            }

            List<ConnectRecord> connectRecords = new ArrayList<>(messageExts.size());
            int registered = 0;
            for (MessageExt msg : messageExts) {
                MessageQueue mq = queueOfPulledMessage(msg);

                boolean put = putPulledQueueOffset(mq, msg.getQueueOffset(), 1, msg);
                if (!put) {
                    log.error("put pulled queue offset failed, roll back current batch. mq: " + mq
                        + ", queueOffset: " + msg.getQueueOffset());
                    rollbackPulledMessages(messageExts, registered);
                    return null;
                }
                registered++;

                ConnectRecord connectRecord = convertToSinkDataEntry(msg);
                if (connectRecord != null) {
                    connectRecords.add(connectRecord);
                    TpsLimiter.addPv(connectorConfig.getConnectorId(), 1);
                } else {
                    // Nothing will be emitted for this message, so release its offset immediately.
                    long canCommitOffset = removeMessage(mq, msg.getQueueOffset());
                    commitOffset(mq, canCommitOffset);
                }
            }
            return connectRecords;
        } catch (Exception e) {
            log.error("pull message error,", e);
        }
        return null;
    }

    /** Builds the message queue (topic, broker name, queue id) that the given message was pulled from. */
    private MessageQueue queueOfPulledMessage(MessageExt msg) {
        MessageQueue mq = new MessageQueue();
        mq.setTopic(msg.getTopic());
        mq.setBrokerName(msg.getBrokerName());
        mq.setQueueId(msg.getQueueId());
        return mq;
    }

    /**
     * Removes the first {@code count} messages of the batch from the offset bookkeeping, each one
     * under its own message queue rather than under the queue of the message that failed.
     */
    private void rollbackPulledMessages(List<MessageExt> messageExts, int count) {
        for (MessageExt pulled : messageExts.subList(0, count)) {
            removeMessage(queueOfPulledMessage(pulled), pulled.getQueueOffset());
        }
    }