in metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java [706:851]
private ApiError createTopic(ControllerRequestContext context,
CreatableTopic topic,
List<ApiMessageAndVersion> records,
Map<String, CreatableTopicResult> successes,
List<ApiMessageAndVersion> configRecords,
boolean authorizedToReturnConfigs) {
Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
Map<Integer, PartitionRegistration> newParts = new HashMap<>();
if (!topic.assignments().isEmpty()) {
if (topic.replicationFactor() != -1) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but replication " +
"factor was not set to -1.");
}
if (topic.numPartitions() != -1) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
OptionalInt replicationFactor = OptionalInt.empty();
for (CreatableReplicaAssignment assignment : topic.assignments()) {
if (newParts.containsKey(assignment.partitionIndex())) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"Found multiple manual partition assignments for partition " +
assignment.partitionIndex());
}
PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber);
validateManualPartitionAssignment(partitionAssignment, replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
List<Integer> isr = assignment.brokerIds().stream().
filter(clusterControl::isActive).toList();
if (isr.isEmpty()) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"All brokers specified in the manual partition assignment for " +
"partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
}
newParts.put(
assignment.partitionIndex(),
buildPartitionRegistration(partitionAssignment, isr)
);
}
for (int i = 0; i < newParts.size(); i++) {
if (!newParts.containsKey(i)) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"partitions should be a consecutive 0-based integer sequence");
}
}
ApiError error = maybeCheckCreateTopicPolicy(() -> {
Map<Integer, List<Integer>> assignments = new HashMap<>();
newParts.forEach((key, value) -> assignments.put(key, Replicas.toList(value.replicas)));
return new CreateTopicPolicy.RequestMetadata(
topic.name(), null, null, assignments, creationConfigs);
});
if (error.isFailure()) return error;
} else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Replication factor must be larger than 0, or -1 to use the default value.");
} else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
return new ApiError(Errors.INVALID_PARTITIONS,
"Number of partitions was set to an invalid non-positive value.");
} else {
int numPartitions = topic.numPartitions() == -1 ?
defaultNumPartitions : topic.numPartitions();
short replicationFactor = topic.replicationFactor() == -1 ?
defaultReplicationFactor : topic.replicationFactor();
try {
TopicAssignment topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
0,
numPartitions,
replicationFactor
), clusterDescriber);
for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) {
PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
List<Integer> isr = partitionAssignment.replicas().stream().
filter(clusterControl::isActive).toList();
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica placer,
// we reject the create topic request with INVALID_REPLICATION_FACTOR.
if (isr.isEmpty()) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Unable to replicate the partition " + replicationFactor +
" time(s): All brokers are currently fenced or in controlled shutdown.");
}
newParts.put(
partitionId,
buildPartitionRegistration(partitionAssignment, isr)
);
}
} catch (InvalidReplicationFactorException e) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Unable to replicate the partition " + replicationFactor +
" time(s): " + e.getMessage());
}
ApiError error = maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(
topic.name(), numPartitions, replicationFactor, null, creationConfigs));
if (error.isFailure()) return error;
}
int numPartitions = newParts.size();
try {
context.applyPartitionChangeQuota(numPartitions); // check controller mutation quota
} catch (ThrottlingQuotaExceededException e) {
log.debug("Topic creation of {} partitions not allowed because quota is violated. Delay time: {}",
numPartitions, e.throttleTimeMs());
return ApiError.fromThrowable(e);
}
Uuid topicId = Uuid.randomUuid();
CreatableTopicResult result = new CreatableTopicResult().
setName(topic.name()).
setTopicId(topicId).
setErrorCode(NONE.code()).
setErrorMessage(null);
if (authorizedToReturnConfigs) {
Map<String, ConfigEntry> effectiveConfig = configurationControl.
computeEffectiveTopicConfigs(creationConfigs);
List<String> configNames = new ArrayList<>(effectiveConfig.keySet());
configNames.sort(String::compareTo);
for (String configName : configNames) {
ConfigEntry entry = effectiveConfig.get(configName);
result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs().
setName(entry.name()).
setValue(entry.isSensitive() ? null : entry.value()).
setReadOnly(entry.isReadOnly()).
setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).
setIsSensitive(entry.isSensitive()));
}
result.setNumPartitions(numPartitions);
result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length);
result.setTopicConfigErrorCode(NONE.code());
} else {
result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());
}
successes.put(topic.name(), result);
records.add(new ApiMessageAndVersion(new TopicRecord().
setName(topic.name()).
setTopicId(topicId), (short) 0));
// ConfigRecords go after TopicRecord but before PartitionRecord(s).
records.addAll(configRecords);
for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()).
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
build()));
}
return ApiError.NONE;
}