in measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala [102:152]
/**
 * Measures record-level accuracy of `input` against the reference table `refSource`.
 *
 * Each input record is left-joined to the reference data on the configured
 * source/ref column pairs. A record is marked inaccurate (`valueColumn` = 1) when:
 *  - no reference row matched it,
 *  - any of its compared source columns is null, or
 *  - a compared column pair differs after coalescing nulls to `emptyCol`.
 * Otherwise it is marked accurate (`valueColumn` = 0).
 *
 * @param input source records to measure
 * @return a pair of:
 *         - the records DataFrame, holding the original input columns plus `valueColumn`;
 *         - a single-row metrics DataFrame, holding an array of maps
 *           (`MetricName` -> metric, `MetricValue` -> value as string) for the
 *           total, accurate and inaccurate counts.
 * @throws AssertionError if no accuracy expression is configured
 */
override def impl(input: DataFrame): (DataFrame, DataFrame) = {
  // Validate configuration before touching the catalog, so a missing expression
  // surfaces as the intended assertion rather than as a table-resolution error.
  val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))

  val originalCols = input.columns
  val dataSource = addColumnPrefix(input, SourcePrefixStr)
  val refDataSource =
    addColumnPrefix(sparkSession.read.table(refSource), refPrefixStr)

  // Resolve each configured expression into prefixed (source, ref) column names,
  // so the two sides can be joined without column-name collisions.
  val accuracyExprs = expr
    .map(toAccuracyExpr)
    .distinct
    .map(x => AccuracyExpr(s"$SourcePrefixStr${x.sourceCol}", s"$refPrefixStr${x.refCol}"))

  // A source record matches a reference record only if every column pair is equal.
  // `===` never evaluates to true when either side is null.
  val joinExpr = safeReduce(
    accuracyExprs
      .map(e => col(e.sourceCol) === col(e.refCol)))(_ and _)

  val indicatorExpr =
    safeReduce(
      accuracyExprs
        .map(e =>
          coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol)))(
      _ or _)
  val nullExpr = safeReduce(accuracyExprs.map(e => col(e.sourceCol).isNull))(_ or _)

  // Number reference rows within each distinct ref key. Keeping only row 1 after the
  // join prevents duplicated reference data from fanning out a source record.
  val cols = accuracyExprs.map(_.refCol).map(col)
  val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
  val numberedRef = refDataSource.withColumn(RowNumber, row_number().over(window))

  // row_number() never yields null, so after the left join RowNumber is null exactly
  // when no reference row matched. Such records must count as inaccurate.
  // Without this check, an unmatched record whose compared source columns all equal
  // `emptyCol` would pass `indicatorExpr`: both sides coalesce to the same value.
  // It would then be wrongly counted as accurate.
  val unmatchedExpr = col(RowNumber).isNull

  val recordsDf = removeColumnPrefix(
    dataSource
      .join(numberedRef, joinExpr, "left")
      .where(col(RowNumber) === 1 or unmatchedExpr)
      .withColumn(
        valueColumn,
        when(unmatchedExpr or indicatorExpr or nullExpr, 1).otherwise(0)),
    SourcePrefixStr)
    .select((originalCols :+ valueColumn).map(col): _*)

  // Metrics are emitted as one array column of {MetricName -> name, MetricValue -> value} maps.
  val selectCols =
    Seq(Total, AccurateStr, InAccurateStr).map(e =>
      map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
  val metricColumn: Column = array(selectCols: _*).as(valueColumn)

  val metricDf = recordsDf
    .withColumn(Total, lit(1))
    .agg(sum(Total).as(Total), sum(valueColumn).as(InAccurateStr))
    .withColumn(AccurateStr, col(Total) - col(InAccurateStr))
    .select(metricColumn)

  (recordsDf, metricDf)
}