override def run()

in loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala [32:107]

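Entry point of the WalLogStat streaming job: consume WAL log messages from
Kafka, reduce each message to a (serviceName, label, operation, log type) key,
count the keys per micro-batch, publish the counts together with the batch
total to statTopic, and finally commit the consumed Kafka offsets. The six
positional arguments are validated and bound in the order listed below.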

  override def run(): Unit = {

    validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "statTopic")

    val kafkaZkQuorum = args(0)
    val brokerList = args(1)
    val topics = args(2)
    val intervalInSec = seconds(args(3).toLong)
    val dbUrl = args(4)
    val statTopic = args(5)

    val conf = sparkConf(s"$topics: ${getClass.getSimpleName}")
    val ssc = streamingContext(conf, intervalInSec)
    val sc = ssc.sparkContext

    val groupId = topics.replaceAll(",", "_") + "_stat"

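    // Kafka 0.8-style high-level consumer settings; "auto.offset.reset" ->
    // "largest" makes a consumer group with no committed offset start from
    // the newest messages instead of replaying the topic.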
    val kafkaParams = Map(
      "zookeeper.connect" -> kafkaZkQuorum,
      "group.id" -> groupId,
      "metadata.broker.list" -> brokerList,
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "largest")

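    // One stream over all comma-separated topics, decoding keys and values as
    // strings; its RDDs expose their Kafka offset ranges (see the cast below).
    // The producer publishes the per-batch stats.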
    val stream = getStreamHelper(kafkaParams)
      .createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
    val statProducer = getProducer[String, String](brokerList)

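    // Per micro-batch: derive a stat key from every WAL message, count the
    // keys, publish the counts to the stat topic, then commit Kafka offsets.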
    stream.foreachRDD { (rdd, time) =>

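      // Capture the Kafka offset ranges backing this batch up front; they are
      // committed only after the batch's stats have been produced.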
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      val ts = time.milliseconds

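      // Reduce each WAL message to a tab-separated stat key:
      // serviceName, label, operation, log type. Messages that cannot be
      // parsed map to sentinel keys so error counts get published as well.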
      val elements = rdd.mapPartitions { partition =>
        // Initialize executor-local S2Graph state once per partition before parsing.
        val phase = System.getProperty("phase")
        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
        partition.map { case (key, msg) =>
          GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg) match {
            case Some(elem) =>
              val serviceName = elem.serviceName
              msg.split("\t", 7) match {
                case Array(_, operation, logType, _, _, label, _*) =>
                  Seq(serviceName, label, operation, logType).mkString("\t")
                case _ =>
                  Seq("no_service_name", "no_label", "no_operation", "parsing_error").mkString("\t")
              }
            case None =>
              Seq("no_service_name", "no_label", "no_operation", "no_element_error").mkString("\t")
          }
        }
      }

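      // Count occurrences of each stat key on the executors, then collect the
      // small aggregated result to the driver and publish one message per key,
      // tagged with the batch timestamp and the batch-wide total.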
      val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect()
      val totalCount = countByKey.map(_._2).sum
      val keyedMessages = countByKey.map { case (key, value) =>
        new KeyedMessage[String, String](statTopic, s"$ts\t$key\t$value\t$totalCount")
      }

      statProducer.send(keyedMessages: _*)

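      // Commit the consumed offsets, one range per partition: partition i of
      // elements still lines up with offsets(i) because no shuffle has been
      // applied to that RDD. mapPartitionsWithIndex is lazy, so the empty
      // foreach below exists only to force this job to run; since elements
      // is not cached, this pass recomputes the mapping above.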
      elements.mapPartitionsWithIndex { (i, part) =>
        // commit the consumed offset range for this partition
        val osr = offsets(i)
        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
        Iterator.empty
      }.foreach {
        (_: Nothing) => ()
      }

    }

    ssc.start()
    ssc.awaitTermination()
  }
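
A minimal consumer-side sketch of the stat record format produced above: each
message carries seven tab-separated fields (timestamp, the four key fields, the
per-key count, and the batch total). WalStat and parseStatLine are illustrative
names for this sketch, not part of S2Graph.

  import scala.util.Try

  case class WalStat(ts: Long, serviceName: String, label: String,
                     operation: String, logType: String,
                     count: Long, totalCount: Long)

  def parseStatLine(line: String): Option[WalStat] =
    line.split("\t", 7) match {
      case Array(ts, service, label, op, logType, count, total) =>
        Try(WalStat(ts.toLong, service, label, op, logType,
                    count.toLong, total.toLong)).toOption
      case _ => None
    }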