in measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala [121:157]
/**
 * Profiles the configured key columns of the (sampled) input data set.
 *
 * Builds one map-typed aggregate expression per profiled column, computes all
 * profiles in a single aggregation pass, and returns the metrics as an array
 * of single-entry maps keyed by the bare column name.
 *
 * @param dataSource the data set to profile
 * @return (record-level results, metric results); record-level output is empty
 *         because profiling only produces aggregate metrics
 */
override def impl(dataSource: DataFrame): (DataFrame, DataFrame) = {
  info(s"Selecting random ${dataSetSample * 100}% of the rows for profiling.")
  val input = dataSource.sample(dataSetSample)

  // Restrict profiling to the configured key columns that exist in the schema.
  val profilingColNames = keyCols(input)
  val profilingCols = input.schema.fields.filter(f => profilingColNames.contains(f.name))

  // One map-typed aggregate expression per column, aliased "<DetailsPrefix><col>".
  // A plain `map` replaces the original foldLeft with `:+` (which was O(n^2) on Array),
  // and the local val no longer shadows the enclosing `profilingExprs`.
  val profilingExprs: Array[Column] = profilingCols.map { field =>
    val metricExprs =
      getProfilingExprs(field, roundScale, approxDistinctCount, dataSetSample).map(nullToZero)
    map(metricExprs: _*).as(s"$DetailsPrefix${field.name}")
  }

  // Add the helper columns the profiling expressions rely on (value length and a
  // 0/1 null indicator per profiled column), then aggregate everything in one pass.
  val aggregateDf = profilingCols
    .foldLeft(input) { (df, field) =>
      val column = col(field.name)
      df.withColumn(lengthColFn(field.name), length(column))
        .withColumn(nullsInColFn(field.name), when(isnull(column), 1L).otherwise(0L))
    }
    .agg(count(lit(1L)).as(Total), profilingExprs: _*)

  // Re-key each per-column detail map by its bare column name (prefix stripped).
  // `map` suffices here: the original flatMap produced exactly one element per column.
  val detailCols = aggregateDf.columns
    .filter(_.startsWith(DetailsPrefix))
    .map(c => map(lit(c.stripPrefix(DetailsPrefix)), col(c)))

  val metricDf = aggregateDf.select(array(detailCols: _*).as(valueColumn))
  (sparkSession.emptyDataFrame, metricDf)
}