public KafkaWriterState deserialize()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java [60:89]


    /**
     * Reads a {@link KafkaWriterState} back from its serialized byte form.
     *
     * <p>Every accepted version starts with the transactional id prefix written as a modified
     * UTF-8 string. Version 2 additionally carries, in this order: the owned subtask id (int),
     * the total number of owned subtasks (int), the ordinal of the {@link TransactionOwnership}
     * constant (int), the number of precommitted transactions (int) and then that many
     * (UTF string, long) pairs, each turned into a {@link CheckpointTransaction}.
     *
     * <p>For any accepted version other than 2 the additional fields keep their defaults:
     * {@code UNKNOWN} for both subtask fields, {@link
     * TransactionOwnership#IMPLICIT_BY_SUBTASK_ID} for the ownership and an empty collection of
     * precommitted transactions.
     *
     * @param version the version the bytes were written with; values greater than 2 are rejected
     * @param serialized the serialized state
     * @return the restored writer state
     * @throws IOException if the version is greater than 2, if the bytes are truncated, or if the
     *     stored ownership ordinal or transaction count is out of range
     */
    public KafkaWriterState deserialize(int version, byte[] serialized) throws IOException {
        if (version > 2) {
            throw new IOException("Unknown version: " + version);
        }

        try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                final DataInputStream in = new DataInputStream(bais)) {
            final String transactionalIdPrefix = in.readUTF();
            int ownedSubtaskId = UNKNOWN;
            int totalNumberOfOwnedSubtasks = UNKNOWN;
            TransactionOwnership transactionOwnership = TransactionOwnership.IMPLICIT_BY_SUBTASK_ID;
            final Collection<CheckpointTransaction> precommitted = new ArrayList<>();
            if (version == 2) {
                ownedSubtaskId = in.readInt();
                totalNumberOfOwnedSubtasks = in.readInt();

                // The ordinal comes straight from the byte stream; validate it so corrupted or
                // incompatible state fails with the declared IOException rather than an
                // unchecked ArrayIndexOutOfBoundsException.
                final TransactionOwnership[] ownerships = TransactionOwnership.values();
                final int ownershipOrdinal = in.readInt();
                if (ownershipOrdinal < 0 || ownershipOrdinal >= ownerships.length) {
                    throw new IOException(
                            "Unknown transaction ownership ordinal: " + ownershipOrdinal);
                }
                transactionOwnership = ownerships[ownershipOrdinal];

                // A negative count can only come from corrupted data; without this check the
                // loop below would silently read nothing and return an empty collection.
                final int usedTransactionIdsSize = in.readInt();
                if (usedTransactionIdsSize < 0) {
                    throw new IOException(
                            "Negative number of precommitted transactions: "
                                    + usedTransactionIdsSize);
                }
                for (int i = 0; i < usedTransactionIdsSize; i++) {
                    // Argument evaluation is left-to-right: the UTF string is read first, then
                    // the long, matching the order in which they appear in the stream.
                    precommitted.add(new CheckpointTransaction(in.readUTF(), in.readLong()));
                }
            }
            return new KafkaWriterState(
                    transactionalIdPrefix,
                    ownedSubtaskId,
                    totalNumberOfOwnedSubtasks,
                    transactionOwnership,
                    precommitted);
        }
    }