override def impl()

in measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala [121:157]
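
Summary (derived from the body below): the profiling measure's impl returns a (records, metrics) pair in which the record-level DataFrame is deliberately empty. The metric DataFrame is built by sampling the input, computing per-column profiling aggregates in a single pass, and packing the per-column results into one array-valued metric column.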

  override def impl(dataSource: DataFrame): (DataFrame, DataFrame) = {
    info(s"Selecting random ${dataSetSample * 100}% of the rows for profiling.")
    val input = dataSource.sample(dataSetSample)
    val profilingColNames = keyCols(input)

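    // Keep only the schema fields selected for profiling.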
    val profilingCols = input.schema.fields.filter(f => profilingColNames.contains(f.name))

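    // Build one map-typed aggregate column per profiled column from its profiling
    // expressions (each wrapped in nullToZero), aliased with the DetailsPrefix.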
    val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
      val colName = field.name
      val profilingExprs =
        getProfilingExprs(field, roundScale, approxDistinctCount, dataSetSample).map(nullToZero)

      exprList.:+(map(profilingExprs: _*).as(s"$DetailsPrefix$colName"))
    })

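    // Add per-column helper columns (value length and a null indicator), then compute
    // the total row count and all profiling maps in a single aggregation pass.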
    val aggregateDf = profilingCols
      .foldLeft(input)((df, field) => {
        val colName = field.name
        val column = col(colName)

        val lengthColName = lengthColFn(colName)
        val nullColName = nullsInColFn(colName)

        df.withColumn(lengthColName, length(column))
          .withColumn(nullColName, when(isnull(column), 1L).otherwise(0L))
      })
      .agg(count(lit(1L)).as(Total), profilingExprs: _*)

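    // Re-key each details column by its original column name (DetailsPrefix stripped).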
    val detailCols =
      aggregateDf.columns
        .filter(_.startsWith(DetailsPrefix))
        .flatMap(c => Seq(map(lit(c.stripPrefix(DetailsPrefix)), col(c))))

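    // Pack the per-column maps into a single array-valued metric column.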
    val metricDf = aggregateDf.select(array(detailCols: _*).as(valueColumn))

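    // Profiling emits only metrics; the record-level DataFrame stays empty.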
    (sparkSession.emptyDataFrame, metricDf)
  }
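
For context, the sketch below reproduces the same single-pass aggregation pattern on a toy DataFrame using only the public Spark SQL API: per-column map(metric name -> aggregate) expressions, one agg over the whole input, and the per-column maps packed into a single array-valued column. The column names, metric choices, and the "details_" prefix are illustrative assumptions, not the constants used by ProfilingMeasure (DetailsPrefix, Total, valueColumn, getProfilingExprs, and nullToZero are defined elsewhere in the Griffin sources).

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object ProfilingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("profiling-sketch")
      .getOrCreate()
    import spark.implicits._

    // Toy stand-in for the sampled `input`; column names are illustrative only.
    val input = Seq(("alice", 29), ("bob", 31), (null, 42)).toDF("name", "age")

    // One map(metric name -> aggregate expression) per profiled column, cast to a
    // common value type so the map is well-typed, aliased with an illustrative prefix.
    def details(colName: String): Column = {
      val c = col(colName)
      map(
        lit("approx_distinct_count"), approx_count_distinct(c).cast("string"),
        lit("null_count"), sum(when(isnull(c), 1L).otherwise(0L)).cast("string"),
        lit("avg_length"), avg(length(c)).cast("string")
      ).as(s"details_$colName")
    }

    // Single aggregation pass: total row count plus one detail map per column.
    val aggregateDf = input.agg(
      count(lit(1L)).as("total"),
      Seq("name", "age").map(details): _*)

    // Re-key each detail map by its column name and pack them into one array column,
    // mirroring how the method builds its metric value column.
    val detailCols = aggregateDf.columns
      .filter(_.startsWith("details_"))
      .map(c => map(lit(c.stripPrefix("details_")), col(c)))

    aggregateDf.select(array(detailCols: _*).as("value")).show(truncate = false)

    spark.stop()
  }
}

Because there is no groupBy, the aggregation collapses to one row, so the resulting DataFrame carries a single array of per-column maps, the same shape impl returns alongside the empty records DataFrame.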