in streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java [97:149]
/**
 * Selects the rebalance protocol to use, based on the {@code upgrade.from} setting
 * read from the Streams config.
 *
 * <p>When {@code upgrade.from} is unset (null), or is one of the values
 * {@code UPGRADE_FROM_24} through {@code UPGRADE_FROM_39}, the cooperative protocol is
 * chosen. For the values {@code UPGRADE_FROM_0100} through {@code UPGRADE_FROM_23} the
 * eager protocol is chosen and a deprecation warning is logged.
 *
 * @return {@link RebalanceProtocol#EAGER} or {@link RebalanceProtocol#COOPERATIVE}
 * @throws IllegalArgumentException if the parsed {@code upgrade.from} value is not one
 *         of the versions enumerated in this method
 */
public RebalanceProtocol rebalanceProtocol() {
    final String configuredUpgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);

    // Step 1: decide which protocol applies. Nothing is logged here; the switch only classifies.
    final boolean eagerRequired;
    if (configuredUpgradeFrom == null) {
        eagerRequired = false;
    } else {
        switch (UpgradeFromValues.fromString(configuredUpgradeFrom)) {
            case UPGRADE_FROM_0100:
            case UPGRADE_FROM_0101:
            case UPGRADE_FROM_0102:
            case UPGRADE_FROM_0110:
            case UPGRADE_FROM_10:
            case UPGRADE_FROM_11:
            case UPGRADE_FROM_20:
            case UPGRADE_FROM_21:
            case UPGRADE_FROM_22:
            case UPGRADE_FROM_23:
                eagerRequired = true;
                break;
            case UPGRADE_FROM_24:
            case UPGRADE_FROM_25:
            case UPGRADE_FROM_26:
            case UPGRADE_FROM_27:
            case UPGRADE_FROM_28:
            case UPGRADE_FROM_30:
            case UPGRADE_FROM_31:
            case UPGRADE_FROM_32:
            case UPGRADE_FROM_33:
            case UPGRADE_FROM_34:
            case UPGRADE_FROM_35:
            case UPGRADE_FROM_36:
            case UPGRADE_FROM_37:
            case UPGRADE_FROM_38:
            case UPGRADE_FROM_39:
                // Maintenance note: every newly introduced "upgrade.from" value has to be
                // listed here, otherwise it ends up in the default branch and is rejected.
                // For these versions the config only serves to explicitly send the FK
                // response to a requested partition; it must not influence the rebalance protocol.
                eagerRequired = false;
                break;
            default:
                throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + configuredUpgradeFrom);
        }
    }

    // Step 2: log and return.
    //
    // ATTENTION: both log.info messages below are used for verification in the system test
    // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
    // If you change either of them, please also change the system test accordingly and
    // verify that it still passes.
    if (eagerRequired) {
        log.info("Eager rebalancing protocol is enabled now for upgrade from {}.x", configuredUpgradeFrom);
        log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." +
            " Please be prepared to remove the 'upgrade.from' config soon.");
        return RebalanceProtocol.EAGER;
    }

    log.info("Cooperative rebalancing protocol is enabled now");
    return RebalanceProtocol.COOPERATIVE;
}