in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java [92:122]
public DynamicKafkaSourceEnumState deserialize(int version, byte[] serialized)
throws IOException {
if (version == VERSION_1) {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
Set<KafkaStream> kafkaStreams = deserialize(in);
Map<String, KafkaSourceEnumState> clusterEnumeratorStates = new HashMap<>();
int kafkaSourceEnumStateSerializerVersion = in.readInt();
int clusterEnumeratorStateMapSize = in.readInt();
for (int i = 0; i < clusterEnumeratorStateMapSize; i++) {
String kafkaClusterId = in.readUTF();
int byteArraySize = in.readInt();
KafkaSourceEnumState kafkaSourceEnumState =
kafkaSourceEnumStateSerializer.deserialize(
kafkaSourceEnumStateSerializerVersion,
readNBytes(in, byteArraySize));
clusterEnumeratorStates.put(kafkaClusterId, kafkaSourceEnumState);
}
return new DynamicKafkaSourceEnumState(kafkaStreams, clusterEnumeratorStates);
}
}
throw new IOException(
String.format(
"The bytes are serialized with version %d, "
+ "while this deserializer only supports version up to %d",
version, getVersion()));
}