public DynamicKafkaSourceEnumState deserialize()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java [92:122]


    /**
     * Rebuilds a {@link DynamicKafkaSourceEnumState} from its serialized byte form.
     *
     * <p>Only {@code VERSION_1} payloads are accepted. The payload is consumed in this order:
     *
     * <ol>
     *   <li>the set of Kafka streams, read by the stream-based {@code deserialize} overload;
     *   <li>an {@code int} holding the version passed to the per-cluster enumerator state
     *       serializer;
     *   <li>an {@code int} holding the number of per-cluster entries;
     *   <li>for each entry: the cluster id (via {@code readUTF}), an {@code int} byte length, and
     *       the bytes of the per-cluster enumerator state.
     * </ol>
     *
     * @param version the version the bytes were serialized with
     * @param serialized the serialized state
     * @return the state assembled from the decoded streams and per-cluster enumerator states
     * @throws IOException if {@code version} is not {@code VERSION_1}, or if reading from the
     *     payload fails
     */
    public DynamicKafkaSourceEnumState deserialize(int version, byte[] serialized)
            throws IOException {
        // Guard clause: reject anything that is not the single supported format up front.
        if (version != VERSION_1) {
            throw new IOException(
                    String.format(
                            "The bytes are serialized with version %d, "
                                    + "while this deserializer only supports version up to %d",
                            version, getVersion()));
        }

        try (DataInputStream input =
                new DataInputStream(new ByteArrayInputStream(serialized))) {
            Set<KafkaStream> streams = deserialize(input);

            // The nested serializer version is written once and applies to every entry below.
            int nestedSerializerVersion = input.readInt();
            Map<String, KafkaSourceEnumState> statesByCluster = new HashMap<>();

            for (int remaining = input.readInt(); remaining > 0; remaining--) {
                String clusterId = input.readUTF();
                int payloadLength = input.readInt();
                // NOTE(review): readNBytes is presumably expected to return exactly
                // payloadLength bytes - confirm against the helper's definition.
                byte[] payload = readNBytes(input, payloadLength);
                statesByCluster.put(
                        clusterId,
                        kafkaSourceEnumStateSerializer.deserialize(
                                nestedSerializerVersion, payload));
            }

            return new DynamicKafkaSourceEnumState(streams, statesByCluster);
        }
    }