in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializer.java [92:114]
public JdbcSourceEnumeratorState deserialize(int version, byte[] serialized)
throws IOException {
if (version != CURRENT_VERSION) {
throw new IOException("Unknown version: " + version);
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
List<JdbcSourceSplit> completedSplits = deserializeSourceSplits(in);
List<JdbcSourceSplit> pendingSplits = deserializeSourceSplits(in);
List<JdbcSourceSplit> remainingSplits = deserializeSourceSplits(in);
int bytesLen = in.readInt();
byte[] bytes = new byte[bytesLen];
in.read(bytes);
return new JdbcSourceEnumeratorState(
completedSplits,
pendingSplits,
remainingSplits,
InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}