override def impl()

in measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala [99:135]


  /**
   * Tags every record of `input` with duplication flags derived from its key columns and
   * aggregates those flags into a metric frame.
   *
   * A window over the key columns supplies a per-group row number and group size; the
   * Unique, NonUnique, Duplicate and Distinct flags are built from those via `condition`
   * (presumably a 1/0 indicator, since the flags are summed afterwards -- confirm against
   * the helper). `valueColumn` is copied from the column named by `badnessExpr`.
   *
   * @param input data set to be measured
   * @return a pair of (the flagged records with the `duplicationMeasures` columns removed,
   *         a globally aggregated frame whose `valueColumn` is an array of name/value maps,
   *         one per entry of `duplicationMeasures`)
   */
  override def impl(input: DataFrame): (DataFrame, DataFrame) = {
    val keyColumns = keyCols(input).map(col)

    // The window orders by the very columns it partitions by, so all rows of a group are
    // peers: with Spark's default frame, count(...) therefore spans the whole group.
    val keyWindow = Window.partitionBy(keyColumns: _*).orderBy(keyColumns: _*)

    val allKeysNull = safeReduce(keyColumns.map(_.isNull))(_ and _)
    val groupHasRepeats = col(Count) > 1

    // Order matters: later entries refer to columns introduced by earlier ones
    // (the flags need RowNumber/Count, Distinct needs Unique/NonUnique).
    val derivedColumns = Seq(
      IsNotNull -> not(allKeysNull),
      RowNumber -> row_number().over(keyWindow),
      Count -> count(lit(1)).over(keyWindow),
      Unique -> condition(col(Count) === 1),
      NonUnique -> condition(groupHasRepeats and (col(RowNumber) === 1)),
      Duplicate -> condition(groupHasRepeats and (col(RowNumber) > 1)),
      Distinct -> condition((col(Unique) === 1) or (col(NonUnique) === 1)),
      Total -> lit(1),
      valueColumn -> col(badnessExpr))

    // Helper columns are removed only after valueColumn has been derived.
    val flaggedDf = derivedColumns
      .foldLeft(input) { case (df, (name, expr)) => df.withColumn(name, expr) }
      .drop(IsNotNull, RowNumber, Count)

    // One sum per measure, each keeping the measure's own name.
    val measureSums = duplicationMeasures.map(measure => sum(measure).as(measure))
    val summedDf = flaggedDf.agg(measureSums.head, measureSums.tail: _*)

    // Each measure becomes a {MetricName -> name, MetricValue -> value-as-string} map.
    val metricEntries = duplicationMeasures.map { measure =>
      val valueAsString = nullToZero(col(measure).cast(StringType))
      map(lit(MetricName), lit(measure), lit(MetricValue), valueAsString)
    }
    val metricsArray: Column = array(metricEntries: _*).as(valueColumn)

    (flaggedDf.drop(duplicationMeasures: _*), summedDf.select(metricsArray))
  }