override def impl()

in measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala [102:152]


  /**
   * Compares `input` against the reference table `refSource` and produces per-record
   * flags together with aggregated metrics.
   *
   * Each input row is left-joined to the reference data on equality of every configured
   * (source column, reference column) pair. Reference rows are numbered with `row_number`
   * over a window partitioned and ordered by the reference columns, and only the first
   * row of each partition (or no row at all, for unmatched input) is kept after the join.
   *
   * A row is flagged `1` in `valueColumn` when any pair differs after both sides are
   * coalesced with `emptyCol`, or when any of the compared source columns is null;
   * otherwise it is flagged `0`.
   *
   * @param input the data set to be checked
   * @return a pair of data frames: the records (original input columns plus
   *         `valueColumn`) and a single-row metrics frame whose `valueColumn` is an
   *         array of maps, one each for `Total`, `AccurateStr` and `InAccurateStr`
   * @throws AssertionError if no expression has been configured (`exprOpt` is empty)
   */
  override def impl(input: DataFrame): (DataFrame, DataFrame) = {
    val inputColumnNames = input.columns

    // Both sides get a distinguishing prefix on their column names before the join.
    val prefixedSource = addColumnPrefix(input, SourcePrefixStr)
    val referenceTable = sparkSession.read.table(refSource)
    val prefixedReference = addColumnPrefix(referenceTable, refPrefixStr)

    val expressions =
      exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))

    // Distinct column pairs, rewritten to use the prefixed column names.
    val columnPairs = expressions.map(toAccuracyExpr).distinct.map { pair =>
      AccuracyExpr(s"$SourcePrefixStr${pair.sourceCol}", s"$refPrefixStr${pair.refCol}")
    }

    // Join condition: every pair must be equal.
    val matchOnAllPairs = safeReduce(columnPairs.map { pair =>
      col(pair.sourceCol) === col(pair.refCol)
    })((left, right) => left && right)

    // True when at least one pair differs once nulls are replaced by `emptyCol`.
    val anyPairDiffers = safeReduce(columnPairs.map { pair =>
      coalesce(col(pair.sourceCol), emptyCol) =!= coalesce(col(pair.refCol), emptyCol)
    })((left, right) => left || right)

    // True when at least one compared source column is null.
    val anySourceNull = safeReduce(columnPairs.map { pair =>
      col(pair.sourceCol).isNull
    })((left, right) => left || right)

    val referenceColumns = columnPairs.map(pair => col(pair.refCol))
    val dedupWindow = Window.partitionBy(referenceColumns: _*).orderBy(referenceColumns: _*)

    val numberedReference =
      prefixedReference.withColumn(RowNumber, row_number().over(dedupWindow))

    // Keep the first reference row per partition, or the unmatched (null) side of the join.
    val firstMatchOrUnmatched = (col(RowNumber) === 1) || col(RowNumber).isNull
    val inaccuracyFlag = when(anyPairDiffers || anySourceNull, 1).otherwise(0)

    val flaggedRecords = prefixedSource
      .join(numberedReference, matchOnAllPairs, "left")
      .where(firstMatchOrUnmatched)
      .withColumn(valueColumn, inaccuracyFlag)

    val outputColumns = (inputColumnNames :+ valueColumn).map(col)
    val recordsDf = removeColumnPrefix(flaggedRecords, SourcePrefixStr)
      .select(outputColumns: _*)

    // One map per metric: name -> metric name, value -> string-cast count via nullToZero.
    val metricEntries =
      for (metric <- Seq(Total, AccurateStr, InAccurateStr))
        yield map(
          lit(MetricName),
          lit(metric),
          lit(MetricValue),
          nullToZero(col(metric).cast(StringType)))
    val metricColumn: Column = array(metricEntries: _*).as(valueColumn)

    val countedRecords = recordsDf.withColumn(Total, lit(1))
    val totals =
      countedRecords.agg(sum(Total).as(Total), sum(valueColumn).as(InAccurateStr))
    val metricDf = totals
      .withColumn(AccurateStr, col(Total) - col(InAccurateStr))
      .select(metricColumn)

    (recordsDf, metricDf)
  }