in backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala [112:230]
  def toColumnarBatchIterator(
      it: Iterator[InternalRow],
      schema: StructType,
      numInputRows: SQLMetric,
      numOutputBatches: SQLMetric,
      convertTime: SQLMetric,
      columnBatchSize: Int): Iterator[ColumnarBatch] = {
    if (it.isEmpty) {
      return Iterator.empty
    }
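    // Convert the Spark schema to an Arrow schema, export it to the native side through the
    // Arrow C data interface, and obtain a handle to the native row-to-columnar converter.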
    val arrowSchema =
      SparkArrowUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
    val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName, "RowToColumnar")
    val jniWrapper = NativeRowToColumnarJniWrapper.create(runtime)
    val arrowAllocator = ArrowBufferAllocators.contextInstance()
    val cSchema = ArrowSchema.allocateNew(arrowAllocator)
    val factory = UnsafeProjection
    val converter = factory.create(schema)
    val r2cHandle =
      try {
        ArrowAbiUtil.exportSchema(arrowAllocator, arrowSchema, cSchema)
        jniWrapper.init(cSchema.memoryAddress())
      } finally {
        cSchema.close()
      }
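    // The returned iterator packs up to `columnBatchSize` input rows into an off-heap Arrow
    // buffer and lets the native converter turn each packed buffer into a ColumnarBatch.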
    val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
      var finished = false

      override def hasNext: Boolean = {
        if (finished) {
          false
        } else {
          it.hasNext
        }
      }

      def convertToUnsafeRow(row: InternalRow): UnsafeRow = {
        row match {
          case unsafeRow: UnsafeRow => unsafeRow
          case _ =>
            converter.apply(row)
        }
      }
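      // Copies each row's UnsafeRow bytes contiguously into `arrowBuf`, records every row's
      // length, then converts the packed buffer into a single ColumnarBatch on the native side.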
      override def next(): ColumnarBatch = {
        var arrowBuf: ArrowBuf = null
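        // Recycler callback: if the buffer is still open when the task's resources are
        // released, close it here so the off-heap memory is not leaked.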
        TaskResources.addRecycler("RowToColumnar_arrowBuf", 100) {
          if (arrowBuf != null && arrowBuf.refCnt() != 0) {
            arrowBuf.close()
          }
        }
        val rowLength = new ListBuffer[Long]()
        var rowCount = 0
        var offset = 0L
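        // Accumulate rows until the batch is full or the input is exhausted.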
        while (rowCount < columnBatchSize && !finished) {
          if (!it.hasNext) {
            finished = true
          } else {
            val row = it.next()
            val start = System.currentTimeMillis()
            val unsafeRow = convertToUnsafeRow(row)
            val sizeInBytes = unsafeRow.getSizeInBytes
            if (rowCount == 0) {
              // Allocate the buffer based on the size of the first row; if the first row is
              // unusually large, this estimate over-allocates and may cause OOM. A possible
              // optimization is to pass a list of ArrowBufs to the native side to avoid the
              // repeated buffer close/allocate cycle.
              // 31760L comes from BaseVariableWidthVector.lastValueAllocationSizeInBytes and
              // is an experimental value.
              val estimatedBufSize = Math.max(
                Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L * columnBatchSize),
                sizeInBytes.toDouble * 10)
              arrowBuf = arrowAllocator.buffer(estimatedBufSize.toLong)
            }
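            // Grow the buffer (roughly doubling) whenever the next row would not fit,
            // copying over the bytes packed so far.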
            if ((offset + sizeInBytes) > arrowBuf.capacity()) {
              val tmpBuf = arrowAllocator.buffer((offset + sizeInBytes) * 2)
              tmpBuf.setBytes(0, arrowBuf, 0, offset)
              arrowBuf.close()
              arrowBuf = tmpBuf
            }
            Platform.copyMemory(
              unsafeRow.getBaseObject,
              unsafeRow.getBaseOffset,
              null,
              arrowBuf.memoryAddress() + offset,
              sizeInBytes)
            offset += sizeInBytes
            rowLength += sizeInBytes.toLong
            rowCount += 1
            convertTime += System.currentTimeMillis() - start
          }
        }
        numInputRows += rowCount
        numOutputBatches += 1
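        // Hand the packed buffer and per-row lengths to the native converter; the returned
        // handle is wrapped into a ColumnarBatch, and the Arrow buffer is released once the
        // native call returns.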
        val startNative = System.currentTimeMillis()
        try {
          val handle = jniWrapper
            .nativeConvertRowToColumnar(r2cHandle, rowLength.toArray, arrowBuf.memoryAddress())
          val cb = ColumnarBatches.create(handle)
          convertTime += System.currentTimeMillis() - startNative
          cb
        } finally {
          arrowBuf.close()
          arrowBuf = null
        }
      }
    }
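    // Wrap the iterator so the native converter handle is closed and each emitted batch is
    // closed once the iterator is recycled; protectInvocationFlow() shields the wrapped
    // iterator from irregular hasNext()/next() call patterns.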
    Iterators
      .wrap(res)
      .protectInvocationFlow()
      .recycleIterator {
        jniWrapper.close(r2cHandle)
      }
      .recyclePayload(_.close())
      .create()
  }