def main()

in core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala [67:196]


  def main(args: Array[String]): Unit = {
    val parser = new OptionParser(false)
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
                         .withRequiredArg
                         .describedAs("hostname:port,...,hostname:port")
                         .ofType(classOf[String])
    val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
                         .withRequiredArg
                         .describedAs("bytes")
                         .ofType(classOf[java.lang.Integer])
                         .defaultsTo(ConsumerConfig.FetchSize)
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
                         .withRequiredArg
                         .describedAs("ms")
                         .ofType(classOf[java.lang.Integer])
                         .defaultsTo(1000)
    val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.")
                         .withRequiredArg
                         .describedAs("Java regex (String)")
                         .ofType(classOf[String])
                         .defaultsTo(".*")
    val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
                           .withRequiredArg
                           .describedAs("timestamp/-1(latest)/-2(earliest)")
                           .ofType(classOf[java.lang.Long])
                           .defaultsTo(-1L)
    val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
                         .withRequiredArg
                         .describedAs("ms")
                         .ofType(classOf[java.lang.Long])
                         .defaultsTo(30 * 1000L)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")

    val options = parser.parse(args : _*)
    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)

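    // validate that the supplied topic whitelist compiles as a Java regex before doing any work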
    val regex = options.valueOf(topicWhiteListOpt)
    val topicWhiteListFilter = new Whitelist(regex)

    try {
      Pattern.compile(regex)
    }
    catch {
      case _: PatternSyntaxException =>
        throw new RuntimeException(regex + " is an invalid regex.")
    }

    val fetchSize = options.valueOf(fetchSizeOpt).intValue
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
    val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
    val reportInterval = options.valueOf(reportIntervalOpt).longValue
    // getting topic metadata
    info("Getting topic metadata...")
    val brokerList = options.valueOf(brokerListOpt)
    ToolsUtils.validatePortOrDie(parser, brokerList)
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
    val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
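    // keep only the topics whose names match the whitelist regex (internal topics are not excluded)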
    val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
        topicMetadata => topicWhiteListFilter.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)
    )

    if (filteredTopicMetadata.isEmpty) {
      error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.")
      Exit.exit(1)
    }

    val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
      topicMetadataResponse =>
        topicMetadataResponse.partitionsMetadata.flatMap(
          partitionMetadata =>
            partitionMetadata.replicas.map(broker =>
              TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id))
        )
    )
    debug("Selected topic partitions: " + topicPartitionReplicaList)
    val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId)
      .map { case (brokerId, partitions) =>
               brokerId -> partitions.map { partition => TopicAndPartition(partition.topic, partition.partitionId) } }
    debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
    val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] =
          topicPartitionReplicaList.groupBy(replica => TopicAndPartition(replica.topic, replica.partitionId))
          .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
    debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
      topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
        (TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
      }
    }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
       topicAndPartition
     })
    debug("Leaders per broker: " + leadersPerBroker)

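    // shared buffer that accumulates the fetched data per partition so the replicas can be compared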
    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
                                          leadersPerBroker,
                                          topicAndPartitionsPerBroker.size,
                                          brokerMap,
                                          initialOffsetTime,
                                          reportInterval)
    // create all replica fetcher threads
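    // pick one broker (the first in the map); only its fetcher performs the actual verification pass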
    val verificationBrokerId = topicAndPartitionsPerBroker.head._1
    val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map {
      case (brokerId, topicAndPartitions) =>
        new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
                           sourceBroker = brokerMap(brokerId),
                           topicAndPartitions = topicAndPartitions,
                           replicaBuffer = replicaBuffer,
                           socketTimeout = 30000,
                           socketBufferSize = 256000,
                           fetchSize = fetchSize,
                           maxWait = maxWaitMs,
                           minBytes = 1,
                           doVerification = brokerId == verificationBrokerId)
    }

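    // register a shutdown hook so all fetcher threads are stopped cleanly on exit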
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        info("Stopping all fetchers")
        fetcherThreads.foreach(_.shutdown())
      }
    })
    fetcherThreads.foreach(_.start())
    println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process started.")

  }
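
For reference, a minimal sketch of invoking this entry point directly (the broker endpoints and topic pattern below are hypothetical; only --broker-list is required, the remaining flags fall back to the defaults defined above):

  ReplicaVerificationTool.main(Array(
    "--broker-list", "broker1:9092,broker2:9092",  // hypothetical broker endpoints
    "--topic-white-list", "my-topic.*",            // hypothetical topic whitelist regex
    "--report-interval-ms", "60000"
  ))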