in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java [171:184]
public void addPostCommitTopology(DataStream<CommittableMessage<KafkaCommittable>> committer) {
    // Tags the committer's transformation, and the committable-emitting transformations
    // upstream of it, with a shared co-location group key. This is a somewhat hacky way to
    // ensure that the committer and writer are co-located. It is only applied for
    // EXACTLY_ONCE delivery when a transactionalIdPrefix is set; otherwise nothing is changed.
    if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE || transactionalIdPrefix == null) {
        return;
    }

    Transformation<?> transformation = committer.getTransformation();
    // all sink transformations output CommittableMessage, so we can safely traverse the
    // chain; custom colocation key is set before and should be preserved
    while (transformation.getOutputType() instanceof CommittableMessageTypeInfo
            && transformation.getCoLocationGroupKey() == null) {
        // colocate by transactionalIdPrefix, which should be unique
        transformation.setCoLocationGroupKey(transactionalIdPrefix);

        // A transformation without inputs has no upstream left to follow; stop here rather
        // than failing with an IndexOutOfBoundsException on get(0).
        if (transformation.getInputs().isEmpty()) {
            break;
        }
        // only the first input is followed when walking upstream
        transformation = transformation.getInputs().get(0);
    }
}