pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [411:427]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);

                // Update last offset seen by application
                lastReceivedOffset.put(tp, offset);
                unpolledPartitions.remove(tp);

                if (++numberOfRecords >= maxRecordsInSinglePoll) {
                    break;
                }

                // Check if we have an item already available
                item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
            }

            if (isAutoCommit && !records.isEmpty()) {
                // Commit the offset of previously dequeued messages
                commitAsync();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [333:349]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);

                // Update last offset seen by application
                lastReceivedOffset.put(tp, offset);
                unpolledPartitions.remove(tp);

                if (++numberOfRecords >= maxRecordsInSinglePoll) {
                    break;
                }

                // Check if we have an item already available
                item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
            }

            if (isAutoCommit && !records.isEmpty()) {
                // Commit the offset of previously dequeued messages
                commitAsync();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



