public RebalanceProtocol rebalanceProtocol()

in streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java [97:149]


    /**
     * Selects the rebalance protocol based on the configured {@code upgrade.from} value.
     *
     * @return {@code RebalanceProtocol.EAGER} when {@code upgrade.from} is set and maps to one of
     *         {@code UPGRADE_FROM_0100} through {@code UPGRADE_FROM_23};
     *         {@code RebalanceProtocol.COOPERATIVE} when it is unset or maps to one of
     *         {@code UPGRADE_FROM_24} through {@code UPGRADE_FROM_39}
     * @throws IllegalArgumentException if {@code upgrade.from} maps to a value that is not handled here
     */
    public RebalanceProtocol rebalanceProtocol() {
        final String upgradeFromValue = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);

        if (upgradeFromValue != null && upgradeFromRequiresEagerProtocol(upgradeFromValue)) {
            // ATTENTION: the two log lines below are matched by the system test
            // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
            // Any change to their text must be mirrored in that test, and the test
            // must be re-run to confirm it still passes.
            log.info("Eager rebalancing protocol is enabled now for upgrade from {}.x", upgradeFromValue);
            log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." +
                " Please be prepared to remove the 'upgrade.from' config soon.");
            return RebalanceProtocol.EAGER;
        }

        // ATTENTION: the log line below is matched by the system test
        // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
        // Any change to its text must be mirrored in that test, and the test
        // must be re-run to confirm it still passes.
        log.info("Cooperative rebalancing protocol is enabled now");
        return RebalanceProtocol.COOPERATIVE;
    }

    // Classifies a non-null 'upgrade.from' value: true if it demands the eager protocol, false otherwise.
    private boolean upgradeFromRequiresEagerProtocol(final String upgradeFromValue) {
        switch (UpgradeFromValues.fromString(upgradeFromValue)) {
            case UPGRADE_FROM_0100:
            case UPGRADE_FROM_0101:
            case UPGRADE_FROM_0102:
            case UPGRADE_FROM_0110:
            case UPGRADE_FROM_10:
            case UPGRADE_FROM_11:
            case UPGRADE_FROM_20:
            case UPGRADE_FROM_21:
            case UPGRADE_FROM_22:
            case UPGRADE_FROM_23:
                return true;
            case UPGRADE_FROM_24:
            case UPGRADE_FROM_25:
            case UPGRADE_FROM_26:
            case UPGRADE_FROM_27:
            case UPGRADE_FROM_28:
            case UPGRADE_FROM_30:
            case UPGRADE_FROM_31:
            case UPGRADE_FROM_32:
            case UPGRADE_FROM_33:
            case UPGRADE_FROM_34:
            case UPGRADE_FROM_35:
            case UPGRADE_FROM_36:
            case UPGRADE_FROM_37:
            case UPGRADE_FROM_38:
            case UPGRADE_FROM_39:
                // Extend this list whenever a new "upgrade.from" value is introduced.
                //
                // For these values the config serves to explicitly send FK responses to a
                // requested partition; it should have no effect on the rebalance protocol.
                return false;
            default:
                throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFromValue);
        }
    }