in metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java [337:463]
/**
 * Validates a broker registration request and produces the metadata records needed to
 * apply it.
 *
 * <p>The request is rejected if the cluster ID does not match, if a different incarnation
 * of the same broker ID still holds a valid heartbeat session, if it comes from a migrating
 * ZK broker, if its log directories are invalid (only checked when the current metadata
 * version supports directory assignment), or if its feature ranges are not acceptable to
 * {@code processRegistrationFeature}.
 *
 * <p>When the incarnation ID differs from the stored one (or nothing is stored), the
 * shutdown handler is asked to emit cleanup records for the previous incarnation and the
 * broker receives {@code newBrokerEpoch}. When the incarnation ID is unchanged, the
 * existing registration is amended and keeps its epoch, fenced state and
 * controlled-shutdown state.
 *
 * @param request                       the registration request sent by the broker
 * @param newBrokerEpoch                the epoch to assign if this is a new incarnation
 * @param finalizedFeatures             the finalized feature levels the broker must support
 * @param cleanShutdownDetectionEnabled whether the request's previous broker epoch may be
 *                                      compared with the stored epoch to detect a clean shutdown
 * @return the generated records (registration record last) and a reply carrying the
 *         broker epoch written into the registration record
 * @throws RuntimeException                     if there is no heartbeat manager (manager not active)
 * @throws InconsistentClusterIdException       if the request's cluster ID differs from ours
 * @throws DuplicateBrokerRegistrationException if another incarnation still has a valid session
 * @throws BrokerIdNotRegisteredException       if the request is from a migrating ZK broker
 * @throws InvalidRegistrationException         if the log directories or features are invalid
 */
