pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [104:124]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static class QueueItem {
        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
        final Message<byte[]> message;

        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
            this.consumer = consumer;
            this.message = message;
        }
    }

    // Since a single Kafka consumer can receive from multiple topics, we need to multiplex all the different
    // topics/partitions into a single queues
    private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(1000);

    public PulsarKafkaConsumer(Map<String, Object> configs) {
        this(new ConsumerConfig(configs), null, null);
    }

    public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
                               Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(configs),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [95:115]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static class QueueItem {
        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
        final Message<byte[]> message;

        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
            this.consumer = consumer;
            this.message = message;
        }
    }

    // Since a single Kafka consumer can receive from multiple topics, we need to multiplex all the different
    // topics/partitions into a single queues
    private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(1000);

    public PulsarKafkaConsumer(Map<String, Object> configs) {
        this(new ConsumerConfig(configs), null, null);
    }

    public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
                               Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(configs),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



