in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java [138:155]
/**
 * Forwards a Pulsar send acknowledgement to the wrapped Kafka producer interceptor.
 *
 * <p>Builds a Kafka {@code RecordMetadata} from the acknowledged message (topic/partition,
 * event time, serialized key and value sizes) and passes it, together with the send outcome,
 * to {@code kafkaProducerInterceptor.onAcknowledgement}. Offsets and checksum are reported
 * as {@code -1} because they are not derived from the Pulsar message here.
 *
 * @param producer  the Pulsar producer that sent the message (not used by this method)
 * @param message   the acknowledged message; cast to {@code MessageImpl} to read its metadata
 * @param msgId     the id assigned to the message (not used by this method)
 * @param exception the send failure, or {@code null} when the send succeeded
 * @throws RuntimeException if the partition id read from the message metadata is not an integer
 */
public void onSendAcknowledgement(Producer<byte[]> producer, Message<byte[]> message, MessageId msgId, Throwable exception) {
    MessageMetadata messageMetadata = ((MessageImpl<byte[]>) message).getMessageBuilder();

    // Only the partition lookup/parsing is guarded, so a NumberFormatException raised by the
    // interceptor callback below is not misreported as a partition-id conversion failure.
    final int partition;
    try {
        // NOTE(review): partitionID is not declared in this method (presumably a field);
        // the assignment is kept because other code may read it.
        partitionID = getPartitionID(messageMetadata);
        partition = Integer.parseInt(partitionID);
    } catch (NumberFormatException e) {
        String errorMessage = "Unable to convert partitionID to integer: " + e.getMessage();
        // Keep the original exception as the cause so the stack trace is not lost.
        log.error(errorMessage, e);
        throw new RuntimeException(errorMessage, e);
    }

    // Kafka interceptors treat a non-null exception as a failed send, so a successful
    // acknowledgement must pass null rather than a wrapper around null.
    Exception kafkaException = null;
    if (exception instanceof Exception) {
        kafkaException = (Exception) exception;
    } else if (exception != null) {
        // Errors and other non-Exception throwables must be wrapped to fit Kafka's signature.
        kafkaException = new Exception(exception);
    }

    // Kafka's RecordMetadata uses -1 as the serialized size of an absent key or value.
    int serializedKeySize = message.hasKey() ? message.getKeyBytes().length : -1;
    byte[] value = message.getValue();
    int serializedValueSize = value != null ? value.length : -1;

    TopicPartition topicPartition = new TopicPartition(topic, partition);
    RecordMetadata recordMetadata = new RecordMetadata(topicPartition,
            -1L,
            -1L,
            messageMetadata.getEventTime(),
            -1L,
            serializedKeySize,
            serializedValueSize);
    kafkaProducerInterceptor.onAcknowledgement(recordMetadata, kafkaException);
}