in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java [231:262]
public PulsarOffsetFetchResponse fetchOffsets(PulsarOffsetFetchRequest request) {
final String groupId = request.groupId;
Map<TopicAndPartition, OffsetMetadataAndError> responseMap = Maps.newHashMap();
PulsarOffsetFetchResponse response = new PulsarOffsetFetchResponse(responseMap);
for (TopicAndPartition topicMetadata : request.requestInfo) {
final String topicName = getTopicName(topicMetadata);
try {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName, false);
CursorStats cursor = stats.cursors != null ? stats.cursors.get(groupId) : null;
if (cursor != null) {
String readPosition = cursor.readPosition;
MessageId msgId = null;
if (readPosition != null && readPosition.contains(":")) {
try {
String[] position = readPosition.split(":");
msgId = new MessageIdImpl(Long.parseLong(position[0]), Long.parseLong(position[1]), -1);
} catch (Exception e) {
log.warn("Invalid read-position {} for {}-{}", readPosition, topicName, groupId);
}
}
msgId = msgId == null ? MessageId.earliest : msgId;
OffsetMetadataAndError oE = new OffsetMetadataAndError(MessageIdUtils.getOffset(msgId), null,
ErrorMapping.NoError());
responseMap.put(topicMetadata, oE);
}
} catch (Exception e) {
OffsetMetadataAndError oE = new OffsetMetadataAndError(0, null, ErrorMapping.UnknownCode());
responseMap.put(topicMetadata, oE);
}
}
return response;
}