in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [365:435]
/**
 * Drains already-received Pulsar messages from the internal queue and returns them as Kafka
 * {@link ConsumerRecords}, grouped by {@link TopicPartition}.
 *
 * <p>The call blocks for at most {@code timeoutMillis} waiting for the first message. Once one
 * message is available, any further messages that are immediately available are appended without
 * blocking, up to {@code maxRecordsInSinglePoll} records in total.
 *
 * <p>For every returned record the last received offset of its partition is updated, and the
 * partition is removed from the set of unpolled partitions. When auto-commit is enabled and at
 * least one record was collected, an asynchronous commit is triggered before returning.
 *
 * @param timeoutMillis maximum time, in milliseconds, to wait for the first message
 * @return the collected records after passing through the configured consumer interceptors, or
 *         {@code ConsumerRecords.EMPTY} when no message arrived within the timeout
 * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
 *         interrupted while waiting; the thread's interrupt flag is restored before throwing
 */
public ConsumerRecords<K, V> poll(long timeoutMillis) {
    try {
        QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (item == null) {
            return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
        }

        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
        int numberOfRecords = 0;

        while (item != null) {
            TopicName topicName = TopicName.get(item.consumer.getTopic());
            String topic = topicName.getPartitionedTopicName();
            // Non-partitioned topics are reported as partition 0.
            int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
            Message<byte[]> msg = item.message;

            // Unwrap the per-topic message id so the offset is derived from the inner id.
            MessageId msgId = msg.getMessageId();
            if (msgId instanceof TopicMessageIdImpl) {
                msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
            }
            long offset = MessageIdUtils.getOffset(msgId);

            TopicPartition tp = new TopicPartition(topic, partition);
            // No offset recorded for this partition and it is not tracked as "unpolled":
            // treat the position as invalid and reset it before handing out the record.
            if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                resetOffsets(tp);
            }

            K key = getKey(topic, msg);
            if (valueSchema instanceof PulsarKafkaSchema) {
                ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
            }

            // Fetch the payload once so the decoded value and the reported serialized size
            // are derived from the same byte array.
            byte[] data = msg.getData();
            V value = valueSchema.decode(data);

            // Default to the publish time; prefer the event time when the producer set one.
            TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
            long timestamp = msg.getPublishTime();
            if (msg.getEventTime() > 0) {
                timestamp = msg.getEventTime();
                timestampType = TimestampType.CREATE_TIME;
            }

            // The checksum argument is passed as -1; the key size is the key string's length
            // (0 when the message has no key).
            ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
                    timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, data.length, key, value);
            records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);

            // Update last offset seen by application
            lastReceivedOffset.put(tp, offset);
            unpolledPartitions.remove(tp);

            if (++numberOfRecords >= maxRecordsInSinglePoll) {
                break;
            }

            // Check if we have an item already available (non-blocking)
            item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
        }

        if (isAutoCommit && !records.isEmpty()) {
            // Commit the offset of previously dequeued messages
            commitAsync();
        }

        // If no interceptor is provided, the interceptors list will be an empty list and the
        // original ConsumerRecords will be returned.
        return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the
        // interruption after the exception has been wrapped.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}