public ConsumerRecords<K, V> poll(long timeoutMillis)

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [365:435]


    /**
     * Returns the next batch of received messages as Kafka-style consumer records.
     *
     * <p>Waits up to {@code timeoutMillis} for a first item on {@code receivedMessages}; once one is
     * available, keeps draining items that are already queued (without waiting) until the queue is
     * empty or {@code maxRecordsInSinglePoll} records have been collected. Records are grouped by
     * topic partition, and {@code lastReceivedOffset} is updated for every record returned.
     *
     * @param timeoutMillis maximum time, in milliseconds, to wait for the first message
     * @return the collected records after consumer interceptors have been applied, or
     *         {@code ConsumerRecords.EMPTY} when no message arrived within the timeout
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *         interrupted while waiting; the thread's interrupt status is restored before throwing
     */
    public ConsumerRecords<K, V> poll(long timeoutMillis) {
        try {
            QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
            }

            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();

            int numberOfRecords = 0;

            while (item != null) {
                TopicName topicName = TopicName.get(item.consumer.getTopic());
                String topic = topicName.getPartitionedTopicName();
                // Non-partitioned topics are reported as partition 0
                int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                Message<byte[]> msg = item.message;
                MessageId msgId = msg.getMessageId();
                if (msgId instanceof TopicMessageIdImpl) {
                    // Unwrap to the inner message id before computing the offset
                    msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
                }
                long offset = MessageIdUtils.getOffset(msgId);

                TopicPartition tp = new TopicPartition(topic, partition);
                // No offset recorded for this partition and it is not flagged as unpolled: reset it
                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                    resetOffsets(tp);
                }

                K key = getKey(topic, msg);
                if (valueSchema instanceof PulsarKafkaSchema) {
                    ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
                }
                // Read the payload once and reuse it for both decoding and the serialized size
                byte[] payload = msg.getData();
                V value = valueSchema.decode(payload);

                TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
                long timestamp = msg.getPublishTime();

                if (msg.getEventTime() > 0) {
                    // If we have Event time, use that in preference
                    timestamp = msg.getEventTime();
                    timestampType = TimestampType.CREATE_TIME;
                }

                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
                        timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, payload.length, key, value);

                records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);

                // Update last offset seen by application
                lastReceivedOffset.put(tp, offset);
                unpolledPartitions.remove(tp);

                if (++numberOfRecords >= maxRecordsInSinglePoll) {
                    break;
                }

                // Check if we have an item already available
                item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
            }

            if (isAutoCommit && !records.isEmpty()) {
                // Commit the offset of previously dequeued messages
                commitAsync();
            }

            // If no interceptors are configured, the interceptor list is empty and the original
            // ConsumerRecords is returned.
            return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }