override def sinkRecords()

in measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala [113:145]


  override def sinkRecords(records: RDD[String], name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = records.count

      val count =
        if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)

      if (count > 0) {
        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
        if (groupCount <= 1) {
          val recs = records.take(count.toInt)
          sinkRecords2Hdfs(path, recs)
        } else {
          val groupedRecords: RDD[(Long, Iterable[String])] =
            records.zipWithIndex
              .flatMap { r =>
                val gid = r._2 / maxLinesPerFile
                if (gid < groupCount) Some((gid, r._1)) else None
              }
              .groupByKey()
          groupedRecords.foreach { group =>
            val (gid, recs) = group
            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
            sinkRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }