in measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala [65:132]
/**
 * Computes accuracy metrics from the rows of the table `inputDfName` and
 * returns them as a new DataFrame.
 *
 * Each input row is expected to carry miss and total counts under the column
 * names configured in `details`; rows without a readable timestamp column fall
 * back to `contextId.timestamp`. Parsed metrics are merged with previously
 * cached results via `CacheResults.update` before being materialized, so
 * late-arriving data can update earlier metrics.
 *
 * @param sparkSession active Spark session used to resolve the input table
 * @param inputDfName  name of the registered table holding the raw counts
 * @param contextId    measurement context; supplies the fallback timestamp
 * @param details      config map supplying the metric output column names
 * @return DataFrame with columns: tmst, miss, total, matched,
 *         matchedFraction, record flag and empty flag
 */
def accuracy(
    sparkSession: SparkSession,
    inputDfName: String,
    contextId: ContextId,
    details: Map[String, Any]): DataFrame = {
  import scala.util.control.NonFatal
  import AccuracyOprKeys._

  // Resolve the configurable output column names.
  val miss = details.getStringOrKey(_miss)
  val total = details.getStringOrKey(_total)
  val matched = details.getStringOrKey(_matched)
  val matchedFraction = details.getStringOrKey(_matchedFraction)
  val updateTime = new Date().getTime

  // Safely read a Long column from a row; None when the column is absent or
  // of a different type. Only non-fatal errors are swallowed so fatal JVM
  // errors (OOM, interrupts) still propagate instead of being hidden.
  def getLong(r: Row, k: String): Option[Long] = {
    try {
      Some(r.getAs[Long](k))
    } catch {
      case NonFatal(_) => None
    }
  }

  val df = sparkSession.table(s"`$inputDfName`")
  val results = df.rdd.flatMap { row =>
    try {
      val tmst = getLong(row, ConstantColumns.tmst).getOrElse(contextId.timestamp)
      val missCount = getLong(row, miss).getOrElse(0L)
      val totalCount = getLong(row, total).getOrElse(0L)
      val ar = AccuracyMetric(missCount, totalCount)
      // Drop rows that do not form a legal metric (e.g. inconsistent counts).
      if (ar.isLegal) Some((tmst, ar)) else None
    } catch {
      case NonFatal(_) => None
    }
  }.collect()

  // Merge with cached results so repeated runs update prior metric values.
  val updatedResults = CacheResults.update(results.map {
    case (t, r) => CacheResult(t, updateTime, r)
  })

  // Schema of the generated metrics DataFrame.
  val schema = StructType(
    Array(
      StructField(ConstantColumns.tmst, LongType),
      StructField(miss, LongType),
      StructField(total, LongType),
      StructField(matched, LongType),
      StructField(matchedFraction, DoubleType),
      StructField(ConstantColumns.record, BooleanType),
      StructField(ConstantColumns.empty, BooleanType)))

  val rows = updatedResults.map { r =>
    val ar = r.result.asInstanceOf[AccuracyMetric]
    Row(
      r.timeStamp,
      ar.miss,
      ar.total,
      ar.getMatch,
      ar.matchFraction,
      !ar.initial,
      ar.eventual())
  }.toArray

  val rowRdd = sparkSession.sparkContext.parallelize(rows)
  sparkSession.createDataFrame(rowRdd, schema)
}