public void onSendAcknowledgement()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java [138:155]


    public void onSendAcknowledgement(Producer<byte[]> producer, Message<byte[]> message, MessageId msgId, Throwable exception) {
        try {
            MessageMetadata messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
            partitionID = getPartitionID(messageMetadataBuilder);
            TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitionID));
            kafkaProducerInterceptor.onAcknowledgement(new RecordMetadata(topicPartition,
                                                                -1L,
                                                                -1L,
                                                                messageMetadataBuilder.getEventTime(),
                                                                -1L,
                                                                message.getKeyBytes().length,
                                                                message.getValue().length), new Exception(exception));
        } catch (NumberFormatException e) {
            String errorMessage = "Unable to convert partitionID to integer: " + e.getMessage();
            log.error(errorMessage);
            throw new RuntimeException(errorMessage);
        }
    }