public DynamoDbStreamsSourceEnumeratorState deserialize()

in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializer.java [79:121]


    /**
     * Deserializes an enumerator state from its binary form.
     *
     * <p>The layout read here is: the number of known splits (int), the split serializer
     * version (int), then for each split a length prefix (int), that many bytes of serialized
     * split, and an assignment status code (int); finally the start timestamp as epoch
     * milliseconds (long). No bytes may follow the timestamp.
     *
     * @param version the version the state was serialized with; must equal {@code getVersion()}
     * @param serializedEnumeratorState the serialized enumerator state
     * @return the deserialized enumerator state
     * @throws VersionMismatchException if {@code version} differs from {@code getVersion()}
     * @throws IOException if a split length prefix is negative or exceeds the remaining bytes,
     *     if the stream ends prematurely, or if trailing bytes remain after the timestamp
     */
    public DynamoDbStreamsSourceEnumeratorState deserialize(
            int version, byte[] serializedEnumeratorState) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState);
                DataInputStream in = new DataInputStream(bais)) {

            if (version != getVersion()) {
                throw new VersionMismatchException(
                        "Trying to deserialize DynamoDbStreamsSourceEnumeratorState serialized with unsupported version "
                                + version
                                + ". Serializer version is "
                                + getVersion());
            }
            final int numKnownSplits = in.readInt();
            final int splitSerializerVersion = in.readInt();

            List<DynamoDBStreamsShardSplitWithAssignmentStatus> knownSplits =
                    new ArrayList<>(numKnownSplits);

            for (int i = 0; i < numKnownSplits; i++) {
                int serializedLength = in.readInt();
                // Validate the length prefix before allocating: a corrupted or truncated
                // payload must fail here instead of handing a partially filled (zero-padded)
                // buffer to the split serializer.
                if (serializedLength < 0 || serializedLength > in.available()) {
                    throw new IOException(
                            "Unexpectedly reading more bytes than is present in stream.");
                }
                byte[] serializedSplit = new byte[serializedLength];
                // readFully either fills the whole buffer or throws; a plain read() may
                // return after filling only part of it.
                in.readFully(serializedSplit);

                DynamoDbStreamsShardSplit deserializedSplit =
                        splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
                SplitAssignmentStatus assignmentStatus =
                        SplitAssignmentStatus.fromStatusCode(in.readInt());
                knownSplits.add(
                        new DynamoDBStreamsShardSplitWithAssignmentStatus(
                                deserializedSplit, assignmentStatus));
            }

            Instant startTimestamp = Instant.ofEpochMilli(in.readLong());
            if (in.available() > 0) {
                throw new IOException("Unexpected trailing bytes when deserializing.");
            }

            return new DynamoDbStreamsSourceEnumeratorState(knownSplits, startTimestamp);
        }
    }