in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java [76:105]
public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
if (version != getVersion()) {
throw new VersionMismatchException(
"Trying to deserialize KinesisShardSplit serialized with unsupported version "
+ version
+ ". Version of serializer is "
+ getVersion());
}
final String streamArn = in.readUTF();
final String shardId = in.readUTF();
final ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(in.readUTF());
Object startingMarker = null;
final boolean hasStartingMarker = in.readBoolean();
if (hasStartingMarker) {
if (in.readBoolean()) {
startingMarker = Instant.ofEpochMilli(in.readLong());
}
if (in.readBoolean()) {
startingMarker = in.readUTF();
}
}
return new KinesisShardSplit(
streamArn, shardId, new StartingPosition(shardIteratorType, startingMarker));
}
}