public KinesisStreamsSourceEnumeratorState deserialize()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java [117:168]


    public KinesisStreamsSourceEnumeratorState deserialize(
            int version, byte[] serializedEnumeratorState) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState);
                DataInputStream in = new DataInputStream(bais)) {

            if (!COMPATIBLE_VERSIONS.contains(version)) {
                throw new VersionMismatchException(
                        "Trying to deserialize KinesisStreamsSourceEnumeratorState serialized with unsupported version "
                                + version
                                + ". Serializer version is "
                                + getVersion());
            }

            String lastSeenShardId = null;

            final boolean hasLastSeenShardId = in.readBoolean();
            if (hasLastSeenShardId) {
                lastSeenShardId = in.readUTF();
            }

            final int numUnassignedSplits = in.readInt();
            final int splitSerializerVersion = in.readInt();

            List<KinesisShardSplitWithAssignmentStatus> knownSplits =
                    new ArrayList<>(numUnassignedSplits);
            for (int i = 0; i < numUnassignedSplits; i++) {
                int serializedLength = in.readInt();
                byte[] serializedSplit = new byte[serializedLength];
                if (in.read(serializedSplit) != -1) {
                    KinesisShardSplit deserializedSplit =
                            splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
                    SplitAssignmentStatus assignmentStatus = SplitAssignmentStatus.UNASSIGNED;
                    if (version == CURRENT_VERSION) {
                        assignmentStatus = SplitAssignmentStatus.fromStatusCode(in.readInt());
                    }
                    knownSplits.add(
                            new KinesisShardSplitWithAssignmentStatus(
                                    deserializedSplit, assignmentStatus));

                } else {
                    throw new IOException(
                            "Unexpectedly reading more bytes than is present in stream.");
                }
            }

            if (in.available() > 0) {
                throw new IOException("Unexpected trailing bytes when deserializing.");
            }

            return new KinesisStreamsSourceEnumeratorState(knownSplits, lastSeenShardId);
        }
    }