public ConsumerRecords poll()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [300:356]


    public ConsumerRecords<K, V> poll(long timeoutMillis) {
        try {
            QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
            }

            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();

            int numberOfRecords = 0;

            while (item != null) {
                TopicName topicName = TopicName.get(item.consumer.getTopic());
                String topic = topicName.getPartitionedTopicName();
                int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                Message<byte[]> msg = item.message;
                MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
                long offset = MessageIdUtils.getOffset(msgId);

                TopicPartition tp = new TopicPartition(topic, partition);
                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                    resetOffsets(tp);
                }

                K key = getKey(topic, msg);
                if (valueSchema instanceof PulsarKafkaSchema) {
                    ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
                }
                V value = valueSchema.decode(msg.getData());

                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, key, value);

                records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);

                // Update last offset seen by application
                lastReceivedOffset.put(tp, offset);
                unpolledPartitions.remove(tp);

                if (++numberOfRecords >= maxRecordsInSinglePoll) {
                    break;
                }

                // Check if we have an item already available
                item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
            }

            if (isAutoCommit && !records.isEmpty()) {
                // Commit the offset of previously dequeued messages
                commitAsync();
            }

            return new ConsumerRecords<>(records);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }