in datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala [210:250]
/**
 * Reduces a DataFrame to one record per group ("dedup"), keeping for each group the
 * record whose `orderByCol` struct is the maximum (or minimum), using a combiner-style
 * max/min-by-key aggregation rather than a full sort.
 *
 * @param df                input DataFrame
 * @param groupCol          columns to group by
 * @param orderByCol        columns combined into a struct used to pick the winning
 *                          record within each group (struct comparison is field-by-field,
 *                          left to right)
 * @param desc              if true keep the record with the maximum ordering struct,
 *                          otherwise the minimum
 * @param moreAggFunctions  additional aggregate expressions evaluated per group and
 *                          appended as extra output columns
 * @param columnsFilter     optional list of column names; interpretation depends on
 *                          columnsFilterKeep. Nil means keep all columns.
 * @param columnsFilterKeep when true, columnsFilter lists the columns to KEEP;
 *                          when false, the columns to DROP
 * @return deduplicated DataFrame with one row per group
 */
def dedupWithCombiner(df: DataFrame,
groupCol: Seq[Column],
orderByCol: Seq[Column],
desc: Boolean = true,
moreAggFunctions: Seq[Column] = Nil,
columnsFilter: Seq[String] = Nil,
columnsFilterKeep: Boolean = true): DataFrame = {
// Attach the ordering struct as "sort_by_column"; optionally project down to the
// requested column subset first (keep-list or drop-list semantics).
val newDF =
if (columnsFilter == Nil) {
df.withColumn("sort_by_column", struct(orderByCol: _*))
} else {
if (columnsFilterKeep) {
df.withColumn("sort_by_column", struct(orderByCol: _*))
.select("sort_by_column", columnsFilter: _*)
} else {
df.select(
df.columns
.filter(colName => !columnsFilter.contains(colName))
.map(colName => new Column(colName)): _*)
.withColumn("sort_by_column", struct(orderByCol: _*))
}
}
// Pick the aggregator: max-by-key for descending order, min-by-key for ascending.
// Both come from the project-local SparkOverwriteUDAFs object.
val aggFunc =
if (desc) SparkOverwriteUDAFs.maxValueByKey(_: Column, _: Column)
else SparkOverwriteUDAFs.minValueByKey(_: Column, _: Column)
// h1 = the full winning row (sort_by_column plus every other column, wrapped in a
// struct so the UDAF can return it whole); h2 = the extra aggregates.
// lit(1).as("lit_placeholder_col") keeps the h2 struct non-empty even when
// moreAggFunctions is Nil; it is dropped immediately after the flatten.
val df2 = newDF
.groupBy(groupCol:_*)
.agg(aggFunc(expr("sort_by_column"), expr("struct(sort_by_column, *)"))
.as("h1"),
struct(lit(1).as("lit_placeholder_col") +: moreAggFunctions: _*)
.as("h2"))
.selectExpr("h1.*", "h2.*")
.drop("lit_placeholder_col")
.drop("sort_by_column")
// Rebuild the output schema: original df's fields in their original order, followed by
// any new fields produced by moreAggFunctions, restricted to the columnsFilter
// selection. NOTE(review): createDataFrame(df2.rdd, ns) binds fields positionally —
// this assumes df2's column order already matches ns; confirm before reordering
// anything above.
val ns = StructType((df.schema++df2.schema.filter(s2 => !df.schema.map(_.name).contains(s2.name)))
.filter(s2 => columnsFilter == Nil || (columnsFilterKeep && columnsFilter.contains(s2.name)) || (!columnsFilterKeep && !columnsFilter.contains(s2.name))).toList)
df2.sparkSession.createDataFrame(df2.rdd,ns)
}