def main() — excerpt from core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala, lines 78–210


  /**
   * Entry point of the replica verification tool.
   *
   * It runs in the following steps:
   *  1. Parses the command line.
   *  2. Fetches topic metadata and broker details through an admin client, which is always closed.
   *  3. Keeps only the topics matching `--topic-white-list`.
   *  4. Starts one [[ReplicaFetcher]] thread per broker that hosts a replica of a selected partition.
   *
   * All fetchers share a single [[ReplicaBuffer]]. Only the fetcher for one of the brokers (the first
   * key of the broker map) is created with `doVerification = true`. A shutdown hook stops every
   * fetcher.
   *
   * The process exits with status 1 in these cases:
   *  - no topic matches the white list;
   *  - no replica is found for the selected partitions;
   *  - a replica is assigned to a broker for which no broker details were returned.
   *
   * @param args command-line arguments (see the option definitions below).
   * @throws RuntimeException if `--topic-white-list` is not a valid Java regex.
   */
  def main(args: Array[String]): Unit = {
    val parser = new OptionParser(false)
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
                         .withRequiredArg
                         .describedAs("hostname:port,...,hostname:port")
                         .ofType(classOf[String])
    val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
                         .withRequiredArg
                         .describedAs("bytes")
                         .ofType(classOf[java.lang.Integer])
                         .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES)
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
                         .withRequiredArg
                         .describedAs("ms")
                         .ofType(classOf[java.lang.Integer])
                         .defaultsTo(1000)
    val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.")
                         .withRequiredArg
                         .describedAs("Java regex (String)")
                         .ofType(classOf[String])
                         .defaultsTo(".*")
    val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
                           .withRequiredArg
                           .describedAs("timestamp/-1(latest)/-2(earliest)")
                           .ofType(classOf[java.lang.Long])
                           .defaultsTo(-1L)
    val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
                         .withRequiredArg
                         .describedAs("ms")
                         .ofType(classOf[java.lang.Long])
                         .defaultsTo(30 * 1000L)
    val helpOpt = parser.accepts("help", "Print usage information.").forHelp()
    val versionOpt = parser.accepts("version", "Print version information and exit.").forHelp()

    val options = parser.parse(args: _*)

    if (args.length == 0 || options.has(helpOpt)) {
      CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
    }

    if (options.has(versionOpt)) {
      CommandLineUtils.printVersionAndDie()
    }
    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)

    val regex = options.valueOf(topicWhiteListOpt)

    // Validate the pattern before it is used anywhere. An invalid regex is then always reported
    // with this message, whatever Whitelist itself does with a malformed pattern.
    try Pattern.compile(regex)
    catch {
      case _: PatternSyntaxException =>
        throw new RuntimeException(s"$regex is an invalid regex.")
    }
    val topicWhiteListFilter = new Whitelist(regex)

    val fetchSize = options.valueOf(fetchSizeOpt).intValue
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
    val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
    val reportInterval = options.valueOf(reportIntervalOpt).longValue
    // getting topic metadata
    info("Getting topic metadata...")
    val brokerList = options.valueOf(brokerListOpt)
    ToolsUtils.validatePortOrDie(parser, brokerList)

    // The admin client is only needed for this one-shot metadata lookup; it is closed even on failure.
    val (topicsMetadata, brokerInfo) = {
      val adminClient = createAdminClient(brokerList)
      try ((listTopicsMetadata(adminClient), brokerDetails(adminClient)))
      finally CoreUtils.swallow(adminClient.close(), this)
    }

    // Internal topics are not excluded: the white list alone decides which topics are verified.
    val filteredTopicMetadata = topicsMetadata.filter { topicMetaData =>
      topicWhiteListFilter.isTopicAllowed(topicMetaData.name, excludeInternalTopics = false)
    }

    if (filteredTopicMetadata.isEmpty) {
      error(s"No topics found. $topicWhiteListOpt ('$regex'), if specified, is either filtering out all topics or there is no topic.")
      Exit.exit(1)
    }

    // One entry per (topic, partition, replica broker) triple.
    val topicPartitionReplicas = filteredTopicMetadata.flatMap { topicMetadata =>
      topicMetadata.partitions.asScala.flatMap { partitionMetadata =>
        partitionMetadata.replicas.asScala.map { node =>
          TopicPartitionReplica(topic = topicMetadata.name, partitionId = partitionMetadata.partition, replicaId = node.id)
        }
      }
    }
    debug(s"Selected topic partitions: $topicPartitionReplicas")
    val brokerToTopicPartitions = topicPartitionReplicas.groupBy(_.replicaId).map { case (brokerId, partitions) =>
      brokerId -> partitions.map { partition => new TopicPartition(partition.topic, partition.partitionId) }
    }
    debug(s"Topic partitions per broker: $brokerToTopicPartitions")

    // Guard the `head` call below: without any replica there is nothing to fetch or verify.
    if (brokerToTopicPartitions.isEmpty) {
      error("No replicas found for the selected topic partitions.")
      Exit.exit(1)
    }

    // Each fetcher needs the broker details of its source broker. Fail with a clear message here
    // instead of a bare NoSuchElementException from `brokerInfo(brokerId)` further down.
    val unknownBrokers = brokerToTopicPartitions.keySet.filterNot(brokerId => brokerInfo.contains(brokerId))
    if (unknownBrokers.nonEmpty) {
      error(s"Replicas are assigned to brokers missing from the cluster metadata (possibly offline): ${unknownBrokers.toSeq.sorted.mkString(", ")}")
      Exit.exit(1)
    }

    val expectedReplicasPerTopicPartition = topicPartitionReplicas.groupBy { replica =>
      new TopicPartition(replica.topic, replica.partitionId)
    }.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
    debug(s"Expected replicas per topic partition: $expectedReplicasPerTopicPartition")

    val topicPartitions = filteredTopicMetadata.flatMap { topicMetaData =>
      topicMetaData.partitions.asScala.map { partitionMetadata =>
        new TopicPartition(topicMetaData.name, partitionMetadata.partition)
      }
    }

    val consumerProps = consumerConfig(brokerList)

    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
      initialOffsets(topicPartitions, consumerProps, initialOffsetTime),
      brokerToTopicPartitions.size,
      reportInterval)
    // create all replica fetcher threads; only the first broker's fetcher runs the verification
    val verificationBrokerId = brokerToTopicPartitions.head._1
    val counter = new AtomicInteger(0)
    val fetcherThreads = brokerToTopicPartitions.map { case (brokerId, partitionsOnBroker) =>
      new ReplicaFetcher(name = s"ReplicaFetcher-$brokerId",
        sourceBroker = brokerInfo(brokerId),
        topicPartitions = partitionsOnBroker,
        replicaBuffer = replicaBuffer,
        socketTimeout = 30000,
        socketBufferSize = 256000,
        fetchSize = fetchSize,
        maxWait = maxWaitMs,
        minBytes = 1,
        doVerification = brokerId == verificationBrokerId,
        consumerProps,
        fetcherId = counter.incrementAndGet())
    }

    Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", {
      info("Stopping all fetchers")
      fetcherThreads.foreach(_.shutdown())
    })
    fetcherThreads.foreach(_.start())
    println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.")

  }