in measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala [115:164]
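  /**
   * Collects the records of a streaming batch, grouped by mini-batch timestamp.
   *
   * Returns the grouped records (None when the input data frame is missing or
   * when no timestamp passes the filter) together with the set of timestamps
   * whose mini-batches turned out to be empty.
   */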
  private def getStreamingRecords(
      context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
    // encoder for the (timestamp, json) pairs produced by df.flatMap below
    implicit val encoder: Encoder[(Long, String)] =
      Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
    val defTimestamp = context.contextId.timestamp
    getDataFrame(context, inputName) match {
      case Some(df) =>
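        // The filter table, if present, holds one row per mini-batch timestamp
        // with a boolean flag marking batches that produced no records.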
        val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match {
          case Some(filterDf) =>
            // collect (timestamp, empty-flag) pairs; unparsable rows are skipped
            val tmsts: Array[(Long, Boolean)] = filterDf.collect.flatMap { row =>
              try {
                val tmst = getTmst(row, defTimestamp)
                val empty = row.getAs[Boolean](ConstantColumns.empty)
                Some((tmst, empty))
              } catch {
                case _: Throwable => None
              }
            }
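            // Split the collected timestamps: empty ones are only reported back,
            // non-empty ones are the timestamps whose records should be written.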
            val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
            val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet
            val filterFuncOpt: Option[Long => Boolean] = if (recordTmsts.nonEmpty) {
              Some((t: Long) => recordTmsts.contains(t))
            } else None
            (filterFuncOpt, emptyTmsts)
          // no filter table: record every timestamp
          case _ => (Some((_: Long) => true), Set[Long]())
        }
        // keep only the rows whose timestamps need to be recorded
        filterFuncOpt match {
          case Some(filterFunc) =>
            val records = df.flatMap { row =>
              val tmst = getTmst(row, defTimestamp)
              if (filterFunc(tmst)) {
                try {
                  // serialize the row to a JSON string; rows that fail to
                  // format are dropped
                  val map = SparkRowFormatter.formatRow(row)
                  val str = JsonUtil.toJson(map)
                  Some((tmst, str))
                } catch {
                  case _: Throwable => None
                }
              } else None
            }
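            // group the JSON strings by timestamp so each mini-batch is
            // written out as one (timestamp, records) entry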
            (Some(records.rdd.groupByKey), emptyTimestamps)
          case _ => (None, emptyTimestamps)
        }
      case _ => (None, Set[Long]())
    }
  }
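
  // A minimal standalone sketch (hypothetical names, not part of this class) of
  // the grouping step above, assuming a SparkSession in scope as `spark`:
  //
  //   import spark.implicits._
  //   val pairs = Seq((1L, """{"a":1}"""), (1L, """{"a":2}"""), (2L, """{"b":3}""")).toDS()
  //   val grouped: RDD[(Long, Iterable[String])] = pairs.rdd.groupByKey
  //   // grouped contains (1L, two json strings) and (2L, one json string)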