def accuracy()

in measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala [65:132]
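Computes per-timestamp accuracy metrics (miss, total, matched, matched fraction) from a DataFrame registered under inputDfName, updates the CacheResults cache with them, and returns the updated metrics as a new DataFrame.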


  def accuracy(
      sparkSession: SparkSession,
      inputDfName: String,
      contextId: ContextId,
      details: Map[String, Any]): DataFrame = {
    import AccuracyOprKeys._

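    // resolve the metric column names from the details map; miss/total are read from the
    // input rows, and all four names are used for the output columns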
    val miss = details.getStringOrKey(_miss)
    val total = details.getStringOrKey(_total)
    val matched = details.getStringOrKey(_matched)
    val matchedFraction = details.getStringOrKey(_matchedFraction)

    val updateTime = new Date().getTime

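    // safely read a Long column from a row, returning None if it is missing or not a Long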
    def getLong(r: Row, k: String): Option[Long] = {
      try {
        Some(r.getAs[Long](k))
      } catch {
        case _: Throwable => None
      }
    }

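    // look up the input DataFrame previously registered under inputDfName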
    val df = sparkSession.table(s"`$inputDfName`")

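    // extract a (timestamp, AccuracyMetric) pair per row, dropping rows that cannot be
    // parsed or whose metric is not legal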
    val results = df.rdd.flatMap { row =>
      try {
        val tmst = getLong(row, ConstantColumns.tmst).getOrElse(contextId.timestamp)
        val missCount = getLong(row, miss).getOrElse(0L)
        val totalCount = getLong(row, total).getOrElse(0L)
        val ar = AccuracyMetric(missCount, totalCount)
        if (ar.isLegal) Some((tmst, ar)) else None
      } catch {
        case _: Throwable => None
      }
    }.collect()

    // cache and update results
    val updatedResults = CacheResults.update(results.map {
      case (t, r) => CacheResult(t, updateTime, r)
    })

    // generate metrics
    val schema = StructType(
      Array(
        StructField(ConstantColumns.tmst, LongType),
        StructField(miss, LongType),
        StructField(total, LongType),
        StructField(matched, LongType),
        StructField(matchedFraction, DoubleType),
        StructField(ConstantColumns.record, BooleanType),
        StructField(ConstantColumns.empty, BooleanType)))
    val rows = updatedResults.map { r =>
      val ar = r.result.asInstanceOf[AccuracyMetric]
      Row(
        r.timeStamp,
        ar.miss,
        ar.total,
        ar.getMatch,
        ar.matchFraction,
        !ar.initial,
        ar.eventual())
    }.toArray
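    // build the output metrics DataFrame from the updated results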
    val rowRdd = sparkSession.sparkContext.parallelize(rows)
    val retDf = sparkSession.createDataFrame(rowRdd, schema)

    retDf
  }
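
For orientation, a minimal calling sketch follows. It is an assumption-laden illustration, not code from the repository: it presumes that accuracy lives on a DataFrameOps object, that ContextId can be built from a bare timestamp, and that the AccuracyOprKeys values name the miss/total entries of the details map; the view name, the numbers, and the omitted Griffin-internal imports are placeholders.

import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch; Griffin-internal imports (DataFrameOps, ContextId,
// ConstantColumns, AccuracyOprKeys) are omitted and their shapes are assumed.
val spark = SparkSession.builder().master("local[*]").appName("accuracy-sketch").getOrCreate()
import spark.implicits._

// Register an input view holding per-timestamp miss/total counts under a name
// that accuracy() can resolve via sparkSession.table.
Seq((1234L, 2L, 100L), (1235L, 0L, 50L))
  .toDF(ConstantColumns.tmst, "miss", "total")
  .createOrReplaceTempView("accuracy_input")

val metricsDf = DataFrameOps.accuracy(
  spark,
  inputDfName = "accuracy_input",
  contextId = ContextId(1234L),            // assumed constructor: ContextId(timestamp)
  details = Map(
    AccuracyOprKeys._miss -> "miss",       // input column holding the miss count
    AccuracyOprKeys._total -> "total"))    // input column holding the total count

// Expected output columns: tmst, miss, total, matched, matchedFraction, record, empty.
metricsDf.show()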