private def getStreamingRecords()

in measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala [115:164]


  /**
   * Collects the streaming records to write, grouped by timestamp.
   *
   * The input data frame (looked up by `inputName`) is narrowed down using the optional
   * filter table: each filter row contributes a timestamp and the boolean
   * `ConstantColumns.empty` flag. Timestamps flagged as empty are reported back to the
   * caller; only rows whose timestamp is flagged as non-empty are formatted as JSON.
   * When no filter table is available, every input row is kept.
   *
   * @param context the data quality context; its `contextId.timestamp` is handed to
   *                `getTmst` as the default timestamp
   * @return a pair of (JSON records grouped by timestamp, timestamps flagged as empty).
   *         The first element is `None` when the input data frame is missing or when the
   *         filter table yields no non-empty timestamp.
   */
  private def getStreamingRecords(
      context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
    // Local import keeps this method self-contained.
    import scala.util.control.NonFatal

    implicit val encoder: Encoder[(Long, String)] =
      Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
    val defTimestamp = context.contextId.timestamp
    getDataFrame(context, inputName) match {
      case Some(df) =>
        val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match {
          case Some(filterDf) =>
            // timestamps paired with their empty flag; unreadable rows are skipped
            val tmsts: Array[(Long, Boolean)] = filterDf.collect.flatMap { row =>
              try {
                val tmst = getTmst(row, defTimestamp)
                val empty = row.getAs[Boolean](ConstantColumns.empty)
                Some((tmst, empty))
              } catch {
                // Best-effort per row, but let fatal errors (OOM, interrupts, ...) propagate.
                case NonFatal(_) => None
              }
            }
            // Single pass split into flagged-empty and to-be-recorded timestamps.
            val (flaggedEmpty, flaggedRecord) = tmsts.partition(_._2)
            val emptyTmsts = flaggedEmpty.map(_._1).toSet
            val recordTmsts = flaggedRecord.map(_._1).toSet
            val filterFuncOpt: Option[Long => Boolean] = if (recordTmsts.nonEmpty) {
              Some((t: Long) => recordTmsts.contains(t))
            } else None

            (filterFuncOpt, emptyTmsts)
          // No filter table: keep every row and report no empty timestamps.
          case _ => (Some((_: Long) => true), Set[Long]())
        }

        // keep only the rows whose timestamp needs to be recorded
        filterFuncOpt match {
          case Some(filterFunc) =>
            val records = df.flatMap { row =>
              val tmst = getTmst(row, defTimestamp)
              if (filterFunc(tmst)) {
                try {
                  val map = SparkRowFormatter.formatRow(row)
                  val str = JsonUtil.toJson(map)
                  Some((tmst, str))
                } catch {
                  // Drop rows that cannot be formatted, without masking fatal errors
                  // or task interruption on the executor.
                  case NonFatal(_) => None
                }
              } else None
            }
            (Some(records.rdd.groupByKey), emptyTimestamps)
          case _ => (None, emptyTimestamps)
        }
      case _ => (None, Set[Long]())
    }
  }