in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java [416:480]
/**
 * Polls the pull consumer once and converts the returned messages into connect records.
 *
 * <p>Each pulled message is first registered through {@code putPulledQueueOffset}. If
 * registration fails for any message, the messages of this batch that were already
 * registered are removed again and the whole batch is dropped. Messages for which
 * {@code convertToSinkDataEntry} returns {@code null} are not emitted; they are removed
 * from tracking and the value returned by {@code removeMessage} is handed to
 * {@code commitOffset} right away.
 *
 * @return the converted records (possibly empty when every message converted to
 *         {@code null}), or {@code null} when nothing was pulled, the un-acked counter
 *         exceeds {@code MAX_UNACK}, the TPS limiter reports overflow, registration of
 *         a message failed, or an exception was caught while pulling/converting
 * @throws InterruptedException if interrupted during the short back-off sleep
 */
public List<ConnectRecord> poll() throws InterruptedException {
    // Back off briefly while too many records are still un-acked; log only every 1000th time.
    if (unAckCounter.get() > MAX_UNACK) {
        Thread.sleep(2);
        if (pollCounter.incrementAndGet() % 1000 == 0) {
            log.info("poll unAckCount > 10000 sleep 2ms");
        }
        return null;
    }
    // sync wait for rate limit
    // NOTE(review): overflow is checked under the task name while addPv below records under
    // the connector id -- confirm both are meant to address the same limiter bucket.
    boolean overflow = TpsLimiter.isOverFlow(sourceTaskContext.getTaskName(), tpsLimit);
    if (overflow) {
        if (rateCounter.incrementAndGet() % 1000 == 0) {
            log.info("rateLimiter occur.");
        }
        return null;
    }
    try {
        List<MessageExt> messageExts = pullConsumer.poll();
        if (null == messageExts || messageExts.isEmpty()) {
            // The random offset spreads the "no new message" log line out over time.
            if ((noMessageCounter.incrementAndGet() + random.nextInt(10)) % printLogThreshold == 0) {
                log.info("no new message");
            }
            return null;
        }
        List<ConnectRecord> connectRecords = new ArrayList<>(messageExts.size());
        // Number of messages of this batch that have been registered successfully so far.
        int index = 0;
        for (MessageExt msg : messageExts) {
            MessageQueue mq = messageQueueOf(msg);
            boolean put = putPulledQueueOffset(mq, msg.getQueueOffset(), 1, msg);
            if (!put) {
                log.error("bug");
                // Undo the registrations made for the earlier messages of this batch. Each one
                // is removed from its own queue rather than from the failing message's queue,
                // so the rollback stays correct even if a batch spans several queues.
                for (MessageExt registered : messageExts.subList(0, index)) {
                    removeMessage(messageQueueOf(registered), registered.getQueueOffset());
                }
                return null;
            }
            index++;
            ConnectRecord connectRecord = convertToSinkDataEntry(msg);
            if (connectRecord != null) {
                connectRecords.add(connectRecord);
                TpsLimiter.addPv(connectorConfig.getConnectorId(), 1);
            } else {
                // Nothing will be emitted for this message, so stop tracking it and pass the
                // offset reported by removeMessage on to commitOffset immediately.
                long canCommitOffset = removeMessage(mq, msg.getQueueOffset());
                commitOffset(mq, canCommitOffset);
            }
        }
        return connectRecords;
    } catch (Exception e) {
        // Best effort: a failed poll is logged and reported to the caller as "no records".
        log.error("pull message error,", e);
    }
    return null;
}

/** Builds the {@code MessageQueue} (topic, broker name, queue id) that the given message belongs to. */
private MessageQueue messageQueueOf(MessageExt msg) {
    MessageQueue mq = new MessageQueue();
    mq.setTopic(msg.getTopic());
    mq.setBrokerName(msg.getBrokerName());
    mq.setQueueId(msg.getQueueId());
    return mq;
}