in loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala [35:165]
override def run() = {
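  // Consume WAL log messages from Kafka, write them to HDFS partitioned by split/date_id/ts, and register each path as a Hive partition.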
  validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "outputPath", "hiveDatabase", "hiveTable", "splitListPath")

  val kafkaZkQuorum = args(0)
  val brokerList = args(1)
  val topics = args(2)
  val intervalInSec = seconds(args(3).toLong)
  val dbUrl = args(4)
  val outputPath = args(5)
  val hiveDatabase = args(6)
  val hiveTable = args(7)
  val splitListPath = args(8)
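
  // Streaming context; the consumer group id and fallback topic are derived from the topic list.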
  val conf = sparkConf(s"$topics: WalLogToHDFS")
  val ssc = streamingContext(conf, intervalInSec)
  val sc = ssc.sparkContext

  val groupId = topics.replaceAll(",", "_") + "_stream"
  val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed"
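
  // Kafka consumer parameters; offsets for this group are committed manually via commitConsumerOffset below.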
  val kafkaParams = Map(
    "zookeeper.connect" -> kafkaZkQuorum,
    "group.id" -> groupId,
    "metadata.broker.list" -> brokerList,
    "zookeeper.connection.timeout.ms" -> "10000",
    "auto.offset.reset" -> "largest")

  val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
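
  // Throughput accumulator plus the target HDFS block size (128MB) used to size per-split output files.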
  val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _))

  val hdfsBlockSize = 134217728 // 128M
  val hiveContext = new HiveContext(sc)

  var splits = Array[String]()
  var excludeLabels = Set[String]()
  var excludeServices = Set[String]()
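
  // Per-batch processing: reload the split configuration, filter and annotate WAL lines, write them to HDFS per split,
  // commit the consumed Kafka offsets, and finally register the new Hive partitions.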
  stream.foreachRDD { (rdd, time) =>
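    // Refresh split/exclude lists from splitListPath every batch; keep the previously loaded values if the read fails.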
    try {
      val read = sc.textFile(splitListPath).collect().map(_.split("=")).flatMap {
        case Array(value) => Some(("split", value))
        case Array(key, value) => Some((key, value))
        case _ => None
      }
      splits = read.filter(_._1 == "split").map(_._2)
      excludeLabels = read.filter(_._1 == "exclude_label").map(_._2).toSet
      excludeServices = read.filter(_._1 == "exclude_service").map(_._2).toSet
    } catch {
      case _: Throwable => // use previous information
    }
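
    // Kafka offset ranges for this batch; they are committed only after the writes below have run.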
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
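
    // Parse each message into a graph element, drop excluded services/labels,
    // pad 6-column rows with "{}", and append the service name as the last column.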
    val elements = rdd.mapPartitions { partition =>
      // initialize executor-side settings before processing the partition.
      val phase = System.getProperty("phase")
      GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)

      partition.flatMap { case (key, msg) =>
        val optMsg = GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg).flatMap { element =>
          val arr = msg.split("\t", 7)
          val service = element.serviceName
          val label = arr(5)
          val n = arr.length

          if (excludeServices.contains(service) || excludeLabels.contains(label)) {
            None
          } else if (n == 6) {
            Some(Seq(msg, "{}", service).mkString("\t"))
          } else if (n == 7) {
            Some(Seq(msg, service).mkString("\t"))
          } else {
            None
          }
        }
        optMsg
      }
    }
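
    // Batch timestamp and date id that make up the HDFS partition path: split=<split>/date_id=<date_id>/ts=<ts>.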
    val ts = time.milliseconds
    val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))

    /* make sure that the jobs over `elements` are not running at the same time */
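    // Write the batch once for the "all" split and once per configured split, coalescing each split toward ~128MB output files.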
    val elementsWritten = {
      elements.cache()

      (Array("all") ++ splits).foreach {
        case split if split == "all" =>
          val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
          elements.saveAsTextFile(path)
        case split =>
          val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
          val strlen = split.length
          // every output line ends with its service name, so a suffix match selects this split's lines.
          val splitData = elements.filter(_.takeRight(strlen) == split).cache()
          // approximate the split's size in bytes to pick a partition count close to the HDFS block size.
          val totalSize = splitData
            .mapPartitions { iterator =>
              val s = iterator.map(_.length.toLong).sum
              Iterator.single(s)
            }
            .sum
            .toLong
          val numPartitions = math.max(1, (totalSize / hdfsBlockSize.toDouble).toInt)
          splitData.coalesce(math.min(splitData.partitions.length, numPartitions)).saveAsTextFile(path)
          splitData.unpersist()
      }

      elements.unpersist()
      elements
    }
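
    // Commit the consumed offset ranges; partition i of elementsWritten lines up with offsets(i),
    // and the no-op foreach only forces the commit job to run.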
    elementsWritten.mapPartitionsWithIndex { (i, part) =>
      // commit offset range
      val osr = offsets(i)
      getStreamHelper(kafkaParams).commitConsumerOffset(osr)
      Iterator.empty
    }.foreach {
      (_: Nothing) => ()
    }
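
    // Register the freshly written paths as partitions of the Hive table.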
(Array("all") ++ splits).foreach { split =>
val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
hiveContext.sql(s"use $hiveDatabase")
hiveContext.sql(s"alter table $hiveTable add partition (split='$split', date_id='$dateId', ts='$ts') location '$path'")
}
}
logInfo(s"counter: ${mapAcc.value}")
println(s"counter: ${mapAcc.value}")
  ssc.start()
  ssc.awaitTermination()
}