public ControllerResult registerBroker()

in metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java [337:463]


    public ControllerResult<BrokerRegistrationReply> registerBroker(
        BrokerRegistrationRequestData request,
        long newBrokerEpoch,
        FinalizedControllerFeatures finalizedFeatures,
        boolean cleanShutdownDetectionEnabled
    ) {
        if (heartbeatManager == null) {
            throw new RuntimeException("ClusterControlManager is not active.");
        }
        if (!clusterId.equals(request.clusterId())) {
            throw new InconsistentClusterIdException("Expected cluster ID " + clusterId +
                ", but got cluster ID " + request.clusterId());
        }
        int brokerId = request.brokerId();
        List<ApiMessageAndVersion> records = new ArrayList<>();
        BrokerRegistration existing = brokerRegistrations.get(brokerId);
        Uuid prevIncarnationId = null;
        long storedBrokerEpoch = -2; // BrokerRegistration.previousBrokerEpoch default value is -1
        if (existing != null) {
            prevIncarnationId = existing.incarnationId();
            storedBrokerEpoch = existing.epoch();
            if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
                if (!request.incarnationId().equals(prevIncarnationId)) {
                    throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id. If the broker " +
                            "was recently restarted this should self-resolve once the heartbeat manager expires the broker's session.");
                }
            }
        }

        if (request.isMigratingZkBroker()) {
            throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
        }

        if (featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
            if (request.logDirs().isEmpty()) {
                throw new InvalidRegistrationException("No directories specified in request");
            }
            if (request.logDirs().stream().anyMatch(DirectoryId::reserved)) {
                throw new InvalidRegistrationException("Reserved directory ID in request");
            }
            Set<Uuid> set = new HashSet<>(request.logDirs());
            if (set.size() != request.logDirs().size()) {
                throw new InvalidRegistrationException("Duplicate directory ID in request");
            }
            for (Uuid directory : request.logDirs()) {
                Integer dirBrokerId = directoryToBroker.get(directory);
                if (dirBrokerId != null && dirBrokerId != brokerId) {
                    throw new InvalidRegistrationException("Broker " + dirBrokerId +
                            " is already registered with directory " + directory);
                }
            }
        }

        ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRequest(request.listeners());
        RegisterBrokerRecord record = new RegisterBrokerRecord().
            setBrokerId(brokerId).
            setIsMigratingZkBroker(request.isMigratingZkBroker()).
            setIncarnationId(request.incarnationId()).
            setRack(request.rack()).
            setEndPoints(listenerInfo.toBrokerRegistrationRecord());

        // Track which finalized features we have not yet verified are supported by the broker.
        Map<String, Short> unverifiedFeatures = new HashMap<>(finalizedFeatures.featureMap());

        // Check every broker feature version range includes the finalized version.
        for (BrokerRegistrationRequestData.Feature feature : request.features()) {
            record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
            unverifiedFeatures.remove(feature.name());
        }

        if (request.features().find(MetadataVersion.FEATURE_NAME) == null)
            throw new InvalidRegistrationException("Request features do not contain '" + MetadataVersion.FEATURE_NAME + "'");

        // We also need to check every controller feature is supported by the broker.
        unverifiedFeatures.forEach((featureName, finalizedVersion) -> {
            if (finalizedVersion != 0 && request.features().findAll(featureName).isEmpty()) {
                processRegistrationFeature(brokerId, finalizedFeatures,
                    new BrokerRegistrationRequestData.Feature().
                        setName(featureName).
                        setMinSupportedVersion((short) 0).
                        setMaxSupportedVersion((short) 0));
            }
        });
        if (featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
            record.setLogDirs(request.logDirs());
        }

        if (!request.incarnationId().equals(prevIncarnationId)) {
            int prevNumRecords = records.size();
            boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
                storedBrokerEpoch == request.previousBrokerEpoch() : false;
            brokerShutdownHandler.addRecordsForShutdown(request.brokerId(), isCleanShutdown, records);
            int numRecordsAdded = records.size() - prevNumRecords;
            if (existing == null) {
                log.info("No previous registration found for broker {}. New incarnation ID is " +
                        "{}.  Generated {} record(s) to clean up previous incarnations. New broker " +
                        "epoch is {}.", brokerId, request.incarnationId(), numRecordsAdded, newBrokerEpoch);
            } else {
                log.info("Registering a new incarnation of broker {}. Previous incarnation ID " +
                        "was {}; new incarnation ID is {}. Generated {} record(s) to clean up " +
                        "previous incarnations. Broker epoch will become {}.", brokerId,
                        existing.incarnationId(), request.incarnationId(), numRecordsAdded,
                        newBrokerEpoch);
            }
            record.setBrokerEpoch(newBrokerEpoch);
        } else {
            log.info("Amending registration of broker {}, incarnation ID {}. Broker epoch remains {}.",
                    request.brokerId(), request.incarnationId(), existing.epoch());
            record.setFenced(existing.fenced());
            record.setInControlledShutdown(existing.inControlledShutdown());
            record.setBrokerEpoch(existing.epoch());
        }
        records.add(new ApiMessageAndVersion(record, featureControl.metadataVersionOrThrow().
            registerBrokerRecordVersion()));

        if (!request.incarnationId().equals(prevIncarnationId)) {
            // Remove any existing session for the old broker incarnation.
            heartbeatManager.remove(brokerId);
        }
        heartbeatManager.register(brokerId, record.fenced());

        // A broker registration that cleans up a previous incarnation's unclean shutdown may generate a large number of records.
        // It is safe to return these records as a non-atomic batch as long as the registration record is added last.
        // This ensures that in case of a controller failure, the broker will re-register and the new controller
        // can retry the unclean shutdown cleanup.
        return ControllerResult.of(records, new BrokerRegistrationReply(record.brokerEpoch()));
    }