in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitRequest.java [38:52]
/**
 * Creates a commit request and records, per topic, the {@link MessageId} to commit.
 *
 * <p>The superclass is initialised with an empty request map; the per-topic positions are kept
 * in {@code topicOffsetMap} on this instance instead. For each entry of {@code requestInfo} the
 * topic name is obtained from {@code PulsarKafkaSimpleConsumer.getTopicName}. The message id
 * carried by the metadata is used when it is non-null; otherwise a message id is derived from
 * the entry's numeric offset through {@code MessageIdUtils.getMessageId}.
 *
 * @param groupId       consumer group id; forwarded to the superclass and stored on this instance
 * @param requestInfo   offset metadata to commit, keyed by topic and partition
 * @param versionId     forwarded unchanged to the superclass constructor
 * @param correlationId forwarded unchanged to the superclass constructor
 * @param clientId      forwarded unchanged to the superclass constructor
 */
public PulsarOffsetCommitRequest(String groupId, Map<TopicAndPartition, PulsarOffsetMetadataAndError> requestInfo,
        short versionId, int correlationId, String clientId) {
    super(groupId, Collections.emptyMap(), versionId, correlationId, clientId);
    this.groupId = groupId;
    for (Entry<TopicAndPartition, PulsarOffsetMetadataAndError> entry : requestInfo.entrySet()) {
        final String topic = PulsarKafkaSimpleConsumer.getTopicName(entry.getKey());
        final OffsetMetadataAndError metadata = entry.getValue();

        // Prefer the message id attached to the Pulsar-specific metadata, when there is one.
        MessageId position = metadata instanceof PulsarOffsetMetadataAndError
                ? ((PulsarOffsetMetadataAndError) metadata).getMessageId()
                : null;

        // No explicit message id available: fall back to converting the numeric offset.
        if (position == null) {
            position = MessageIdUtils.getMessageId(entry.getValue().offset());
        }
        topicOffsetMap.put(topic, position);
    }
}