in raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java [75:227]
public CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
LeaderState<?> leaderState,
ListenerName requestListenerName,
ReplicaKey voterKey,
Endpoints voterEndpoints,
UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions,
long currentTimeMs
) {
// Check if there are any pending voter change requests
if (leaderState.isOperationPending(currentTimeMs)) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.REQUEST_TIMED_OUT,
requestListenerName,
new LeaderAndEpoch(
localId,
leaderState.epoch()
),
leaderState.leaderEndpoints()
)
);
}
// Check that the leader has established a HWM and committed the current epoch
Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset);
if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.REQUEST_TIMED_OUT,
requestListenerName,
new LeaderAndEpoch(
localId,
leaderState.epoch()
),
leaderState.leaderEndpoints()
)
);
}
// Read the voter set from the log or leader state
KRaftVersion kraftVersion = partitionState.lastKraftVersion();
final Optional<KRaftVersionUpgrade.Voters> inMemoryVoters;
final Optional<VoterSet> voters;
if (kraftVersion.isReconfigSupported()) {
inMemoryVoters = Optional.empty();
// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
voters = Optional.empty();
} else {
voters = votersEntry.map(LogHistory.Entry::value);
}
} else {
inMemoryVoters = leaderState.volatileVoters();
if (inMemoryVoters.isEmpty()) {
/* This can happen if the remote voter sends an update voter request before the
* updated kraft version has been written to the log
*/
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.REQUEST_TIMED_OUT,
requestListenerName,
new LeaderAndEpoch(
localId,
leaderState.epoch()
),
leaderState.leaderEndpoints()
)
);
}
voters = inMemoryVoters.map(KRaftVersionUpgrade.Voters::voters);
}
if (voters.isEmpty()) {
log.info("Unable to read the current voter set with kraft version {}", kraftVersion);
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.REQUEST_TIMED_OUT,
requestListenerName,
new LeaderAndEpoch(
localId,
leaderState.epoch()
),
leaderState.leaderEndpoints()
)
);
}
// Check that the supported version range is valid
if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
requestListenerName,
new LeaderAndEpoch(
localId,
leaderState.epoch()
),
leaderState.leaderEndpoints()
)
);
}
// Check that endpoints includes the default listener
if (voterEndpoints.address(defaultListenerName).isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
requestListenerName,
new LeaderAndEpoch(
localId,
leaderState.epoch()
),
leaderState.leaderEndpoints()
)
);
}
// Update the voter
Optional<VoterSet> updatedVoters = updateVoters(
voters.get(),
kraftVersion,
VoterSet.VoterNode.of(
voterKey,
voterEndpoints,
new SupportedVersionRange(
supportedKraftVersions.minSupportedVersion(),
supportedKraftVersions.maxSupportedVersion()
)
)
);
if (updatedVoters.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.VOTER_NOT_FOUND,
requestListenerName,
new LeaderAndEpoch(
localId,
leaderState.epoch()
),
leaderState.leaderEndpoints()
)
);
}
return storeUpdatedVoters(
leaderState,
voterKey,
inMemoryVoters,
updatedVoters.get(),
requestListenerName,
currentTimeMs
);
}