public CompletableFuture handleUpdateVoterRequest()

in raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java [75:227]


    /**
     * Handles an update voter request on the leader.
     *
     * The request is validated in the following order; the first failing check completes the
     * returned future immediately with the listed error:
     * <ol>
     *   <li>{@code REQUEST_TIMED_OUT} if the leader state reports a pending operation.</li>
     *   <li>{@code REQUEST_TIMED_OUT} if the leader state has no high-watermark.</li>
     *   <li>{@code REQUEST_TIMED_OUT} if the current voter set cannot be read: when the last
     *       kraft version supports reconfiguration, the last voter set entry is missing or its
     *       offset is at or past the high-watermark; otherwise the leader state has no volatile
     *       voters.</li>
     *   <li>{@code INVALID_REQUEST} if {@code validVersionRange} rejects the supported kraft
     *       version range.</li>
     *   <li>{@code INVALID_REQUEST} if the endpoints have no address for the default
     *       listener.</li>
     *   <li>{@code VOTER_NOT_FOUND} if {@code updateVoters} returns an empty result.</li>
     * </ol>
     * When every check passes the updated voter set is handed to {@code storeUpdatedVoters} and
     * its future is returned.
     *
     * @param leaderState the leader state used for the pending-operation check, the
     *        high-watermark, the volatile voters, and the epoch and endpoints in responses
     * @param requestListenerName the listener name forwarded to the response builder
     * @param voterKey the replica key of the voter being updated
     * @param voterEndpoints the endpoints to record for the voter
     * @param supportedKraftVersions the kraft version range to record for the voter
     * @param currentTimeMs the current time in milliseconds
     * @return a future holding the update voter response
     */
    public CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
        LeaderState<?> leaderState,
        ListenerName requestListenerName,
        ReplicaKey voterKey,
        Endpoints voterEndpoints,
        UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions,
        long currentTimeMs
    ) {
        // Check if there are any pending voter change requests
        if (leaderState.isOperationPending(currentTimeMs)) {
            return completedUpdateVoterResponse(
                Errors.REQUEST_TIMED_OUT,
                requestListenerName,
                leaderState
            );
        }

        // Check that the leader has established a HWM and committed the current epoch
        Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset);
        if (highWatermark.isEmpty()) {
            return completedUpdateVoterResponse(
                Errors.REQUEST_TIMED_OUT,
                requestListenerName,
                leaderState
            );
        }

        // Read the voter set from the log or leader state
        KRaftVersion kraftVersion = partitionState.lastKraftVersion();
        final Optional<KRaftVersionUpgrade.Voters> inMemoryVoters;
        final Optional<VoterSet> voters;
        if (kraftVersion.isReconfigSupported()) {
            inMemoryVoters = Optional.empty();

            // Check that there are no uncommitted VotersRecord
            Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
            if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
                voters = Optional.empty();
            } else {
                voters = votersEntry.map(LogHistory.Entry::value);
            }
        } else {
            inMemoryVoters = leaderState.volatileVoters();
            if (inMemoryVoters.isEmpty()) {
                /* This can happen if the remote voter sends an update voter request before the
                 * updated kraft version has been written to the log
                 */
                return completedUpdateVoterResponse(
                    Errors.REQUEST_TIMED_OUT,
                    requestListenerName,
                    leaderState
                );
            }
            voters = inMemoryVoters.map(KRaftVersionUpgrade.Voters::voters);
        }
        if (voters.isEmpty()) {
            log.info("Unable to read the current voter set with kraft version {}", kraftVersion);
            return completedUpdateVoterResponse(
                Errors.REQUEST_TIMED_OUT,
                requestListenerName,
                leaderState
            );
        }

        // Check that the supported version range is valid
        if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
            return completedUpdateVoterResponse(
                Errors.INVALID_REQUEST,
                requestListenerName,
                leaderState
            );
        }

        // Check that endpoints includes the default listener
        if (voterEndpoints.address(defaultListenerName).isEmpty()) {
            return completedUpdateVoterResponse(
                Errors.INVALID_REQUEST,
                requestListenerName,
                leaderState
            );
        }

        // Update the voter
        Optional<VoterSet> updatedVoters = updateVoters(
            voters.get(),
            kraftVersion,
            VoterSet.VoterNode.of(
                voterKey,
                voterEndpoints,
                new SupportedVersionRange(
                    supportedKraftVersions.minSupportedVersion(),
                    supportedKraftVersions.maxSupportedVersion()
                )
            )
        );
        if (updatedVoters.isEmpty()) {
            return completedUpdateVoterResponse(
                Errors.VOTER_NOT_FOUND,
                requestListenerName,
                leaderState
            );
        }

        return storeUpdatedVoters(
            leaderState,
            voterKey,
            inMemoryVoters,
            updatedVoters.get(),
            requestListenerName,
            currentTimeMs
        );
    }

    /**
     * Returns an already-completed future holding an update voter response with the given error.
     *
     * The response names the local replica as leader for the leader state's epoch and carries the
     * leader state's endpoints.
     *
     * @param error the error code to place in the response
     * @param requestListenerName the listener name forwarded to the response builder
     * @param leaderState the leader state supplying the epoch and the leader endpoints
     * @return a completed future holding the response
     */
    private CompletableFuture<UpdateRaftVoterResponseData> completedUpdateVoterResponse(
        Errors error,
        ListenerName requestListenerName,
        LeaderState<?> leaderState
    ) {
        return CompletableFuture.completedFuture(
            RaftUtil.updateVoterResponse(
                error,
                requestListenerName,
                new LeaderAndEpoch(
                    localId,
                    leaderState.epoch()
                ),
                leaderState.leaderEndpoints()
            )
        );
    }