in core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala [67:196]
def main(args: Array[String]): Unit = {
val parser = new OptionParser(false)
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
.withRequiredArg
.describedAs("hostname:port,...,hostname:port")
.ofType(classOf[String])
val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
.withRequiredArg
.describedAs("bytes")
.ofType(classOf[java.lang.Integer])
.defaultsTo(ConsumerConfig.FetchSize)
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.")
.withRequiredArg
.describedAs("Java regex (String)")
.ofType(classOf[String])
.defaultsTo(".*")
val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1L)
val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(30 * 1000L)
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
val options = parser.parse(args : _*)
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
val regex = options.valueOf(topicWhiteListOpt)
val topicWhiteListFiler = new Whitelist(regex)
try {
Pattern.compile(regex)
}
catch {
case _: PatternSyntaxException =>
throw new RuntimeException(regex + " is an invalid regex.")
}
val fetchSize = options.valueOf(fetchSizeOpt).intValue
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
val reportInterval = options.valueOf(reportIntervalOpt).longValue
// getting topic metadata
info("Getting topic metadata...")
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser,brokerList)
val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
true
else
false
)
if (filteredTopicMetadata.isEmpty) {
error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.")
Exit.exit(1)
}
val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
topicMetadataResponse =>
topicMetadataResponse.partitionsMetadata.flatMap(
partitionMetadata =>
partitionMetadata.replicas.map(broker =>
TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id))
)
)
debug("Selected topic partitions: " + topicPartitionReplicaList)
val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId)
.map { case (brokerId, partitions) =>
brokerId -> partitions.map { partition => TopicAndPartition(partition.topic, partition.partitionId) } }
debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] =
topicPartitionReplicaList.groupBy(replica => TopicAndPartition(replica.topic, replica.partitionId))
.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
(TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
}
}.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
topicAndPartition
})
debug("Leaders per broker: " + leadersPerBroker)
val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
leadersPerBroker,
topicAndPartitionsPerBroker.size,
brokerMap,
initialOffsetTime,
reportInterval)
// create all replica fetcher threads
val verificationBrokerId = topicAndPartitionsPerBroker.head._1
val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map {
case (brokerId, topicAndPartitions) =>
new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
sourceBroker = brokerMap(brokerId),
topicAndPartitions = topicAndPartitions,
replicaBuffer = replicaBuffer,
socketTimeout = 30000,
socketBufferSize = 256000,
fetchSize = fetchSize,
maxWait = maxWaitMs,
minBytes = 1,
doVerification = brokerId == verificationBrokerId)
}
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
info("Stopping all fetchers")
fetcherThreads.foreach(_.shutdown())
}
})
fetcherThreads.foreach(_.start())
println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.")
}