in streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java [155:209]
public int configuredMetadataVersion(final int priorVersion) {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (UpgradeFromValues.fromString(upgradeFrom)) {
case UPGRADE_FROM_0100:
log.info(
"Downgrading metadata.version from {} to 1 for upgrade from 0.10.0.x.",
LATEST_SUPPORTED_VERSION
);
return 1;
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
log.info(
"Downgrading metadata.version from {} to 2 for upgrade from {}.x.",
LATEST_SUPPORTED_VERSION,
upgradeFrom
);
return 2;
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
// These configs are for cooperative rebalancing and should not affect the metadata version
break;
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
case UPGRADE_FROM_35:
case UPGRADE_FROM_36:
case UPGRADE_FROM_37:
case UPGRADE_FROM_38:
case UPGRADE_FROM_39:
// we need to add new version when new "upgrade.from" values become available
// This config is for explicitly sending FK response to a requested partition
// and should not affect the metadata version
break;
default:
throw new IllegalArgumentException(
"Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom
);
}
}
return priorVersion;
}