def dedupWithCombiner()

in datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala [210:250]


  /**
   * Dedup a DataFrame using a combiner-style (map-side) aggregation instead of a full sort:
   * for each value of groupCol, keep the single row with the maximum (desc = true) or
   * minimum (desc = false) value of orderByCol. Extra aggregations can be added via
   * moreAggFunctions, and columns can be kept or dropped via columnsFilter / columnsFilterKeep.
   */
  def dedupWithCombiner(df: DataFrame,
                        groupCol: Seq[Column],
                        orderByCol: Seq[Column],
                        desc: Boolean = true,
                        moreAggFunctions: Seq[Column] = Nil,
                        columnsFilter: Seq[String] = Nil,
                        columnsFilterKeep: Boolean = true): DataFrame = {
    // Pack the ordering columns into a single struct column ("sort_by_column") so the whole
    // tuple can be compared as one key, then apply the optional column filter.
    val newDF =
      if (columnsFilter == Nil) {
        df.withColumn("sort_by_column", struct(orderByCol: _*))
      } else {
        if (columnsFilterKeep) {
          // Keep only the listed columns (plus the ordering struct).
          df.withColumn("sort_by_column", struct(orderByCol: _*))
            .select("sort_by_column", columnsFilter: _*)
        } else {
          // Drop the listed columns before adding the ordering struct.
          df.select(
            df.columns
              .filter(colName => !columnsFilter.contains(colName))
              .map(colName => new Column(colName)): _*)
            .withColumn("sort_by_column", struct(orderByCol: _*))
        }
      }

    // Select the row with the largest (desc) or smallest (asc) sort_by_column per group.
    val aggFunc =
      if (desc) SparkOverwriteUDAFs.maxValueByKey(_: Column, _: Column)
      else SparkOverwriteUDAFs.minValueByKey(_: Column, _: Column)

    // h1 holds the selected row (all columns packed into a struct keyed by sort_by_column);
    // h2 holds any additional aggregations. The literal placeholder keeps h2 a valid struct
    // even when moreAggFunctions is empty.
    val df2 = newDF
      .groupBy(groupCol: _*)
      .agg(aggFunc(expr("sort_by_column"), expr("struct(sort_by_column, *)"))
        .as("h1"),
        struct(lit(1).as("lit_placeholder_col") +: moreAggFunctions: _*)
          .as("h2"))
      .selectExpr("h1.*", "h2.*")
      .drop("lit_placeholder_col")
      .drop("sort_by_column")

    // Rebuild the schema so the result keeps the original columns' order and metadata,
    // appends any new columns produced by moreAggFunctions, and honors the column filter.
    val ns = StructType(
      (df.schema ++ df2.schema.filter(s2 => !df.schema.map(_.name).contains(s2.name)))
        .filter(s2 =>
          columnsFilter == Nil ||
            (columnsFilterKeep && columnsFilter.contains(s2.name)) ||
            (!columnsFilterKeep && !columnsFilter.contains(s2.name)))
        .toList)

    df2.sparkSession.createDataFrame(df2.rdd, ns)
  }
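
A minimal usage sketch follows. It assumes the object-style call `SparkDFUtils.dedupWithCombiner` from this file; the `people` DataFrame, its column names, and its values are illustrative only. The call keeps, for each `id`, the row with the latest `updated_at` (desc = true is the default).

  import datafu.spark.SparkDFUtils
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.col

  val spark = SparkSession.builder().master("local[*]").appName("dedup-example").getOrCreate()

  // Hypothetical input: several versions of each id.
  val people = spark.createDataFrame(Seq(
    (1, "alice",  "2024-01-01"),
    (1, "alicia", "2024-06-01"),
    (2, "bob",    "2024-03-15")
  )).toDF("id", "name", "updated_at")

  // Keep, per id, the row with the most recent updated_at.
  val deduped = SparkDFUtils.dedupWithCombiner(
    df = people,
    groupCol = Seq(col("id")),
    orderByCol = Seq(col("updated_at")))

  deduped.show()
  // Expected: one row per id, carrying that id's most recent name and updated_at.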