in measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala [99:135]
/**
 * Computes the duplication measures over the key columns of `input`.
 *
 * Rows are grouped by their key-column values:
 *  - The group size is recorded as `Count`.
 *  - Each row's position inside its group is recorded as `RowNumber`.
 *  - The `Unique`, `NonUnique`, `Duplicate` and `Distinct` flags are derived from those two values via `condition`.
 *  - `Total` is a constant 1 per row.
 *  - `valueColumn` is copied from the column named by `badnessExpr`.
 *
 * The helper columns `IsNotNull`, `RowNumber` and `Count` are dropped once the flags are built.
 * NOTE(review): `IsNotNull` is never referenced directly here; presumably `condition` reads it.
 * Verify before removing it.
 *
 * @param input the records to measure
 * @return a pair of:
 *         - the per-row frame, with the `duplicationMeasures` columns removed and `valueColumn` kept;
 *         - a frame holding a single aggregated row. That row has one `valueColumn` array with one
 *           map per measure: `MetricName` -> the measure name, `MetricValue` -> its summed value as a
 *           string, passed through `nullToZero`.
 */
override def impl(input: DataFrame): (DataFrame, DataFrame) = {
  val keyColumns = keyCols(input).map(col)
  val allKeysNull = safeReduce(keyColumns.map(_.isNull))(_ and _)

  val isUnique = condition(col(Count) === 1)
  // Only the first row of a repeated key counts as "non-unique"; the remaining rows are duplicates.
  val isFirstOfRepeated = condition(col(Count) > 1 and col(RowNumber) === 1)
  val isLaterOfRepeated = condition(col(Count) > 1 and col(RowNumber) > 1)
  val isDistinct = condition(col(Unique) === 1 or col(NonUnique) === 1)

  // The window orders by the very columns it partitions on, so all rows in a partition are peers.
  // The default RANGE frame (unbounded preceding .. current row) therefore spans the whole
  // partition, which makes `count` equal the group size.
  val sameKey = Window.partitionBy(keyColumns: _*).orderBy(keyColumns: _*)

  // The order matters: later entries read earlier ones, and it fixes the output column order.
  val derivedColumns: Seq[(String, Column)] = Seq(
    IsNotNull -> not(allKeysNull),
    RowNumber -> row_number().over(sameKey),
    Count -> count(lit(1)).over(sameKey),
    Unique -> isUnique,
    NonUnique -> isFirstOfRepeated,
    Duplicate -> isLaterOfRepeated,
    Distinct -> isDistinct,
    Total -> lit(1),
    valueColumn -> col(badnessExpr))

  val flagged = derivedColumns
    .foldLeft(input) { case (frame, (name, column)) => frame.withColumn(name, column) }
    .drop(IsNotNull, RowNumber, Count)

  val summed = duplicationMeasures.map(measure => sum(measure).as(measure))
  val metricEntries = duplicationMeasures.map { measure =>
    map(lit(MetricName), lit(measure), lit(MetricValue), nullToZero(col(measure).cast(StringType)))
  }
  val metricFrame = flagged
    .agg(summed.head, summed.tail: _*)
    .select(array(metricEntries: _*).as(valueColumn))

  (flagged.drop(duplicationMeasures: _*), metricFrame)
}