in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java [116:168]
public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOException {
    // Rebuilds a KinesisShardSplit from the given bytes. The payload is consumed strictly
    // in sequence: stream ARN, shard id, shard iterator type, an optional starting marker
    // and, only when the payload version equals CURRENT_VERSION, the parent shard ids
    // followed by the starting and ending hash keys.
    //
    // A version that is not contained in COMPATIBLE_VERSIONS is rejected with a
    // VersionMismatchException before any field is read. Read failures on the underlying
    // stream surface as IOException.
    try (ByteArrayInputStream byteStream = new ByteArrayInputStream(serialized);
            DataInputStream input = new DataInputStream(byteStream)) {
        final boolean versionSupported = COMPATIBLE_VERSIONS.contains(version);
        if (!versionSupported) {
            throw new VersionMismatchException(
                    "Trying to deserialize KinesisShardSplit serialized with unsupported version "
                            + version
                            + ". Version of serializer is "
                            + getVersion());
        }

        final String arn = input.readUTF();
        final String shard = input.readUTF();
        final String iteratorTypeName = input.readUTF();
        final ShardIteratorType iteratorType = ShardIteratorType.fromValue(iteratorTypeName);

        // The marker section starts with a presence flag. When set, it is followed by a
        // flag guarding an epoch-millis long and then a flag guarding a UTF string; both
        // flags are always read, and a string marker overrides an Instant marker.
        Object marker = null;
        if (input.readBoolean()) {
            final boolean hasInstantMarker = input.readBoolean();
            if (hasInstantMarker) {
                marker = Instant.ofEpochMilli(input.readLong());
            }
            final boolean hasStringMarker = input.readBoolean();
            if (hasStringMarker) {
                marker = input.readUTF();
            }
        }

        // Values used when the payload version is not CURRENT_VERSION: no parent shards
        // and the placeholder hash keys "-1" / "0".
        final Set<String> parents = new HashSet<>();
        String hashKeyStart = "-1";
        String hashKeyEnd = "0";

        if (version == CURRENT_VERSION) {
            // Parent shard ids are length-prefixed; the hash keys follow immediately after.
            for (int remaining = input.readInt(); remaining > 0; remaining--) {
                parents.add(input.readUTF());
            }
            hashKeyStart = input.readUTF();
            hashKeyEnd = input.readUTF();
        }

        final StartingPosition position = new StartingPosition(iteratorType, marker);
        return new KinesisShardSplit(arn, shard, position, parents, hashKeyStart, hashKeyEnd);
    }
}