in measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala [113:145]
/**
 * Persists `records` under the path derived from `name`.
 *
 * Any previous output at that path is cleared first. At most `maxPersistLines`
 * records are written (a negative value disables the cap), split into files of
 * up to `maxLinesPerFile` lines each: the first file is written to the base
 * path, and every following file to the base path with its group index as suffix.
 *
 * Non-fatal failures while counting or writing are logged and not rethrown;
 * fatal JVM errors are allowed to propagate.
 *
 * @param records the records to persist, one line per element
 * @param name    logical name used to derive the target path
 */
override def sinkRecords(records: RDD[String], name: String): Unit = {
  import scala.util.control.NonFatal

  val path = filePath(name)
  clearOldRecords(path)
  try {
    val recordCount = records.count
    // A negative maxPersistLines means "no cap"; otherwise persist at most that many lines.
    val count =
      if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
    if (count > 0) {
      // Number of output files: ceil(count / maxLinesPerFile). Kept as Long so a very
      // large count cannot overflow and silently fall into the single-file branch.
      val groupCount = (count - 1) / maxLinesPerFile + 1
      if (groupCount <= 1) {
        // Everything fits in one file: collect the capped prefix and write it to the base path.
        val recs = records.take(count.toInt)
        sinkRecords2Hdfs(path, recs)
      } else {
        // Assign each record to a file group by its index; records whose group lies
        // beyond groupCount are dropped, which enforces the persist cap at file granularity.
        val groupedRecords: RDD[(Long, Iterable[String])] =
          records.zipWithIndex
            .flatMap { case (record, index) =>
              val gid = index / maxLinesPerFile
              if (gid < groupCount) Some((gid, record)) else None
            }
            .groupByKey()
        groupedRecords.foreach { case (gid, recs) =>
          // Group 0 goes to the base path; later groups get the group id as suffix.
          val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
          sinkRecords2Hdfs(hdfsPath, recs)
        }
      }
    }
  } catch {
    // Only swallow recoverable failures; OutOfMemoryError, InterruptedException and
    // other fatal throwables must not be hidden behind a log line.
    case NonFatal(e) => error(e.getMessage, e)
  }
}