in raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java [75:219]
/**
 * Handles an UpdateRaftVoter request received by the leader.
 *
 * <p>The checks below run in order and the first one that fails determines the error code of
 * the response:
 * <ol>
 *   <li>{@code REQUEST_TIMED_OUT} when {@code leaderState.isOperationPending(currentTimeMs)}
 *       reports a pending operation.</li>
 *   <li>{@code REQUEST_TIMED_OUT} when the leader state has no high-watermark.</li>
 *   <li>{@code UNSUPPORTED_VERSION} when the latest kraft.version does not support
 *       reconfiguration.</li>
 *   <li>{@code REQUEST_TIMED_OUT} when there is no voter set entry, or the offset of the latest
 *       voter set entry is greater than or equal to the high-watermark.</li>
 *   <li>{@code INVALID_REQUEST} when {@code validVersionRange} rejects the supported
 *       kraft.version range.</li>
 *   <li>{@code INVALID_REQUEST} when the voter's endpoints have no address for the default
 *       listener.</li>
 *   <li>{@code VOTER_NOT_FOUND} when {@code VoterSet.updateVoter} returns an empty result.</li>
 * </ol>
 *
 * <p>When every check passes, the updated voter set is handed to
 * {@code leaderState.appendVotersRecord} and a {@code NONE} response is returned immediately,
 * without waiting for the change to commit.
 *
 * @param leaderState the leader state of this replica
 * @param requestListenerName the listener name passed through to the response builder
 * @param voterKey the key of the voter being updated
 * @param voterEndpoints the new endpoints of the voter
 * @param supportedKraftVersions the kraft.version range supported by the voter
 * @param currentTimeMs the current time in milliseconds
 * @return an already-completed future holding the response
 */
public CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
    LeaderState<?> leaderState,
    ListenerName requestListenerName,
    ReplicaKey voterKey,
    Endpoints voterEndpoints,
    UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions,
    long currentTimeMs
) {
    // Check if there are any pending voter change requests
    if (leaderState.isOperationPending(currentTimeMs)) {
        return completedUpdateVoterResponse(
            Errors.REQUEST_TIMED_OUT,
            requestListenerName,
            leaderState
        );
    }

    // Check that the leader has established a HWM and committed the current epoch
    Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset);
    if (highWatermark.isEmpty()) {
        return completedUpdateVoterResponse(
            Errors.REQUEST_TIMED_OUT,
            requestListenerName,
            leaderState
        );
    }

    // KAFKA-16538 will implement the case when the kraft.version is 0
    // Check that the cluster supports kraft.version >= 1
    KRaftVersion kraftVersion = partitionState.lastKraftVersion();
    if (!kraftVersion.isReconfigSupported()) {
        return completedUpdateVoterResponse(
            Errors.UNSUPPORTED_VERSION,
            requestListenerName,
            leaderState
        );
    }

    // Check that there are no uncommitted VotersRecord
    Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
    if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
        return completedUpdateVoterResponse(
            Errors.REQUEST_TIMED_OUT,
            requestListenerName,
            leaderState
        );
    }

    // Check that the supported version range is valid
    if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
        return completedUpdateVoterResponse(
            Errors.INVALID_REQUEST,
            requestListenerName,
            leaderState
        );
    }

    // Check that endpoints includes the default listener
    if (voterEndpoints.address(defaultListenerName).isEmpty()) {
        return completedUpdateVoterResponse(
            Errors.INVALID_REQUEST,
            requestListenerName,
            leaderState
        );
    }

    // Update the voter
    Optional<VoterSet> updatedVoters = votersEntry
        .get()
        .value()
        .updateVoter(
            VoterSet.VoterNode.of(
                voterKey,
                voterEndpoints,
                new SupportedVersionRange(
                    supportedKraftVersions.minSupportedVersion(),
                    supportedKraftVersions.maxSupportedVersion()
                )
            )
        );
    if (updatedVoters.isEmpty()) {
        return completedUpdateVoterResponse(
            Errors.VOTER_NOT_FOUND,
            requestListenerName,
            leaderState
        );
    }

    leaderState.appendVotersRecord(updatedVoters.get(), currentTimeMs);

    // Reply immediately and don't wait for the change to commit
    return completedUpdateVoterResponse(Errors.NONE, requestListenerName, leaderState);
}

/**
 * Builds an already-completed future holding an UpdateRaftVoter response for the given error.
 *
 * <p>The leader and epoch in the response are built from {@code localId} and
 * {@code leaderState.epoch()}, and the endpoints from {@code leaderState.leaderEndpoints()};
 * both are read when this method is called.
 *
 * @param error the error code to report in the response
 * @param requestListenerName the listener name passed through to the response builder
 * @param leaderState the leader state that supplies the epoch and the leader endpoints
 * @return an already-completed future holding the response
 */
private CompletableFuture<UpdateRaftVoterResponseData> completedUpdateVoterResponse(
    Errors error,
    ListenerName requestListenerName,
    LeaderState<?> leaderState
) {
    return CompletableFuture.completedFuture(
        RaftUtil.updateVoterResponse(
            error,
            requestListenerName,
            new LeaderAndEpoch(
                localId,
                leaderState.epoch()
            ),
            leaderState.leaderEndpoints()
        )
    );
}