in core/src/main/scala/kafka/admin/TopicCommand.scala [678:761]
def has(builder: OptionSpec[_]): Boolean = options.has(builder)
def valueAsOption[A](option: OptionSpec[A], defaultValue: Option[A] = None): Option[A] = if (has(option)) Some(options.valueOf(option)) else defaultValue
def valuesAsOption[A](option: OptionSpec[A], defaultValue: Option[util.List[A]] = None): Option[util.List[A]] = if (has(option)) Some(options.valuesOf(option)) else defaultValue
def hasCreateOption: Boolean = has(createOpt)
def hasAlterOption: Boolean = has(alterOpt)
def hasListOption: Boolean = has(listOpt)
def hasDescribeOption: Boolean = has(describeOpt)
def hasDeleteOption: Boolean = has(deleteOpt)
def zkConnect: Option[String] = valueAsOption(zkConnectOpt)
def bootstrapServer: Option[String] = valueAsOption(bootstrapServerOpt)
def commandConfig: Properties = if (has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties()
def topic: Option[String] = valueAsOption(topicOpt)
def partitions: Option[Integer] = valueAsOption(partitionsOpt)
def replicationFactor: Option[Integer] = valueAsOption(replicationFactorOpt)
def replicaAssignment: Option[Map[Int, List[Int]]] =
if (has(replicaAssignmentOpt) && !Option(options.valueOf(replicaAssignmentOpt)).getOrElse("").isEmpty)
Some(parseReplicaAssignment(options.valueOf(replicaAssignmentOpt)))
else
None
def rackAwareMode: RackAwareMode = if (has(disableRackAware)) RackAwareMode.Disabled else RackAwareMode.Enforced
def reportUnderReplicatedPartitions: Boolean = has(reportUnderReplicatedPartitionsOpt)
def reportUnavailablePartitions: Boolean = has(reportUnavailablePartitionsOpt)
def reportUnderMinIsrPartitions: Boolean = has(reportUnderMinIsrPartitionsOpt)
def reportAtMinIsrPartitions: Boolean = has(reportAtMinIsrPartitionsOpt)
def reportOverriddenConfigs: Boolean = has(topicsWithOverridesOpt)
def ifExists: Boolean = has(ifExistsOpt)
def ifNotExists: Boolean = has(ifNotExistsOpt)
def excludeInternalTopics: Boolean = has(excludeInternalTopicOpt)
def topicConfig: Option[util.List[String]] = valuesAsOption(configOpt)
def configsToDelete: Option[util.List[String]] = valuesAsOption(deleteConfigOpt)
def checkArgs(): Unit = {
if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe, or change a topic.")
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to create, delete, describe, or change a topic.")
// should have exactly one action
val actions = Seq(createOpt, listOpt, alterOpt, describeOpt, deleteOpt).count(options.has)
if (actions != 1)
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
// check required args
if (has(bootstrapServerOpt) == has(zkConnectOpt))
throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")
if (!has(bootstrapServerOpt))
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
if(has(describeOpt) && has(ifExistsOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (!has(listOpt) && !has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (has(createOpt) && !has(replicaAssignmentOpt) && has(zkConnectOpt))
CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt, replicationFactorOpt)
if (has(bootstrapServerOpt) && has(alterOpt)) {
CommandLineUtils.checkInvalidArgsSet(parser, options, Set(bootstrapServerOpt, configOpt), Set(alterOpt),
Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt)
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt))
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
if(options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
}