in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java [117:168]
public KinesisStreamsSourceEnumeratorState deserialize(
        int version, byte[] serializedEnumeratorState) throws IOException {
    // Rebuilds the enumerator state from its binary form.
    //
    // Layout read below, in order:
    //   boolean  - whether a last-seen shard id follows
    //   UTF      - last-seen shard id (only when the flag is true)
    //   int      - number of serialized splits
    //   int      - version passed to the split serializer for every split
    //   per split:
    //     int    - length of the serialized split in bytes
    //     bytes  - the serialized split
    //     int    - assignment status code (only when version == CURRENT_VERSION;
    //              otherwise the split is treated as UNASSIGNED)
    //
    // Parameters: `version` is the version the bytes were written with and must be
    // contained in COMPATIBLE_VERSIONS; `serializedEnumeratorState` is the payload.
    // Returns the state holding the decoded splits and the (possibly null) last-seen
    // shard id.
    // Throws VersionMismatchException for an unsupported version, and IOException when
    // the payload is truncated, has negative lengths, or has trailing bytes.
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState);
            DataInputStream in = new DataInputStream(bais)) {
        if (!COMPATIBLE_VERSIONS.contains(version)) {
            throw new VersionMismatchException(
                    "Trying to deserialize KinesisStreamsSourceEnumeratorState serialized with unsupported version "
                            + version
                            + ". Serializer version is "
                            + getVersion());
        }

        String lastSeenShardId = null;
        final boolean hasLastSeenShardId = in.readBoolean();
        if (hasLastSeenShardId) {
            lastSeenShardId = in.readUTF();
        }

        final int numUnassignedSplits = in.readInt();
        if (numUnassignedSplits < 0) {
            // Report corrupt input as an IOException instead of letting the ArrayList
            // constructor fail with an IllegalArgumentException.
            throw new IOException(
                    "Invalid negative number of splits in serialized state: "
                            + numUnassignedSplits);
        }
        final int splitSerializerVersion = in.readInt();

        final List<KinesisShardSplitWithAssignmentStatus> knownSplits =
                new ArrayList<>(numUnassignedSplits);
        for (int i = 0; i < numUnassignedSplits; i++) {
            final int serializedLength = in.readInt();
            if (serializedLength < 0) {
                // Avoid a NegativeArraySizeException on corrupt input.
                throw new IOException(
                        "Invalid negative length for serialized split: " + serializedLength);
            }

            final byte[] serializedSplit = new byte[serializedLength];
            try {
                // readFully either fills the whole buffer or throws. A plain read() may
                // return fewer bytes than requested, which would silently hand a
                // partially filled (zero-padded) buffer to the split serializer.
                in.readFully(serializedSplit);
            } catch (java.io.EOFException e) {
                throw new IOException(
                        "Unexpectedly reading more bytes than is present in stream.", e);
            }

            final KinesisShardSplit deserializedSplit =
                    splitSerializer.deserialize(splitSerializerVersion, serializedSplit);

            // Only the current format carries a per-split status code.
            SplitAssignmentStatus assignmentStatus = SplitAssignmentStatus.UNASSIGNED;
            if (version == CURRENT_VERSION) {
                assignmentStatus = SplitAssignmentStatus.fromStatusCode(in.readInt());
            }

            knownSplits.add(
                    new KinesisShardSplitWithAssignmentStatus(
                            deserializedSplit, assignmentStatus));
        }

        // Anything left over means the payload does not match the expected layout.
        if (in.available() > 0) {
            throw new IOException("Unexpected trailing bytes when deserializing.");
        }

        return new KinesisStreamsSourceEnumeratorState(knownSplits, lastSeenShardId);
    }
}