public int configuredMetadataVersion()

in streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java [155:209]


    /**
     * Determines the metadata version to use, taking the {@code upgrade.from} setting into account.
     *
     * <p>When {@code upgrade.from} is not set, or is set to a value that does not require a
     * metadata downgrade, {@code priorVersion} is returned as-is. Otherwise a fixed, older
     * version (1 or 2) is returned and the downgrade is logged at INFO level.
     *
     * @param priorVersion the version to return when no downgrade is required
     * @return {@code 1}, {@code 2}, or {@code priorVersion}, depending on {@code upgrade.from}
     * @throws IllegalArgumentException if the parsed {@code upgrade.from} value is not one of the
     *                                  values handled below
     */
    public int configuredMetadataVersion(final int priorVersion) {
        final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);

        // Nothing configured: no downgrade to consider.
        if (upgradeFrom == null) {
            return priorVersion;
        }

        switch (UpgradeFromValues.fromString(upgradeFrom)) {
            case UPGRADE_FROM_0100:
                log.info("Downgrading metadata.version from {} to 1 for upgrade from 0.10.0.x.", LATEST_SUPPORTED_VERSION);
                return 1;

            case UPGRADE_FROM_0101:
            case UPGRADE_FROM_0102:
            case UPGRADE_FROM_0110:
            case UPGRADE_FROM_10:
            case UPGRADE_FROM_11:
                log.info("Downgrading metadata.version from {} to 2 for upgrade from {}.x.", LATEST_SUPPORTED_VERSION, upgradeFrom);
                return 2;

            case UPGRADE_FROM_20:
            case UPGRADE_FROM_21:
            case UPGRADE_FROM_22:
            case UPGRADE_FROM_23:
                // These values exist for cooperative rebalancing; the metadata version is unaffected.
                return priorVersion;

            // NOTE: extend this list whenever a new "upgrade.from" value is introduced.
            case UPGRADE_FROM_24:
            case UPGRADE_FROM_25:
            case UPGRADE_FROM_26:
            case UPGRADE_FROM_27:
            case UPGRADE_FROM_28:
            case UPGRADE_FROM_30:
            case UPGRADE_FROM_31:
            case UPGRADE_FROM_32:
            case UPGRADE_FROM_33:
            case UPGRADE_FROM_34:
            case UPGRADE_FROM_35:
            case UPGRADE_FROM_36:
            case UPGRADE_FROM_37:
            case UPGRADE_FROM_38:
            case UPGRADE_FROM_39:
                // These values exist for explicitly sending the FK response to a requested
                // partition; the metadata version is unaffected.
                return priorVersion;

            default:
                throw new IllegalArgumentException(
                    "Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom
                );
        }
    }