public CompletableFuture handleUpdateVoterRequest()

in raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java [75:219]


    /**
     * Handles a request to update a voter's endpoints and supported kraft.version range.
     *
     * The checks below run in order and the first one that fails decides the error code:
     * <ol>
     *   <li>{@code leaderState} reports a pending operation: {@code REQUEST_TIMED_OUT}</li>
     *   <li>the leader has no high-watermark yet: {@code REQUEST_TIMED_OUT}</li>
     *   <li>the latest kraft.version does not support reconfiguration: {@code UNSUPPORTED_VERSION}</li>
     *   <li>there is no voter set entry, or the latest entry's offset is at or past the
     *       high-watermark: {@code REQUEST_TIMED_OUT}</li>
     *   <li>the supported version range fails {@code validVersionRange}: {@code INVALID_REQUEST}</li>
     *   <li>the endpoints have no address for the default listener: {@code INVALID_REQUEST}</li>
     *   <li>{@code VoterSet.updateVoter} returns an empty result: {@code VOTER_NOT_FOUND}</li>
     * </ol>
     *
     * When every check passes the updated voter set is appended through {@code leaderState} and
     * the response carries {@code NONE}. The reply does not wait for the change to commit.
     *
     * @param leaderState the leader state used for the checks, the append and the response
     * @param requestListenerName the listener name passed through to the response builder
     * @param voterKey the replica key of the voter being updated
     * @param voterEndpoints the new endpoints for the voter
     * @param supportedKraftVersions the kraft.version range advertised by the voter
     * @param currentTimeMs the current time in milliseconds
     * @return an already completed future holding the response
     */
    public CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
        LeaderState<?> leaderState,
        ListenerName requestListenerName,
        ReplicaKey voterKey,
        Endpoints voterEndpoints,
        UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions,
        long currentTimeMs
    ) {
        // Reject while another voter change request is still pending
        if (leaderState.isOperationPending(currentTimeMs)) {
            return completedUpdateVoterResponse(Errors.REQUEST_TIMED_OUT, requestListenerName, leaderState);
        }

        // The leader must have established a HWM and committed the current epoch
        Optional<Long> hwmOffset = leaderState.highWatermark().map(LogOffsetMetadata::offset);
        if (hwmOffset.isEmpty()) {
            return completedUpdateVoterResponse(Errors.REQUEST_TIMED_OUT, requestListenerName, leaderState);
        }

        // KAFKA-16538 will implement the case when the kraft.version is 0
        // Until then the cluster must support kraft.version >= 1
        KRaftVersion currentKraftVersion = partitionState.lastKraftVersion();
        if (!currentKraftVersion.isReconfigSupported()) {
            return completedUpdateVoterResponse(Errors.UNSUPPORTED_VERSION, requestListenerName, leaderState);
        }

        // The latest VotersRecord must exist and sit below the HWM, i.e. nothing is uncommitted
        Optional<LogHistory.Entry<VoterSet>> latestVoters = partitionState.lastVoterSetEntry();
        boolean latestVotersCommitted =
            latestVoters.isPresent() && latestVoters.get().offset() < hwmOffset.get();
        if (!latestVotersCommitted) {
            return completedUpdateVoterResponse(Errors.REQUEST_TIMED_OUT, requestListenerName, leaderState);
        }

        // The advertised version range must be valid for the current kraft.version
        if (!validVersionRange(currentKraftVersion, supportedKraftVersions)) {
            return completedUpdateVoterResponse(Errors.INVALID_REQUEST, requestListenerName, leaderState);
        }

        // The endpoints must include the default listener
        if (voterEndpoints.address(defaultListenerName).isEmpty()) {
            return completedUpdateVoterResponse(Errors.INVALID_REQUEST, requestListenerName, leaderState);
        }

        // Build the replacement voter and apply it to the latest voter set
        VoterSet currentVoters = latestVoters.get().value();
        VoterSet.VoterNode replacement = VoterSet.VoterNode.of(
            voterKey,
            voterEndpoints,
            new SupportedVersionRange(
                supportedKraftVersions.minSupportedVersion(),
                supportedKraftVersions.maxSupportedVersion()
            )
        );
        Optional<VoterSet> updatedVoters = currentVoters.updateVoter(replacement);
        if (updatedVoters.isEmpty()) {
            return completedUpdateVoterResponse(Errors.VOTER_NOT_FOUND, requestListenerName, leaderState);
        }

        leaderState.appendVotersRecord(updatedVoters.get(), currentTimeMs);

        // Reply immediately and don't wait for the change to commit
        return completedUpdateVoterResponse(Errors.NONE, requestListenerName, leaderState);
    }

    /**
     * Builds the response for the given error, reporting the local replica and the epoch of
     * {@code leaderState} as the leader, and wraps it in an already completed future.
     */
    private CompletableFuture<UpdateRaftVoterResponseData> completedUpdateVoterResponse(
        Errors error,
        ListenerName requestListenerName,
        LeaderState<?> leaderState
    ) {
        LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(localId, leaderState.epoch());
        return CompletableFuture.completedFuture(
            RaftUtil.updateVoterResponse(
                error,
                requestListenerName,
                leaderAndEpoch,
                leaderState.leaderEndpoints()
            )
        );
    }