def toColumnarBatchIterator()

in backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala [112:230]


  /**
   * Converts an iterator of Spark rows into an iterator of columnar batches using the native
   * row-to-columnar converter.
   *
   * Each output batch is built by copying the bytes of up to `columnBatchSize` rows (projected to
   * `UnsafeRow` when they are not already) back-to-back into a single Arrow buffer, then passing
   * the buffer address and the per-row byte lengths to the native converter.
   *
   * @param it
   *   input rows; when empty, an empty iterator is returned and no native state is created
   * @param schema
   *   schema of the input rows, exported to the native converter as an Arrow schema
   * @param numInputRows
   *   metric incremented by the number of rows packed into each batch
   * @param numOutputBatches
   *   metric incremented once per produced batch
   * @param convertTime
   *   metric accumulating the milliseconds spent on projecting/copying rows and on the native
   *   conversion call
   * @param columnBatchSize
   *   maximum number of rows per output batch; must be positive when `it` is non-empty
   * @return
   *   an iterator of converted batches, wrapped so that the native converter handle is released
   *   through `recycleIterator` and each batch is closed through `recyclePayload`
   */
  def toColumnarBatchIterator(
      it: Iterator[InternalRow],
      schema: StructType,
      numInputRows: SQLMetric,
      numOutputBatches: SQLMetric,
      convertTime: SQLMetric,
      columnBatchSize: Int): Iterator[ColumnarBatch] = {
    if (it.isEmpty) {
      // Nothing to convert: skip creating the runtime, allocator and native converter entirely.
      Iterator.empty
    } else {
      // A non-positive batch size would never allocate a buffer and previously surfaced as a
      // NullPointerException inside next(); fail with a clear message instead.
      require(columnBatchSize > 0, s"columnBatchSize must be positive, but got $columnBatchSize")

      val arrowSchema =
        SparkArrowUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
      val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName, "RowToColumnar")
      val jniWrapper = NativeRowToColumnarJniWrapper.create(runtime)
      val arrowAllocator = ArrowBufferAllocators.contextInstance()
      // Build the projection before allocating the C schema so that a failure here cannot leak
      // the C schema struct.
      val converter = UnsafeProjection.create(schema)
      val cSchema = ArrowSchema.allocateNew(arrowAllocator)
      val r2cHandle =
        try {
          ArrowAbiUtil.exportSchema(arrowAllocator, arrowSchema, cSchema)
          jniWrapper.init(cSchema.memoryAddress())
        } finally {
          // The C schema is only needed while the native converter is being initialised.
          cSchema.close()
        }

      // Buffer of the batch currently being assembled; null between batches.
      var arrowBuf: ArrowBuf = null
      // Safety net for a batch that is abandoned half-way (e.g. the upstream iterator throws):
      // the buffer still referenced here is closed when the task's resources are recycled.
      // Registered once per iterator rather than once per batch, so the number of registered
      // recyclers no longer grows with the number of batches produced.
      TaskResources.addRecycler("RowToColumnar_arrowBuf", 100) {
        if (arrowBuf != null && arrowBuf.refCnt() != 0) {
          arrowBuf.close()
        }
      }

      val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
        // Set once the upstream iterator has been observed to be exhausted.
        private var finished = false

        override def hasNext: Boolean = !finished && it.hasNext

        /** Returns the row unchanged if it already is an UnsafeRow, else projects it to one. */
        private def convertToUnsafeRow(row: InternalRow): UnsafeRow = {
          row match {
            case unsafeRow: UnsafeRow => unsafeRow
            case _ => converter.apply(row)
          }
        }

        /**
         * Estimates the initial buffer size from the size of the first row of a batch.
         *
         * The estimate is 1.2x the first row size times the batch size, capped at 31760 bytes per
         * row, but never less than 10x the first row size. 31760L originates from
         * BaseVariableWidthVector.lastValueAllocationSizeInBytes (experimental value). A very
         * large first row can still make this allocation big enough to cause OOM.
         */
        private def estimateBufferSize(firstRowSize: Int): Long = {
          val scaledByBatch = firstRowSize.toDouble * columnBatchSize * 1.2
          val perRowCap = 31760L * columnBatchSize
          Math.max(Math.min(scaledByBatch, perRowCap), firstRowSize.toDouble * 10).toLong
        }

        override def next(): ColumnarBatch = {
          if (!hasNext) {
            // Previously this path dereferenced a null buffer after bumping numOutputBatches.
            throw new NoSuchElementException("No more rows to convert to a columnar batch")
          }
          // Byte length of every row copied into the buffer, in copy order.
          val rowLength = new ListBuffer[Long]()
          var rowCount = 0
          // Write position inside arrowBuf, i.e. the total number of bytes copied so far.
          var offset = 0L
          while (rowCount < columnBatchSize && !finished) {
            if (!it.hasNext) {
              finished = true
            } else {
              val row = it.next()
              val start = System.currentTimeMillis()
              val unsafeRow = convertToUnsafeRow(row)
              val sizeInBytes = unsafeRow.getSizeInBytes

              if (rowCount == 0) {
                // Allocate the buffer based on the first row of the batch.
                arrowBuf = arrowAllocator.buffer(estimateBufferSize(sizeInBytes))
              }

              if ((offset + sizeInBytes) > arrowBuf.capacity()) {
                // Not enough room for this row: move what has been written so far into a buffer
                // twice as large as the space required.
                val tmpBuf = arrowAllocator.buffer((offset + sizeInBytes) * 2)
                tmpBuf.setBytes(0, arrowBuf, 0, offset)
                arrowBuf.close()
                arrowBuf = tmpBuf
              }
              // Copy the row bytes immediately; the row object is not retained after this point.
              Platform.copyMemory(
                unsafeRow.getBaseObject,
                unsafeRow.getBaseOffset,
                null,
                arrowBuf.memoryAddress() + offset,
                sizeInBytes)
              offset += sizeInBytes
              rowLength += sizeInBytes.toLong
              rowCount += 1
              convertTime += System.currentTimeMillis() - start
            }
          }
          numInputRows += rowCount
          numOutputBatches += 1
          val startNative = System.currentTimeMillis()
          try {
            val handle = jniWrapper
              .nativeConvertRowToColumnar(r2cHandle, rowLength.toArray, arrowBuf.memoryAddress())
            val cb = ColumnarBatches.create(handle)
            convertTime += System.currentTimeMillis() - startNative
            cb
          } finally {
            // The row buffer is released as soon as the native call returns or fails.
            arrowBuf.close()
            arrowBuf = null
          }
        }
      }
      Iterators
        .wrap(res)
        .protectInvocationFlow()
        .recycleIterator {
          jniWrapper.close(r2cHandle)
        }
        .recyclePayload(_.close())
        .create()
    }
  }