in flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java [77:102]
public CassandraEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
try (final ByteArrayInputStream byteArrayInputStream =
new ByteArrayInputStream(serialized);
final ObjectInputStream objectInputStream =
new ObjectInputStream(byteArrayInputStream)) {
final Queue<CassandraSplit> splitsToReassign = new ArrayDeque<>();
final int splitsToReassignSize = objectInputStream.readInt();
for (int i = 0; i < splitsToReassignSize; i++) {
final int splitSize = objectInputStream.readInt();
final byte[] splitBytes = new byte[splitSize];
objectInputStream.readFully(splitBytes);
final CassandraSplit split =
CassandraSplitSerializer.INSTANCE.deserialize(
CassandraSplitSerializer.CURRENT_VERSION, splitBytes);
splitsToReassign.add(split);
}
final long numSplitsLeftToGenerate = objectInputStream.readLong();
final BigInteger increment = BigIntegerSerializationUtils.read(objectInputStream);
final BigInteger startToken = BigIntegerSerializationUtils.read(objectInputStream);
final BigInteger maxToken = BigIntegerSerializationUtils.read(objectInputStream);
return new CassandraEnumeratorState(
numSplitsLeftToGenerate, increment, startToken, maxToken, splitsToReassign);
}
}