in core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala [78:210]
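// Validates that all replicas for the selected topics have the same data.
// Illustrative invocation: bin/kafka-replica-verification.sh --broker-list localhost:9092 --topic-white-list '.*'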
def main(args: Array[String]): Unit = {
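// define the command-line options accepted by the tool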
val parser = new OptionParser(false)
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the servers to connect to.")
.withRequiredArg
.describedAs("hostname:port,...,hostname:port")
.ofType(classOf[String])
val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
.withRequiredArg
.describedAs("bytes")
.ofType(classOf[java.lang.Integer])
.defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES)
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics whose replica consistency should be verified. Defaults to all topics.")
.withRequiredArg
.describedAs("Java regex (String)")
.ofType(classOf[String])
.defaultsTo(".*")
val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1L)
val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(30 * 1000L)
val helpOpt = parser.accepts("help", "Print usage information.").forHelp()
val versionOpt = parser.accepts("version", "Print version information and exit.").forHelp()
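// parse the arguments and handle --help/--version before doing any real work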
val options = parser.parse(args: _*)
if (args.length == 0 || options.has(helpOpt)) {
CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
}
if (options.has(versionOpt)) {
CommandLineUtils.printVersionAndDie()
}
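// --broker-list is required; exit with usage information if it is missing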
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
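// validate the topic white list regex up front and fail fast on an invalid pattern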
val regex = options.valueOf(topicWhiteListOpt)
val topicWhiteListFilter = new Whitelist(regex)
try Pattern.compile(regex)
catch {
case _: PatternSyntaxException =>
throw new RuntimeException(s"$regex is an invalid regex.")
}
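// numeric settings controlling fetch size, fetch wait time, initial offsets and the reporting interval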
val fetchSize = options.valueOf(fetchSizeOpt).intValue
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
val reportInterval = options.valueOf(reportIntervalOpt).longValue
// getting topic metadata
info("Getting topic metadata...")
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser, brokerList)
val (topicsMetadata, brokerInfo) = {
val adminClient = createAdminClient(brokerList)
try (listTopicsMetadata(adminClient), brokerDetails(adminClient))
finally CoreUtils.swallow(adminClient.close(), this)
}
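// keep only the topics matching the white list; internal topics are not excluded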
val filteredTopicMetadata = topicsMetadata.filter { topicMetaData =>
topicWhiteListFilter.isTopicAllowed(topicMetaData.name, excludeInternalTopics = false)
}
if (filteredTopicMetadata.isEmpty) {
error(s"No topics found. $topicWhiteListOpt if specified, is either filtering out all topics or there is no topic.")
Exit.exit(1)
}
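// expand the metadata into one entry per (topic, partition, replica)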
val topicPartitionReplicas = filteredTopicMetadata.flatMap { topicMetadata =>
topicMetadata.partitions.asScala.flatMap { partitionMetadata =>
partitionMetadata.replicas.asScala.map { node =>
TopicPartitionReplica(topic = topicMetadata.name, partitionId = partitionMetadata.partition, replicaId = node.id)
}
}
}
debug(s"Selected topic partitions: $topicPartitionReplicas")
val brokerToTopicPartitions = topicPartitionReplicas.groupBy(_.replicaId).map { case (brokerId, partitions) =>
brokerId -> partitions.map { partition => new TopicPartition(partition.topic, partition.partitionId) }
}
debug(s"Topic partitions per broker: $brokerToTopicPartitions")
val expectedReplicasPerTopicPartition = topicPartitionReplicas.groupBy { replica =>
new TopicPartition(replica.topic, replica.partitionId)
}.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
debug(s"Expected replicas per topic partition: $expectedReplicasPerTopicPartition")
val topicPartitions = filteredTopicMetadata.flatMap { topicMetaData =>
topicMetaData.partitions.asScala.map { partitionMetadata =>
new TopicPartition(topicMetaData.name, partitionMetadata.partition)
}
}
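// consumer settings plus the shared buffer that accumulates fetched data from every replica for comparison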
val consumerProps = consumerConfig(brokerList)
val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
initialOffsets(topicPartitions, consumerProps, initialOffsetTime),
brokerToTopicPartitions.size,
reportInterval)
// create all replica fetcher threads
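// one fetcher per source broker; only the fetcher whose broker matches verificationBrokerId performs the verification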
val verificationBrokerId = brokerToTopicPartitions.head._1
val counter = new AtomicInteger(0)
val fetcherThreads = brokerToTopicPartitions.map { case (brokerId, topicPartitions) =>
new ReplicaFetcher(name = s"ReplicaFetcher-$brokerId",
sourceBroker = brokerInfo(brokerId),
topicPartitions = topicPartitions,
replicaBuffer = replicaBuffer,
socketTimeout = 30000,
socketBufferSize = 256000,
fetchSize = fetchSize,
maxWait = maxWaitMs,
minBytes = 1,
doVerification = brokerId == verificationBrokerId,
consumerProps,
fetcherId = counter.incrementAndGet())
}
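// stop all fetcher threads cleanly when the JVM shuts down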
Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", {
info("Stopping all fetchers")
fetcherThreads.foreach(_.shutdown())
})
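// start fetching; verification results are reported every reportInterval ms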
fetcherThreads.foreach(_.start())
println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.")
}