public PulsarOffsetFetchResponse fetchOffsets()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java [231:262]


    public PulsarOffsetFetchResponse fetchOffsets(PulsarOffsetFetchRequest request) {
        final String groupId = request.groupId;
        Map<TopicAndPartition, OffsetMetadataAndError> responseMap = Maps.newHashMap();
        PulsarOffsetFetchResponse response = new PulsarOffsetFetchResponse(responseMap);
        for (TopicAndPartition topicMetadata : request.requestInfo) {
            final String topicName = getTopicName(topicMetadata);
            try {
                PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName, false);
                CursorStats cursor = stats.cursors != null ? stats.cursors.get(groupId) : null;
                if (cursor != null) {
                    String readPosition = cursor.readPosition;
                    MessageId msgId = null;
                    if (readPosition != null && readPosition.contains(":")) {
                        try {
                            String[] position = readPosition.split(":");
                            msgId = new MessageIdImpl(Long.parseLong(position[0]), Long.parseLong(position[1]), -1);
                        } catch (Exception e) {
                            log.warn("Invalid read-position {} for {}-{}", readPosition, topicName, groupId);
                        }
                    }
                    msgId = msgId == null ? MessageId.earliest : msgId;
                    OffsetMetadataAndError oE = new OffsetMetadataAndError(MessageIdUtils.getOffset(msgId), null,
                            ErrorMapping.NoError());
                    responseMap.put(topicMetadata, oE);
                }
            } catch (Exception e) {
                OffsetMetadataAndError oE = new OffsetMetadataAndError(0, null, ErrorMapping.UnknownCode());
                responseMap.put(topicMetadata, oE);
            }
        }
        return response;
    }