override def run()

in s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala [65:132]
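
run() drives the ETL stage of the counter pipeline: it reads messages from the configured input topics at a fixed batch interval, parses each line into counter edge items, enriches the items with user-profile dimensions joined from the graph, and re-produces the results to the counter topic, committing Kafka consumer offsets per partition.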


  override def run(): Unit = {
    validateArgument("interval")
    val intervalInSec = seconds(args(0).toLong)

    val conf = sparkConf(s"$strInputTopics: $className")
    val ssc = streamingContext(conf, intervalInSec)
    val sc = ssc.sparkContext

    val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
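    // (HashMapParam presumably merges the per-task maps key-by-key with the
    //  supplied (_ + _) function, so e.g. "Edges" -> 2 and "Edges" -> 3 from
    //  two tasks combine to "Edges" -> 5)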

    /*
     * read messages from the etl topic, join user profiles from the graph,
     * and then produce the merged messages to the counter topic
     */
    val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
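    // (the stream's RDDs are assumed to carry Kafka offset ranges, which is
    //  what foreachPartitionWithOffsetRange and commitConsumerOffset rely on)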

    // etl logic
    stream.foreachRDD { (rdd, ts) =>
      rdd.foreachPartitionWithOffsetRange { case (osr, part) =>
        // fail fast if per-executor initialization did not complete
        assert(initialize)

        // convert to edge format
        val items = {
          for {
            (k, v) <- part
            line <- GraphUtil.parseString(v)
            item <- CounterEtlFunctions.parseEdgeFormat(line)
          } yield {
            acc += (("Edges", 1))
            item
          }
        }
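        // illustrative only: parseEdgeFormat is expected to accept s2graph edge
        // log lines, i.e. tab-separated fields roughly of the form (hypothetical):
        //   <timestamp> insert e <srcId> <tgtId> <label> <jsonProps>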

        // join user profile
        val joinItems = items.toList.groupBy { e =>
          (e.service, e.action)
        }.flatMap { case ((service, action), v) =>
          CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v)
        }
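        // (checkPolicyAndMergeDimension presumably drops items whose (service,
        //  action) pair has no registered counter policy and merges the profile
        //  dimensions fetched from the graph into the remaining items)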

        // group by kafka partition key and send to kafka
        val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]]
        joinItems.foreach { item =>
          if (item.useProfile) {
            acc += (("ETL", 1))
          }
          val k = getPartKey(item.item, 20)
          m.getOrElseUpdate(k, mutable.MutableList.empty[CounterEtlItem]) += item
        }
        m.foreach { case (k, v) =>
          v.map(_.toKafkaMessage).grouped(1000).foreach { grouped =>
            acc += (("Produce", grouped.size))
            producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n")))
          }
        }
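        // (up to 1000 items are newline-joined into a single Kafka message keyed
        //  by partition key, so the downstream consumer presumably splits on '\n')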

        streamHelper.commitConsumerOffset(osr)
      }

      // roughly once a minute (when the batch time lands on a minute boundary),
      // log dimension-cache statistics
      if (ts.milliseconds / 1000 % 60 == 0) {
        log.warn(DimensionProps.getCacheStatsString)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
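
A minimal, self-contained sketch of the partition-key/batching step above. The getPartKey stand-in below is an assumption (the real helper is defined elsewhere in the module); it simply hashes an item key into a fixed number of buckets, and the batch size is shrunk from 1000 to 2 to keep the output readable:

object PartitionBatchSketch {
  // hypothetical stand-in for the module's getPartKey helper:
  // hash the item key into [0, numPartitions)
  def getPartKey(item: String, numPartitions: Int): Int =
    math.abs(item.hashCode % numPartitions)

  def main(args: Array[String]): Unit = {
    val items = Seq("user_1", "user_2", "user_3", "user_4", "user_5")
    // group by partition key, then emit each bucket in fixed-size batches,
    // mirroring the grouped(1000) + mkString("\n") pattern in run()
    items.groupBy(getPartKey(_, 20)).foreach { case (k, bucket) =>
      bucket.grouped(2).foreach { batch =>
        val message = batch.mkString("\n") // same join as the producer above
        println(s"partKey=$k message=[$message]")
      }
    }
  }
}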