public ControllerResult<BrokerRegistrationReply> registerBroker(
    BrokerRegistrationRequestData request,
    long newBrokerEpoch,
    FinalizedControllerFeatures finalizedFeatures,
    boolean cleanShutdownDetectionEnabled
) {
    if (heartbeatManager == null) {
        throw new RuntimeException("ClusterControlManager is not active.");
    }
    if (!clusterId.equals(request.clusterId())) {
        throw new InconsistentClusterIdException("Expected cluster ID " + clusterId +
            ", but got cluster ID " + request.clusterId());
    }

    final int brokerId = request.brokerId();
    final List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
    final BrokerRegistration priorRegistration = brokerRegistrations.get(brokerId);
    final Uuid priorIncarnationId =
        priorRegistration == null ? null : priorRegistration.incarnationId();
    // -2 is used when nothing is stored; per the original note, the default
    // previousBrokerEpoch value is -1, so -2 differs from that default.
    final long priorEpoch = priorRegistration == null ? -2 : priorRegistration.epoch();

    // A live session held by a different incarnation blocks the registration.
    if (priorRegistration != null
            && heartbeatManager.hasValidSession(brokerId, priorEpoch)
            && !request.incarnationId().equals(priorIncarnationId)) {
        throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id. If the broker " +
            "was recently restarted this should self-resolve once the heartbeat manager expires the broker's session.");
    }

    if (request.isMigratingZkBroker()) {
        throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
    }

    if (featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
        if (request.logDirs().isEmpty()) {
            throw new InvalidRegistrationException("No directories specified in request");
        }
        for (Uuid candidate : request.logDirs()) {
            if (DirectoryId.reserved(candidate)) {
                throw new InvalidRegistrationException("Reserved directory ID in request");
            }
        }
        Set<Uuid> distinctDirs = new HashSet<>(request.logDirs());
        if (distinctDirs.size() != request.logDirs().size()) {
            throw new InvalidRegistrationException("Duplicate directory ID in request");
        }
        // A directory ID may only ever belong to one broker.
        for (Uuid directory : request.logDirs()) {
            Integer owner = directoryToBroker.get(directory);
            if (owner != null && owner != brokerId) {
                throw new InvalidRegistrationException("Broker " + owner +
                    " is already registered with directory " + directory);
            }
        }
    }

    ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRequest(request.listeners());
    RegisterBrokerRecord registration = new RegisterBrokerRecord();
    registration.setBrokerId(brokerId);
    registration.setIsMigratingZkBroker(request.isMigratingZkBroker());
    registration.setIncarnationId(request.incarnationId());
    registration.setRack(request.rack());
    registration.setEndPoints(listenerInfo.toBrokerRegistrationRecord());

    // Finalized features that the request has not (yet) been seen to mention.
    Map<String, Short> pendingFeatures = new HashMap<>(finalizedFeatures.featureMap());
    // Every feature range advertised by the broker is checked and copied into the record.
    for (BrokerRegistrationRequestData.Feature advertised : request.features()) {
        registration.features().add(processRegistrationFeature(brokerId, finalizedFeatures, advertised));
        pendingFeatures.remove(advertised.name());
    }
    if (request.features().find(MetadataVersion.FEATURE_NAME) == null) {
        throw new InvalidRegistrationException("Request features do not contain '" + MetadataVersion.FEATURE_NAME + "'");
    }
    // Finalized features at a non-zero level that the broker did not advertise are checked
    // as though the broker had advertised the range [0, 0].
    for (Map.Entry<String, Short> pending : pendingFeatures.entrySet()) {
        String featureName = pending.getKey();
        if (pending.getValue() != 0 && request.features().findAll(featureName).isEmpty()) {
            BrokerRegistrationRequestData.Feature implied = new BrokerRegistrationRequestData.Feature();
            implied.setName(featureName);
            implied.setMinSupportedVersion((short) 0);
            implied.setMaxSupportedVersion((short) 0);
            processRegistrationFeature(brokerId, finalizedFeatures, implied);
        }
    }

    if (featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
        registration.setLogDirs(request.logDirs());
    }

    final boolean newIncarnation = !request.incarnationId().equals(priorIncarnationId);
    if (newIncarnation) {
        final int recordsBefore = outputRecords.size();
        final boolean cleanShutdown = cleanShutdownDetectionEnabled
            && priorEpoch == request.previousBrokerEpoch();
        brokerShutdownHandler.addRecordsForShutdown(brokerId, cleanShutdown, outputRecords);
        final int cleanupRecordCount = outputRecords.size() - recordsBefore;
        if (priorRegistration != null) {
            log.info("Registering a new incarnation of broker {}. Previous incarnation ID " +
                "was {}; new incarnation ID is {}. Generated {} record(s) to clean up " +
                "previous incarnations. Broker epoch will become {}.", brokerId,
                priorIncarnationId, request.incarnationId(), cleanupRecordCount,
                newBrokerEpoch);
        } else {
            log.info("No previous registration found for broker {}. New incarnation ID is " +
                "{}. Generated {} record(s) to clean up previous incarnations. New broker " +
                "epoch is {}.", brokerId, request.incarnationId(), cleanupRecordCount, newBrokerEpoch);
        }
        registration.setBrokerEpoch(newBrokerEpoch);
    } else {
        // Same incarnation: carry the stored state over into the amended record.
        log.info("Amending registration of broker {}, incarnation ID {}. Broker epoch remains {}.",
            brokerId, request.incarnationId(), priorEpoch);
        registration.setFenced(priorRegistration.fenced());
        registration.setInControlledShutdown(priorRegistration.inControlledShutdown());
        registration.setBrokerEpoch(priorEpoch);
    }

    short registrationVersion = featureControl.metadataVersionOrThrow().registerBrokerRecordVersion();
    outputRecords.add(new ApiMessageAndVersion(registration, registrationVersion));

    if (newIncarnation) {
        // Drop whatever session the old incarnation still had.
        heartbeatManager.remove(brokerId);
    }
    heartbeatManager.register(brokerId, registration.fenced());

    // Cleaning up after a previous incarnation's unclean shutdown can produce many records.
    // Returning them as a non-atomic batch is safe provided the registration record comes
    // last: should the controller fail part-way, the broker re-registers and the new
    // controller can retry the unclean shutdown cleanup.
    return ControllerResult.of(outputRecords, new BrokerRegistrationReply(registration.brokerEpoch()));
}