in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java [71:94]
private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out,
final List<TopicPartition> partitions) throws IOException {
int lastId = 0;
final Map<String, Integer> topicGroupIds = new HashMap<>();
// encode active tasks
// the number of assigned partitions must be the same as number of active tasks
out.writeInt(partitions.size());
for (TopicPartition p : partitions) {
final int topicGroupId;
if (topicGroupIds.containsKey(p.topic())) {
topicGroupId = topicGroupIds.get(p.topic());
} else {
topicGroupId = lastId;
lastId++;
topicGroupIds.put(p.topic(), topicGroupId);
}
out.writeInt(topicGroupId);
out.writeInt(p.partition());
}
// encode standby tasks
out.writeInt(0);
}