public void addPostCommitTopology()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java [171:184]


    public void addPostCommitTopology(DataStream<CommittableMessage<KafkaCommittable>> committer) {
        // this is a somewhat hacky way to ensure that the committer and writer are co-located
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE && transactionalIdPrefix != null) {
            Transformation<?> transformation = committer.getTransformation();
            // all sink transformations output CommittableMessage, so we can safely traverse the
            // chain; custom colocation key is set before and should be preserved
            while (transformation.getOutputType() instanceof CommittableMessageTypeInfo
                    && transformation.getCoLocationGroupKey() == null) {
                // colocate by transactionalIdPrefix, which should be unique
                transformation.setCoLocationGroupKey(transactionalIdPrefix);
                transformation = transformation.getInputs().get(0);
            }
        }
    }