in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java [73:96]
/**
 * Restores a {@link PulsarSourceEnumState} from bytes written with the given serializer version.
 *
 * <p>Supported versions are 0 through 3. Version 3 reads the partition set with partition
 * format 2, version 2 with partition format 1, and versions 1 and 0 with partition format 0.
 * A version 0 payload additionally carries four trailing fields (a split set, two maps keyed
 * by int, and a boolean); they are read only to stay backward compatible and their values are
 * discarded.
 *
 * @param version the serializer version the bytes were written with
 * @param serialized the serialized enumerator state
 * @return the enumerator state built from the deserialized partitions
 * @throws IOException if the version is not supported or the bytes cannot be read
 */
public PulsarSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
    // Fail fast rather than silently decoding an unknown version with the version 0 layout,
    // which would yield corrupt state or an unrelated read error.
    if (version < 0 || version > 3) {
        throw new IOException(
                "Unsupported PulsarSourceEnumState serialization version: "
                        + version
                        + ". Supported versions are 0 to 3.");
    }

    // Map the state version to the partition format it was written with.
    final int partitionVersion;
    if (version == 3) {
        partitionVersion = 2;
    } else if (version == 2) {
        partitionVersion = 1;
    } else {
        // Versions 0 and 1 share the same partition format.
        partitionVersion = 0;
    }

    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        Set<TopicPartition> partitions =
                deserializeSet(in, deserializePartition(partitionVersion));

        // Only deserialize these fields for backward compatibility; the results are dropped.
        if (version == 0) {
            deserializeSet(in, deserializeSplit());
            deserializeMap(in, DataInput::readInt, i -> deserializeSet(i, deserializeSplit()));
            deserializeMap(in, DataInput::readInt, i -> deserializeSet(i, DataInput::readUTF));
            in.readBoolean();
        }

        return new PulsarSourceEnumState(partitions);
    }
}