in backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala [83:177]
/**
 * Drives one map task's shuffle write through the native ClickHouse splitter.
 *
 * Fast path: when `records` is empty, commits an all-zero partition-length index
 * with no data file (dataTmp = null) and publishes a MapStatus, so reducers see
 * an empty map output without any native work.
 *
 * Otherwise, lazily creates the native splitter (once per writer) and registers
 * a spillable memory target whose spill() evicts the splitter's buffered data.
 * Each non-empty ColumnarBatch is handed to the native side via its first
 * column's block address; after the iterator is drained, stop() flushes the
 * native writer, metrics are reported, and the temp data file is committed
 * (and always cleaned up, even if the commit fails).
 */
private def internalCHWrite(records: Iterator[Product2[K, V]]): Unit = {
  val splitterJniWrapper: CHShuffleSplitterJniWrapper = jniWrapper
  if (!records.hasNext) {
    // Empty input: write an index of zero-length partitions with no data file.
    partitionLengths = new Array[Long](dep.partitioner.numPartitions)
    shuffleBlockResolver.writeMetadataFileAndCommit(
      dep.shuffleId,
      mapId,
      partitionLengths,
      Array[Long](),
      null)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
  } else {
    // Temp file sits next to the final data file so the commit is a cheap rename.
    val dataTmp = Utils.tempFileWith(shuffleBlockResolver.getDataFile(dep.shuffleId, mapId))
    if (nativeSplitter == 0) {
      nativeSplitter = splitterJniWrapper.make(
        dep.nativePartitioning,
        dep.shuffleId,
        mapId,
        splitSize,
        customizedCompressCodec,
        dataTmp.getAbsolutePath,
        localDirs,
        subDirsPerLocalDir,
        preferSpill,
        spillThreshold,
        CHBackendSettings.shuffleHashAlgorithm,
        throwIfMemoryExceed,
        flushBlockBufferBeforeEvict
      )
      // Register with the memory allocator so memory pressure can force the
      // native splitter to evict its buffered shuffle data to disk.
      CHNativeMemoryAllocators.createSpillable(
        "ShuffleWriter",
        new Spiller() {
          override def spill(self: MemoryTarget, size: Long): Long = {
            if (nativeSplitter == 0) {
              throw new IllegalStateException(
                "Fatal: spill() called before a shuffle writer " +
                  "is created. This behavior should be optimized by moving memory " +
                  "allocations from make() to split()")
            }
            logInfo(s"Gluten shuffle writer: Trying to spill $size bytes of data")
            val spilled = splitterJniWrapper.evict(nativeSplitter)
            logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
            spilled
          }
          override def applicablePhases(): util.Set[Spiller.Phase] = Spillers.PHASE_SET_SPILL_ONLY
        }
      )
    }
    while (records.hasNext) {
      val cb = records.next()._2.asInstanceOf[ColumnarBatch]
      if (cb.numRows == 0 || cb.numCols == 0) {
        // Nothing to split; the native side cannot consume degenerate batches.
        logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
      } else {
        firstRecordBatch = false
        // The first column carries the native block address for the whole batch.
        val col = cb.column(0).asInstanceOf[CHColumnVector]
        val block = col.getBlockAddress
        splitterJniWrapper
          .split(nativeSplitter, block)
        dep.metrics("numInputRows").add(cb.numRows)
        dep.metrics("inputBatches").add(1)
        writeMetrics.incRecordsWritten(cb.numRows)
      }
    }
    // stop() flushes remaining native buffers and finalizes per-partition sizes.
    splitResult = splitterJniWrapper.stop(nativeSplitter)
    dep.metrics("splitTime").add(splitResult.getSplitTime)
    dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
    dep.metrics("serializeTime").add(splitResult.getSerializationTime)
    dep.metrics("spillTime").add(splitResult.getTotalSpillTime)
    dep.metrics("compressTime").add(splitResult.getTotalCompressTime)
    dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime)
    dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled)
    dep.metrics("dataSize").add(splitResult.getTotalBytesWritten)
    writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
    writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime)
    partitionLengths = splitResult.getPartitionLengths
    rawPartitionLengths = splitResult.getRawPartitionLengths
    try {
      // Atomically commit the temp data file plus its partition-length index.
      shuffleBlockResolver.writeMetadataFileAndCommit(
        dep.shuffleId,
        mapId,
        partitionLengths,
        Array[Long](),
        dataTmp)
    } finally {
      // Commit renames the temp file away on success; anything left behind
      // (e.g. after a failed commit) must be removed here.
      if (dataTmp.exists() && !dataTmp.delete()) {
        logError(s"Error while deleting temp file ${dataTmp.getAbsolutePath}")
      }
    }
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
  }
}