in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java [199:217]
public OffsetCommitResponse commitOffsets(PulsarOffsetCommitRequest request) {
PulsarOffsetCommitResponse response = new PulsarOffsetCommitResponse(null);
for (Entry<String, MessageId> topicOffset : request.getTopicOffsetMap().entrySet()) {
final String topic = topicOffset.getKey();
final String groupId = request.getGroupId();
try {
Consumer<byte[]> consumer = getConsumer(topic, groupId);
consumer.acknowledgeCumulative(topicOffset.getValue());
} catch (Exception e) {
log.warn("Failed to ack message for topic {}-{}", topic, topicOffset.getValue(), e);
response.hasError = true;
TopicAndPartition topicPartition = new TopicAndPartition(topic, 0);
response.errors.computeIfAbsent(topicPartition, tp -> ErrorMapping.UnknownCode());
}
}
return response;
}