in s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala [65:132]
override def run(): Unit = {
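  // args(0) is the batch interval in seconds; validateArgument presumably just checks that it was supplied.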
  validateArgument("interval")
  val intervalInSec = seconds(args(0).toLong)

  val conf = sparkConf(s"$strInputTopics: $className")
  val ssc = streamingContext(conf, intervalInSec)
  val sc = ssc.sparkContext

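  // Throughput accumulator: HashMapParam presumably merges per-key Long counts by addition,
  // so the "Edges", "ETL" and "Produce" counters can be collected from executors onto the driver.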
  val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))

  /*
   * Read messages from the ETL topic, join each with its user profile from the graph,
   * and then produce the merged messages to the counter topic.
   */
  val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)

  // ETL logic
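  // Each batch is processed per Kafka partition together with its offset range (osr);
  // foreachPartitionWithOffsetRange is presumably an implicit helper that pairs a partition's
  // iterator with its OffsetRange, and offsets are committed only after the partition has been
  // produced (see commitConsumerOffset below), giving at-least-once delivery.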
  stream.foreachRDD { (rdd, ts) =>
    rdd.foreachPartitionWithOffsetRange { case (osr, part) =>
      assert(initialize)
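      // `initialize` is presumably a lazy, executor-side setup step (producer, graph client, ...);
      // the assert fails the task fast if that setup did not succeed.
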
      // convert to edge format
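      // GraphUtil.parseString and CounterEtlFunctions.parseEdgeFormat presumably return
      // Option/Seq values, so malformed lines are dropped; only parsed items bump the "Edges" counter.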
      val items = {
        for {
          (k, v) <- part
          line <- GraphUtil.parseString(v)
          item <- CounterEtlFunctions.parseEdgeFormat(line)
        } yield {
          acc += (("Edges", 1))
          item
        }
      }

      // join user profile
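      // Items are grouped by (service, action); checkPolicyAndMergeDimension presumably verifies
      // that a counter policy exists for the pair and merges the joined profile into each item's dimensions.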
      val joinItems = items.toList.groupBy { e =>
        (e.service, e.action)
      }.flatMap { case ((service, action), v) =>
        CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v)
      }

      // group by kafka partition key and send to kafka
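      // getPartKey presumably hashes item.item into one of 20 buckets so that records for the
      // same item land on the same Kafka partition for downstream aggregation.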
      val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]]
      joinItems.foreach { item =>
        if (item.useProfile) {
          acc += (("ETL", 1))
        }
        val k = getPartKey(item.item, 20)
        val values: mutable.MutableList[CounterEtlItem] = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem])
        values += item
        m.update(k, values)
      }

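      // Each bucket is flushed in chunks of up to 1,000 items, newline-joined into one KeyedMessage
      // with the bucket index as the partition key; the "Produce" counter tracks items, not messages.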
      m.foreach { case (k, v) =>
        v.map(_.toKafkaMessage).grouped(1000).foreach { grouped =>
          acc += (("Produce", grouped.size))
          producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n")))
        }
      }

      streamHelper.commitConsumerOffset(osr)
    }

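    // ts is the batch time; DimensionProps cache statistics are logged only when the batch
    // timestamp falls on a whole minute, which assumes batch boundaries align with minute boundaries.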
    if (ts.milliseconds / 1000 % 60 == 0) {
      log.warn(DimensionProps.getCacheStatsString)
    }
  }

  ssc.start()
  ssc.awaitTermination()
}