public PulsarMessageAndMetadata next()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerIterator.java [80:127]


    public PulsarMessageAndMetadata<K, V> next() {

        Message<byte[]> msg = receivedMessages.poll();
        if (msg == null) {
            try {
                msg = consumer.receive();
            } catch (PulsarClientException e) {
                log.warn("Failed to receive message for {}-{}, {}", consumer.getTopic(), consumer.getSubscription(),
                        e.getMessage(), e);
                throw new RuntimeException(
                        "failed to receive message from " + consumer.getTopic() + "-" + consumer.getSubscription());
            }
        }

        int partition = TopicName.getPartitionIndex(consumer.getTopic());
        long offset = MessageIdUtils.getOffset(msg.getMessageId());
        String key = msg.getKey();
        byte[] value = msg.getValue();

        K desKey = null;
        V desValue = null;

        if (StringUtils.isNotBlank(key)) {
            if (keyDeSerializer.isPresent() && keyDeSerializer.get() instanceof StringDecoder) {
                desKey = (K) key;
            } else {
                byte[] decodedBytes = Base64.getDecoder().decode(key);
                desKey = keyDeSerializer.isPresent() ? keyDeSerializer.get().fromBytes(decodedBytes)
                        : (K) DEFAULT_DECODER.fromBytes(decodedBytes);
            }
        }

        if (value != null) {
            desValue = valueDeSerializer.isPresent() ? valueDeSerializer.get().fromBytes(msg.getData())
                    : (V) DEFAULT_DECODER.fromBytes(msg.getData());
        }

        PulsarMessageAndMetadata<K, V> msgAndMetadata = new PulsarMessageAndMetadata<>(consumer.getTopic(), partition,
                null, offset, keyDeSerializer.orElse(null), valueDeSerializer.orElse(null), desKey, desValue);

        if (isAutoCommit) {
            // Commit the offset of previously dequeued messages
            consumer.acknowledgeCumulativeAsync(msg);
        }

        lastConsumedMessageId = msg.getMessageId();
        return msgAndMetadata;
    }