private void encodeActiveAndStandbyTaskAssignment()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java [71:94]


        private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out,
                                                          final List<TopicPartition> partitions) throws IOException {

            int lastId = 0;
            final Map<String, Integer> topicGroupIds = new HashMap<>();
            // encode active tasks
            // the number of assigned partitions must be the same as number of active tasks
            out.writeInt(partitions.size());
            for (TopicPartition p : partitions) {
                final int topicGroupId;
                if (topicGroupIds.containsKey(p.topic())) {
                    topicGroupId = topicGroupIds.get(p.topic());
                } else {
                    topicGroupId = lastId;
                    lastId++;
                    topicGroupIds.put(p.topic(), topicGroupId);
                }
                out.writeInt(topicGroupId);
                out.writeInt(p.partition());
            }

            // encode standby tasks
            out.writeInt(0);
        }