in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java [60:89]
/**
 * Reconstructs a {@link KafkaWriterState} from its serialized form.
 *
 * <p>Every accepted version starts with the transactional id prefix, read with
 * {@link DataInputStream#readUTF()}. Version 2 additionally carries, in this order: the owned
 * subtask id, the total number of owned subtasks, the ordinal of the
 * {@link TransactionOwnership}, and a count-prefixed list of precommitted transactions (one UTF
 * string and one long per entry). For any other accepted version those fields keep their
 * defaults: {@code UNKNOWN}, {@code UNKNOWN}, {@link TransactionOwnership#IMPLICIT_BY_SUBTASK_ID}
 * and an empty collection.
 *
 * @param version the version the bytes were written with; values greater than 2 are rejected
 * @param serialized the serialized writer state
 * @return the deserialized writer state
 * @throws IOException if the version is greater than 2, if the stream ends prematurely or is
 *     otherwise unreadable, or if version 2 data contains an out-of-range ownership ordinal or
 *     a negative transaction count
 */
public KafkaWriterState deserialize(int version, byte[] serialized) throws IOException {
    if (version > 2) {
        throw new IOException("Unknown version: " + version);
    }
    try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            final DataInputStream in = new DataInputStream(bais)) {
        final String transactionalIdPrefix = in.readUTF();

        // Defaults used when the serialized form does not carry the version 2 fields.
        int ownedSubtaskId = UNKNOWN;
        int totalNumberOfOwnedSubtasks = UNKNOWN;
        TransactionOwnership transactionOwnership = TransactionOwnership.IMPLICIT_BY_SUBTASK_ID;
        final Collection<CheckpointTransaction> precommitted = new ArrayList<>();

        if (version == 2) {
            ownedSubtaskId = in.readInt();
            totalNumberOfOwnedSubtasks = in.readInt();

            // The ownership is stored as an enum ordinal. Validate it so that corrupted or
            // unrecognized data surfaces as the declared IOException instead of an unchecked
            // ArrayIndexOutOfBoundsException.
            final TransactionOwnership[] ownerships = TransactionOwnership.values();
            final int ownershipOrdinal = in.readInt();
            if (ownershipOrdinal < 0 || ownershipOrdinal >= ownerships.length) {
                throw new IOException(
                        "Unknown transaction ownership ordinal: " + ownershipOrdinal);
            }
            transactionOwnership = ownerships[ownershipOrdinal];

            // A negative count can only come from corrupted data; fail loudly rather than
            // silently restoring an empty list of precommitted transactions.
            final int usedTransactionIdsSize = in.readInt();
            if (usedTransactionIdsSize < 0) {
                throw new IOException(
                        "Invalid number of precommitted transactions: "
                                + usedTransactionIdsSize);
            }
            for (int i = 0; i < usedTransactionIdsSize; i++) {
                precommitted.add(new CheckpointTransaction(in.readUTF(), in.readLong()));
            }
        }

        return new KafkaWriterState(
                transactionalIdPrefix,
                ownedSubtaskId,
                totalNumberOfOwnedSubtasks,
                transactionOwnership,
                precommitted);
    }
}