private def genOutputByMerging()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala [469:671]


  /**
   * Generates the Java code that produces the final output by sorting the entries of the aggregate
   * map and feeding them, in sorted order, into the window aggregation code.
   *
   * The emitted code is guarded by `hasInput` and does the following:
   *   1. creates a [[BinaryKVInMemorySortBuffer]] on top of the aggregate map's record-area memory
   *      segments (the bucket-area segments are handed to the buffer as its memory pool);
   *   1. sorts that buffer with [[QuickSort]];
   *   1. iterates the sorted key/value pairs, builds a window element from each pair, adds it to
   *      the windows grouping buffer and runs the "trigger" code returned by `genWindowAggCodes`;
   *   1. runs the "end" code returned by `genWindowAggCodes` once the iteration finishes.
   *
   * When `grouping` is non-empty, the generated loop additionally projects the grouping key from
   * the map key and runs the "end" code whenever the grouping key differs from the previous one.
   *
   * As a side effect, several reusable members (reused key/value rows, the reused KV tuple, the
   * current window and, for grouped aggregation, the last seen key) are registered on `ctx`.
   *
   * @param windowSize
   *   forwarded to `genWindowAggCodes`
   * @param slideSize
   *   forwarded to `genWindowAggCodes`
   * @param bufferLimitSize
   *   forwarded to `genWindowAggCodes`
   * @param outputType
   *   forwarded to `genWindowAggCodes`
   * @param aggregateMapTerm
   *   term of the aggregate map in the generated code; its record-area and bucket-area memory
   *   segments and its element count are used to create the sort buffer
   * @param argsMapping
   *   not referenced by this method
   * @param aggBuffMapping
   *   not referenced by this method
   * @param aggKeyTypeTerm
   *   term in the generated code whose `length` is passed to the key row serializer
   * @param aggBufferTypeTerm
   *   term in the generated code whose `length` is passed to the value row serializer
   * @param aggMapKeyType
   *   row type of the aggregate map key; its last field is accessed separately as the timestamp
   * @param aggBufferType
   *   row type of the aggregate map value (the aggregation buffer)
   * @return
   *   the generated Java code as a string
   */
  private def genOutputByMerging(
      windowSize: Long,
      slideSize: Long,
      bufferLimitSize: Int,
      outputType: RowType,
      aggregateMapTerm: String,
      argsMapping: Array[Array[(Int, LogicalType)]],
      aggBuffMapping: Array[Array[(Int, LogicalType)]],
      aggKeyTypeTerm: String,
      aggBufferTypeTerm: String,
      aggMapKeyType: RowType,
      aggBufferType: RowType): String = {
    // ---------------------------------------------------------------------------------------------
    // gen code to create the in-memory sort buffer over the aggregate map's memory
    val keyComputerTerm = newName(ctx, "keyComputer")
    val recordComparatorTerm = newName(ctx, "recordComparator")
    val prepareSorterCode = HashAggCodeGenHelper.genKVSorterPrepareCode(
      ctx,
      keyComputerTerm,
      recordComparatorTerm,
      aggMapKeyType)

    val memPoolTypeTerm = classOf[BytesHashMapSpillMemorySegmentPool].getName
    val binaryRowSerializerTypeTerm = classOf[BinaryRowDataSerializer].getName

    val sorterBufferType = classOf[BinaryKVInMemorySortBuffer].getName
    val sorterBufferTerm = newName(ctx, "buffer")

    val createSorterBufferCode =
      s"""
         |   $prepareSorterCode
         |   $sorterBufferType $sorterBufferTerm = $sorterBufferType.createBuffer(
         |      $keyComputerTerm,
         |      new $binaryRowSerializerTypeTerm($aggKeyTypeTerm.length),
         |      new $binaryRowSerializerTypeTerm($aggBufferTypeTerm.length),
         |      $recordComparatorTerm,
         |      $aggregateMapTerm.getRecordAreaMemorySegments(),
         |      $aggregateMapTerm.getNumElements(),
         |      new $memPoolTypeTerm($aggregateMapTerm.getBucketAreaMemorySegments())
         |   );
       """.stripMargin

    // reusable key row, value row and the KV tuple that the sorted iterator writes into
    val reuseAggMapKeyTerm = newName(ctx, "reusedKey")
    val reuseAggBufferTerm = newName(ctx, "reusedValue")
    val reuseKVTerm = newName(ctx, "reusedKV")
    val binaryRow = classOf[BinaryRowData].getName
    val kvType = classOf[JTuple2[_, _]].getName
    ctx.addReusableMember(
      s"transient $binaryRow $reuseAggMapKeyTerm = new $binaryRow(${aggMapKeyType.getFieldCount});")
    ctx.addReusableMember(
      s"transient $binaryRow $reuseAggBufferTerm = new $binaryRow(${aggBufferType.getFieldCount});")
    ctx.addReusableMember(
      s"transient $kvType<$binaryRow, $binaryRow> $reuseKVTerm = " +
        s"new  $kvType<$binaryRow, $binaryRow>($reuseAggMapKeyTerm, $reuseAggBufferTerm);"
    )

    // ---------------------------------------------------------------------------------------------
    // gen code to create a buffer to group all the elements having the same grouping key
    val windowElementType = getWindowsGroupingElementInfo()
    // project the aggregate map's key and value into the prepared window element
    val bufferWindowElementTerm = newName(ctx, "prepareWinElement")
    val bufferWindowElementWriterTerm = newName(ctx, "prepareWinElementWriter")
    val exprCodegen = new ExprCodeGenerator(ctx, false)
    // TODO refine this. Is it possible to reuse grouping key projection?
    // all key fields except the last one
    val accessKeyExprs =
      for (idx <- 0 until aggMapKeyType.getFieldCount - 1)
        yield GenerateUtils.generateFieldAccess(ctx, aggMapKeyType, reuseAggMapKeyTerm, idx)
    // the last key field is accessed on its own and re-typed below
    val accessTimestampExpr = GenerateUtils.generateFieldAccess(
      ctx,
      aggMapKeyType,
      reuseAggMapKeyTerm,
      aggMapKeyType.getFieldCount - 1)
    val accessValExprs =
      for (idx <- 0 until aggBufferType.getFieldCount)
        yield GenerateUtils.generateFieldAccess(ctx, aggBufferType, reuseAggBufferTerm, idx)
    // element layout: key fields, then the timestamp (rewrapped with the constant "false" and
    // timestampInternalType -- presumably a never-null term, confirm against GeneratedExpression),
    // then the aggregation buffer fields
    val accessExprs = (accessKeyExprs :+ GeneratedExpression(
      accessTimestampExpr.resultTerm,
      "false",
      accessTimestampExpr.code,
      timestampInternalType)) ++ accessValExprs
    val buildWindowsGroupingElementExpr = exprCodegen.generateResultExpression(
      accessExprs,
      windowElementType,
      classOf[BinaryRowData],
      outRow = bufferWindowElementTerm,
      outRowWriter = Some(bufferWindowElementWriterTerm))

    // ---------------------------------------------------------------------------------------------
    // gen code to apply aggregate functions to grouping window elements
    val timeWindowType = classOf[TimeWindow].getName
    val currentWindow = newName(ctx, "currentWindow")
    ctx.addReusableMember(s"transient $timeWindowType $currentWindow = null;")

    // gen code to assign window and aggregate
    val windowsGrouping = newName(ctx, "windowsGrouping")
    val (processCode, endCode) = if (grouping.isEmpty) {
      // no grouping keys: every sorted element goes straight into the windows grouping buffer
      val (triggerWindowAgg, endWindowAgg) = genWindowAggCodes(
        enablePreAcc = true,
        ctx,
        windowSize,
        slideSize,
        windowsGrouping,
        bufferLimitSize,
        windowElementType,
        inputTimeFieldIndex,
        currentWindow,
        None,
        outputType)
      val process =
        s"""
           |// prepare windows grouping input
           |${buildWindowsGroupingElementExpr.code}
           |$windowsGrouping
           |  .addInputToBuffer(($BINARY_ROW)${buildWindowsGroupingElementExpr.resultTerm});
           |$triggerWindowAgg
         """.stripMargin
      (process, endWindowAgg)
    } else {
      // project grouping keys from aggregate map's key
      val groupKeyTerm = newName(ctx, "groupKey")
      val groupKeyWriterTerm = newName(ctx, "groupKeyWriter")
      val projGroupingKeyCode = ProjectionCodeGenerator
        .generateProjectionExpression(
          ctx,
          aggMapKeyType,
          groupKeyRowType,
          grouping.indices.toArray,
          inputTerm = reuseAggMapKeyTerm,
          outRecordTerm = groupKeyTerm,
          outRecordWriterTerm = groupKeyWriterTerm
        )
        .code

      // gen code to check group key changed
      val lastKeyTerm = newName(ctx, "lastKey")
      ctx.addReusableMember(s"transient $BINARY_ROW $lastKeyTerm = null;")
      val keyNotEqualsCode = genGroupKeyChangedCheckCode(groupKeyTerm, lastKeyTerm)

      val (triggerWindowAgg, endWindowAgg) = genWindowAggCodes(
        enablePreAcc = true,
        ctx,
        windowSize,
        slideSize,
        windowsGrouping,
        bufferLimitSize,
        windowElementType,
        inputTimeFieldIndex,
        currentWindow,
        Some(lastKeyTerm),
        outputType
      )

      // On a key change the end-of-window code runs for the previous key before the new key is
      // remembered; the key is copied because groupKeyTerm is reused across iterations.
      val process =
        s"""
           |// project agg grouping key
           |$projGroupingKeyCode
           |// prepare windows grouping input
           |${buildWindowsGroupingElementExpr.code}
           |if ($lastKeyTerm == null) {
           |  $lastKeyTerm = $groupKeyTerm.copy();
           |} else if ($keyNotEqualsCode) {
           |  $endWindowAgg
           |  $lastKeyTerm = $groupKeyTerm.copy();
           |}
           |$windowsGrouping
           |  .addInputToBuffer(($BINARY_ROW)${buildWindowsGroupingElementExpr.resultTerm});
           |$triggerWindowAgg
         """.stripMargin
      // after the last element: finish the pending windows and reset the remembered key
      val end =
        s"""
           | $endWindowAgg
           | $lastKeyTerm = null;
       """.stripMargin
      (process, end)
    }

    // ---------------------------------------------------------------------------------------------
    // assemble: sort the buffer, iterate it in order, process each pair, then finish
    val sortType = classOf[QuickSort].getName
    val bufferIteratorType = classOf[MutableObjectIterator[_]].getName
    s"""
       | if (hasInput) {
       |  // sort by grouping keys and assigned timestamp
       |  $createSorterBufferCode
       |  new $sortType().sort($sorterBufferTerm);
       |  // merge and get result
       |  $bufferIteratorType<$kvType<$binaryRow, $binaryRow>> iterator =
       |    $sorterBufferTerm.getIterator();
       |  while (iterator.next($reuseKVTerm) != null) {
       |      // reusable input fields access
       |      ${ctx.reuseInputUnboxingCode(bufferWindowElementTerm)}
       |      $processCode
       |   }
       |  $endCode
       | }
       """.stripMargin
  }