override def execute()

in src/main/scala/com/aliyun/emr/example/spark/streaming/benchmark/WordCount.scala [6:22]


  /**
   * Word-count benchmark pipeline over a Kafka-backed stream.
   *
   * Each record's value is split on single spaces into words and its key is parsed
   * as a `Long` (named `eventTime` here). Words are counted per key via `reduceByKey`,
   * keeping the largest event time seen for each word; only that event time is then
   * written out with `saveAsTextFiles`.
   *
   * The output location is the concatenation of the `filename.prefix` and `name`
   * config properties followed by `/result`.
   *
   * @param stream input stream of records; the record key must be parseable by
   *               `String.toLong`, otherwise a `NumberFormatException` is thrown
   *               while processing the record
   */
  override def execute(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    val outputPrefix =
      config.getProperty("filename.prefix") + config.getProperty("name") + "/result"

    stream
      .flatMap { record =>
        // Parse the key once per record instead of once per word.
        val eventTime = record.key().toLong
        // List is immutable: the pairs have to be returned as the result of the
        // expression. Prepending inside a loop and discarding the result (as the
        // previous version did) always produced an empty list.
        record.value().split(" ").toList.map(word => (word, (1, eventTime)))
      }
      .reduceByKey { (left, right) =>
        // Sum the occurrences and keep the most recent event time for the word.
        (left._1 + right._1, math.max(left._2, right._2))
      }
      // Only the latest event time per word is persisted; word and count are dropped.
      .map(wordStats => wordStats._2._2)
      .saveAsTextFiles(outputPrefix)
  }