public KinesisStreamsSourceEnumeratorState deserialize()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java [83:131]


    /**
     * Reconstructs a {@link KinesisStreamsSourceEnumeratorState} from its serialized bytes.
     *
     * <p>The payload is read in this order:
     *
     * <ol>
     *   <li>a boolean flag telling whether a last-seen shard id follows;
     *   <li>the last-seen shard id as a modified-UTF-8 string (only when the flag is true);
     *   <li>the number of unassigned splits (int);
     *   <li>the version of the split serializer used for the splits (int);
     *   <li>for each split: its length in bytes (int) followed by exactly that many bytes.
     * </ol>
     *
     * <p>Nothing may follow the last split.
     *
     * @param version version the state was serialized with; must equal {@code getVersion()}
     * @param serializedEnumeratorState the serialized enumerator state
     * @return the deserialized enumerator state
     * @throws VersionMismatchException if {@code version} or the embedded split serializer
     *     version differs from the version currently supported
     * @throws IOException if the payload is truncated, contains a negative count or length, or
     *     has trailing bytes
     */
    public KinesisStreamsSourceEnumeratorState deserialize(
            int version, byte[] serializedEnumeratorState) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState);
                DataInputStream in = new DataInputStream(bais)) {

            if (version != getVersion()) {
                throw new VersionMismatchException(
                        "Trying to deserialize KinesisStreamsSourceEnumeratorState serialized with unsupported version "
                                + version
                                + ". Serializer version is "
                                + getVersion());
            }

            String lastSeenShardId = null;

            final boolean hasLastSeenShardId = in.readBoolean();
            if (hasLastSeenShardId) {
                lastSeenShardId = in.readUTF();
            }

            final int numUnassignedSplits = in.readInt();
            final int splitSerializerVersion = in.readInt();
            if (splitSerializerVersion != splitSerializer.getVersion()) {
                throw new VersionMismatchException(
                        "Trying to deserialize KinesisShardSplit serialized with unsupported version "
                                + splitSerializerVersion
                                + ". Serializer version is "
                                + splitSerializer.getVersion());
            }
            // A negative count can only come from corrupted data; report it as an IOException
            // rather than letting the HashSet constructor throw an unchecked exception.
            if (numUnassignedSplits < 0) {
                throw new IOException(
                        "Invalid number of unassigned splits: " + numUnassignedSplits);
            }

            Set<KinesisShardSplit> unassignedSplits = new HashSet<>(numUnassignedSplits);
            for (int i = 0; i < numUnassignedSplits; i++) {
                final int serializedLength = in.readInt();
                if (serializedLength < 0) {
                    throw new IOException(
                            "Invalid serialized split length: " + serializedLength);
                }
                // Validate the length prefix before allocating: the stream is backed by an
                // in-memory byte array, so available() is the exact number of remaining bytes.
                if (serializedLength > in.available()) {
                    throw new IOException(
                            "Unexpectedly reading more bytes than is present in stream.");
                }

                final byte[] serializedSplit = new byte[serializedLength];
                // readFully guarantees the whole buffer is filled; a plain read() may return
                // fewer bytes than requested and leave the tail of the buffer zeroed.
                in.readFully(serializedSplit);
                unassignedSplits.add(
                        splitSerializer.deserialize(splitSerializerVersion, serializedSplit));
            }

            if (in.available() > 0) {
                throw new IOException("Unexpected trailing bytes when deserializing.");
            }

            return new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId);
        }
    }