private ApiError createTopic()

in metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java [706:851]


    private ApiError createTopic(ControllerRequestContext context,
                                 CreatableTopic topic,
                                 List<ApiMessageAndVersion> records,
                                 Map<String, CreatableTopicResult> successes,
                                 List<ApiMessageAndVersion> configRecords,
                                 boolean authorizedToReturnConfigs) {
        Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
        Map<Integer, PartitionRegistration> newParts = new HashMap<>();
        if (!topic.assignments().isEmpty()) {
            if (topic.replicationFactor() != -1) {
                return new ApiError(INVALID_REQUEST,
                    "A manual partition assignment was specified, but replication " +
                    "factor was not set to -1.");
            }
            if (topic.numPartitions() != -1) {
                return new ApiError(INVALID_REQUEST,
                    "A manual partition assignment was specified, but numPartitions " +
                        "was not set to -1.");
            }
            OptionalInt replicationFactor = OptionalInt.empty();
            for (CreatableReplicaAssignment assignment : topic.assignments()) {
                if (newParts.containsKey(assignment.partitionIndex())) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                        "Found multiple manual partition assignments for partition " +
                            assignment.partitionIndex());
                }
                PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber);
                validateManualPartitionAssignment(partitionAssignment, replicationFactor);
                replicationFactor = OptionalInt.of(assignment.brokerIds().size());
                List<Integer> isr = assignment.brokerIds().stream().
                    filter(clusterControl::isActive).toList();
                if (isr.isEmpty()) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                        "All brokers specified in the manual partition assignment for " +
                        "partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
                }
                newParts.put(
                    assignment.partitionIndex(),
                    buildPartitionRegistration(partitionAssignment, isr)
                );
            }
            for (int i = 0; i < newParts.size(); i++) {
                if (!newParts.containsKey(i)) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                            "partitions should be a consecutive 0-based integer sequence");
                }
            }
            ApiError error = maybeCheckCreateTopicPolicy(() -> {
                Map<Integer, List<Integer>> assignments = new HashMap<>();
                newParts.forEach((key, value) -> assignments.put(key, Replicas.toList(value.replicas)));
                return new CreateTopicPolicy.RequestMetadata(
                    topic.name(), null, null, assignments, creationConfigs);
            });
            if (error.isFailure()) return error;
        } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
            return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                "Replication factor must be larger than 0, or -1 to use the default value.");
        } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
            return new ApiError(Errors.INVALID_PARTITIONS,
                "Number of partitions was set to an invalid non-positive value.");
        } else {
            int numPartitions = topic.numPartitions() == -1 ?
                defaultNumPartitions : topic.numPartitions();
            short replicationFactor = topic.replicationFactor() == -1 ?
                defaultReplicationFactor : topic.replicationFactor();
            try {
                TopicAssignment topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
                    0,
                    numPartitions,
                    replicationFactor
                ), clusterDescriber);
                for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) {
                    PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
                    List<Integer> isr = partitionAssignment.replicas().stream().
                        filter(clusterControl::isActive).toList();
                    // If the ISR is empty, it means that all brokers are fenced or
                    // in controlled shutdown. To be consistent with the replica placer,
                    // we reject the create topic request with INVALID_REPLICATION_FACTOR.
                    if (isr.isEmpty()) {
                        return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                            "Unable to replicate the partition " + replicationFactor +
                                " time(s): All brokers are currently fenced or in controlled shutdown.");
                    }
                    newParts.put(
                        partitionId,
                        buildPartitionRegistration(partitionAssignment, isr)
                    );
                }
            } catch (InvalidReplicationFactorException e) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                    "Unable to replicate the partition " + replicationFactor +
                        " time(s): " + e.getMessage());
            }
            ApiError error = maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(
                topic.name(), numPartitions, replicationFactor, null, creationConfigs));
            if (error.isFailure()) return error;
        }
        int numPartitions = newParts.size();
        try {
            context.applyPartitionChangeQuota(numPartitions); // check controller mutation quota
        } catch (ThrottlingQuotaExceededException e) {
            log.debug("Topic creation of {} partitions not allowed because quota is violated. Delay time: {}",
                numPartitions, e.throttleTimeMs());
            return ApiError.fromThrowable(e);
        }
        Uuid topicId = Uuid.randomUuid();
        CreatableTopicResult result = new CreatableTopicResult().
            setName(topic.name()).
            setTopicId(topicId).
            setErrorCode(NONE.code()).
            setErrorMessage(null);
        if (authorizedToReturnConfigs) {
            Map<String, ConfigEntry> effectiveConfig = configurationControl.
                computeEffectiveTopicConfigs(creationConfigs);
            List<String> configNames = new ArrayList<>(effectiveConfig.keySet());
            configNames.sort(String::compareTo);
            for (String configName : configNames) {
                ConfigEntry entry = effectiveConfig.get(configName);
                result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs().
                    setName(entry.name()).
                    setValue(entry.isSensitive() ? null : entry.value()).
                    setReadOnly(entry.isReadOnly()).
                    setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).
                    setIsSensitive(entry.isSensitive()));
            }
            result.setNumPartitions(numPartitions);
            result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length);
            result.setTopicConfigErrorCode(NONE.code());
        } else {
            result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());
        }
        successes.put(topic.name(), result);
        records.add(new ApiMessageAndVersion(new TopicRecord().
            setName(topic.name()).
            setTopicId(topicId), (short) 0));
        // ConfigRecords go after TopicRecord but before PartitionRecord(s).
        records.addAll(configRecords);
        for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
            int partitionIndex = partEntry.getKey();
            PartitionRegistration info = partEntry.getValue();
            records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()).
                setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
                build()));
        }
        return ApiError.NONE;
    }